CrewAI
Log CrewAI agent crews and tasks to Parseable
Log CrewAI crews, agents, tasks, and tool usage to Parseable for multi-agent observability.
Overview
Integrating CrewAI with Parseable gives you:
- Crew Monitoring - Track crew executions
- Agent Logging - Monitor individual agent work
- Task Tracking - Log task assignments and completions
- Tool Usage - Track tool calls and results
- Performance Analysis - Measure crew efficiency
Prerequisites
- CrewAI installed (`pip install crewai`)
- Parseable instance accessible
- Python application
Custom Logger
import time
import uuid
from datetime import datetime, timezone
from functools import wraps
from typing import Dict, Any, List, Optional

import requests
from crewai import Agent, Task, Crew, Process
class ParseableCrewLogger:
    """Send CrewAI lifecycle events to a Parseable dataset over HTTP.

    Each event is posted as a one-element JSON array to
    ``{parseable_url}/api/v1/ingest`` with the dataset name in the
    ``X-P-Stream`` header. Delivery is best-effort: any failure is printed
    and never propagates to the caller.
    """

    def __init__(self, parseable_url: str, dataset: str, username: str, password: str):
        """Store connection settings and start with a fresh ``crew_id``.

        Args:
            parseable_url: Base URL of the Parseable server.
            dataset: Value sent in the ``X-P-Stream`` header.
            username: Basic-auth user name.
            password: Basic-auth password.
        """
        self.parseable_url = parseable_url
        self.dataset = dataset
        self.auth = (username, password)
        self.crew_id = str(uuid.uuid4())

    @staticmethod
    def _preview(value: Any, limit: int) -> Optional[str]:
        """Return ``str(value)`` cut to *limit* characters; ``None`` stays ``None``."""
        if value is None:
            return None
        return str(value)[:limit]

    def _build_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *entry* stamped with ``timestamp`` and ``crew_id``.

        The caller's dict is left untouched. The timestamp is UTC in ISO-8601
        form with a trailing ``Z``.
        """
        stamped = dict(entry)
        now = datetime.now(timezone.utc)
        stamped["timestamp"] = now.isoformat().replace("+00:00", "Z")
        stamped["crew_id"] = self.crew_id
        return stamped

    def _log(self, entry: Dict[str, Any]):
        """Post one event to Parseable, printing (not raising) on failure."""
        payload = self._build_entry(entry)
        # Tolerate a base URL configured with a trailing slash.
        url = f"{self.parseable_url.rstrip('/')}/api/v1/ingest"
        try:
            response = requests.post(
                url,
                json=[payload],
                auth=self.auth,
                headers={"X-P-Stream": self.dataset},
                timeout=5,
            )
            # Without this, 4xx/5xx answers (bad credentials, unknown
            # dataset, ...) would look like successful deliveries.
            response.raise_for_status()
        except Exception as e:
            # Deliberately broad: logging must never break the crew run.
            print(f"Logging failed: {e}")

    def log_crew_start(self, crew_name: str, agents: List[str], tasks: List[str]):
        """Log a ``crew_start`` event with the agent/task names and their counts."""
        self._log({
            "event": "crew_start",
            "crew_name": crew_name,
            "agents": agents,
            "tasks": tasks,
            "agent_count": len(agents),
            "task_count": len(tasks),
        })

    def log_crew_end(self, crew_name: str, result: str, duration_ms: float):
        """Log a ``crew_end`` event with the first 500 characters of *result*."""
        self._log({
            "event": "crew_end",
            "crew_name": crew_name,
            "result_preview": str(result)[:500],
            "duration_ms": duration_ms,
        })

    def log_task_start(self, task_description: str, agent_name: str):
        """Log a ``task_start`` event; the description is cut to 200 characters."""
        self._log({
            "event": "task_start",
            "task_description": self._preview(task_description, 200),
            "assigned_agent": agent_name,
        })

    def log_task_end(self, task_description: str, agent_name: str, output: str, duration_ms: float):
        """Log a ``task_end`` event with a 500-character output preview."""
        self._log({
            "event": "task_end",
            "task_description": self._preview(task_description, 200),
            "agent": agent_name,
            "output_preview": str(output)[:500],
            "duration_ms": duration_ms,
        })

    def log_agent_action(self, agent_name: str, action: str, thought: Optional[str] = None):
        """Log an ``agent_action`` event; an empty or missing thought is sent as null."""
        self._log({
            "event": "agent_action",
            "agent": agent_name,
            "action": self._preview(action, 200),
            "thought": self._preview(thought or None, 200),
        })

    def log_tool_use(self, agent_name: str, tool_name: str, input_data: str, output: str, duration_ms: float):
        """Log a ``tool_use`` event with 200-character input/output previews."""
        self._log({
            "event": "tool_use",
            "agent": agent_name,
            "tool": tool_name,
            "input": str(input_data)[:200],
            "output": str(output)[:200],
            "duration_ms": duration_ms,
        })

    def log_llm_call(self, agent_name: str, model: str, tokens: Optional[Dict[str, int]] = None):
        """Log an ``llm_call`` event, merging the optional *tokens* counters into it."""
        # Counters go first so they can never overwrite event/agent/model.
        entry = {
            **(tokens or {}),
            "event": "llm_call",
            "agent": agent_name,
            "model": model,
        }
        self._log(entry)

    def new_crew(self):
        """Start a new correlation scope by generating a fresh ``crew_id``."""
        self.crew_id = str(uuid.uuid4())


# Wrapped Crew
class LoggedCrew:
    """Wrap a crew so that every ``kickoff`` is reported through a logger.

    The wrapped object must expose ``agents`` (items with ``role``),
    ``tasks`` (items with ``description``) and ``kickoff(inputs=...)``; an
    optional ``name`` attribute labels the events.
    """

    def __init__(self, crew: "Crew", logger: "ParseableCrewLogger"):
        self.crew = crew
        self.logger = logger

    def kickoff(self, inputs: Optional[Dict] = None):
        """Run the crew and log start, end, or error events.

        Args:
            inputs: Passed through unchanged to the wrapped ``kickoff``.

        Returns:
            Whatever the wrapped ``kickoff`` returns.

        Raises:
            Exception: Any error raised by the wrapped ``kickoff`` is logged
                as a ``crew_error`` event and then re-raised unchanged.
        """
        self.logger.new_crew()

        # Fall back when the attribute is missing or set to None/empty.
        crew_name = getattr(self.crew, 'name', None) or 'unnamed_crew'

        # Log crew start
        self.logger.log_crew_start(
            crew_name=crew_name,
            agents=[a.role for a in self.crew.agents],
            tasks=[t.description[:100] for t in self.crew.tasks],
        )

        # Monotonic clock: immune to system clock adjustments during the run.
        started = time.perf_counter()
        try:
            result = self.crew.kickoff(inputs=inputs)
        except Exception as e:
            self.logger._log({
                "event": "crew_error",
                "crew_name": crew_name,
                "error": str(e),
                "error_type": type(e).__name__,
                "duration_ms": (time.perf_counter() - started) * 1000,
            })
            raise

        # Kept outside the try so a failure here is not reported as a crew error.
        self.logger.log_crew_end(
            crew_name=crew_name,
            result=str(result),
            duration_ms=(time.perf_counter() - started) * 1000,
        )
        return result


# Usage
import os

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool

# Create logger. Connection settings can be overridden through environment
# variables so real credentials do not have to live in source code; the
# defaults match a local Parseable instance.
logger = ParseableCrewLogger(
    parseable_url=os.environ.get("PARSEABLE_URL", "http://parseable:8000"),
    dataset=os.environ.get("PARSEABLE_DATASET", "crewai-logs"),
    username=os.environ.get("PARSEABLE_USERNAME", "admin"),
    password=os.environ.get("PARSEABLE_PASSWORD", "admin"),
)

# Define agents
researcher = Agent(
    role='Senior Research Analyst',
    goal='Uncover cutting-edge developments in AI',
    backstory='You are an expert researcher...',
    tools=[SerperDevTool()],
    verbose=True,
)
writer = Agent(
    role='Tech Content Strategist',
    goal='Craft compelling content on tech advancements',
    backstory='You are a renowned content strategist...',
    verbose=True,
)

# Define tasks. The {topic} placeholder is filled from the `inputs` passed to
# kickoff(); without a placeholder the input would be silently unused.
research_task = Task(
    description='Research the latest trends in {topic}',
    expected_output='A comprehensive report on {topic}',
    agent=researcher,
)
write_task = Task(
    description='Write a blog post about {topic}',
    expected_output='A 500-word blog post',
    agent=writer,
)

# Create crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
)

# Wrap with logger
logged_crew = LoggedCrew(crew, logger)

# Execute
result = logged_crew.kickoff(inputs={'topic': 'AI in 2024'})


# Callback-Based Logging
from crewai.utilities.callbacks import CrewCallbackHandler
class ParseableCrewCallback(CrewCallbackHandler):
    """Forward task and tool callbacks to a ``ParseableCrewLogger``.

    NOTE(review): confirm that the installed CrewAI version actually exposes
    ``CrewCallbackHandler`` with these hook names and signatures.
    """

    def __init__(self, logger: ParseableCrewLogger):
        self.logger = logger
        # perf_counter reading for each in-flight task, keyed by object
        # identity so the task objects themselves need not be hashable.
        self._task_started = {}

    def on_task_start(self, task):
        """Remember when *task* started and log a ``task_start`` event."""
        self._task_started[id(task)] = time.perf_counter()
        self.logger.log_task_start(
            task_description=task.description,
            agent_name=task.agent.role
        )

    def on_task_end(self, task, output):
        """Log a ``task_end`` event with the measured task duration.

        The duration is 0 when no matching ``on_task_start`` was seen.
        """
        started = self._task_started.pop(id(task), None)
        if started is None:
            duration_ms = 0
        else:
            duration_ms = (time.perf_counter() - started) * 1000
        self.logger.log_task_end(
            task_description=task.description,
            agent_name=task.agent.role,
            output=str(output),
            duration_ms=duration_ms
        )

    def on_tool_use(self, agent, tool, input_data, output):
        """Log a ``tool_use`` event.

        This hook only fires after the tool has returned, so no duration can
        be measured here and 0 is reported.
        """
        self.logger.log_tool_use(
            agent_name=agent.role,
            tool_name=tool.name,
            input_data=str(input_data),
            output=str(output),
            duration_ms=0
        )


# Querying CrewAI Logs
-- Crew execution summary
-- agent_count and task_count are written only on crew_start events, while
-- duration_ms is written on crew_end, so both events are combined per
-- crew_id. HAVING keeps only runs that have finished.
SELECT
  crew_id,
  MAX(crew_name) AS crew_name,
  MAX(agent_count) AS agent_count,
  MAX(task_count) AS task_count,
  MAX(duration_ms) / 1000 AS duration_seconds,
  MAX(timestamp) AS ended_at
FROM "crewai-logs"
WHERE event IN ('crew_start', 'crew_end')
GROUP BY crew_id
HAVING MAX(duration_ms) IS NOT NULL
ORDER BY ended_at DESC;

-- Task performance by agent
SELECT
  agent,
  COUNT(*) AS tasks_completed,
  AVG(duration_ms) AS avg_duration_ms
FROM "crewai-logs"
WHERE event = 'task_end'
GROUP BY agent
ORDER BY tasks_completed DESC;

-- Tool usage analysis
SELECT
  tool,
  agent,
  COUNT(*) AS usage_count,
  AVG(duration_ms) AS avg_duration
FROM "crewai-logs"
WHERE event = 'tool_use'
GROUP BY tool, agent
ORDER BY usage_count DESC;

-- Error tracking
SELECT
  timestamp,
  crew_id,
  error_type,
  error
FROM "crewai-logs"
WHERE event = 'crew_error'
ORDER BY timestamp DESC;

-- Best Practices
- Track Crew IDs - Correlate all events in a crew run
- Log All Tasks - Monitor task assignments and completions
- Monitor Tools - Track tool usage and performance
- Measure Duration - Track time spent on each task
Next Steps
- Configure AutoGen logging
- Set up LangChain tracing
- Create dashboards for crew metrics
Was this page helpful?