Parseable

CrewAI

Log CrewAI agent crews and tasks to Parseable


Log CrewAI crews, agents, tasks, and tool usage to Parseable for multi-agent observability.

Overview

Integrate CrewAI with Parseable to:

  • Crew Monitoring - Track crew executions
  • Agent Logging - Monitor individual agent work
  • Task Tracking - Log task assignments and completions
  • Tool Usage - Track tool calls and results
  • Performance Analysis - Measure crew efficiency

Prerequisites

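  • CrewAI and the requests HTTP library installed (pip install crewai requests); the usage example below also needs crewai-tools (pip install crewai-tools)
  • A running Parseable instance reachable from your application, with a username and password that are allowed to ingest into it
  • A Python application that builds and runs your crew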

Custom Logger

from crewai import Agent, Task, Crew, Process
import requests
from datetime import datetime
from typing import Dict, Any, List, Optional
import uuid
import time
from functools import wraps

class ParseableCrewLogger:
    """Best-effort logger that ships CrewAI run events to Parseable.

    Each event is a flat JSON object POSTed to ``<parseable_url>/api/v1/ingest``
    with the target dataset named in the ``X-P-Stream`` header. Every event is
    stamped with a UTC ``timestamp`` and the current ``crew_id`` so that all
    events of one crew run can be correlated.

    Delivery problems (network errors, non-2xx responses, unserialisable
    payloads) are printed and swallowed: logging must never interrupt the crew.
    """

    def __init__(self, parseable_url: str, dataset: str, username: str, password: str):
        """Store connection settings and start with a fresh crew id.

        Args:
            parseable_url: Base URL of the Parseable server.
            dataset: Name sent in the ``X-P-Stream`` header.
            username: Basic-auth user.
            password: Basic-auth password.
        """
        self.parseable_url = parseable_url
        self.dataset = dataset
        self.auth = (username, password)
        self.crew_id = str(uuid.uuid4())

    @staticmethod
    def _preview(value: Any, limit: int) -> str:
        """Return ``value`` rendered as text and cut to at most ``limit`` characters."""
        return str(value)[:limit]

    def _enrich(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``entry`` stamped with ``timestamp`` and ``crew_id``.

        A copy is made so the caller's dict is never modified.
        """
        # Local import: the file-level import only brings in ``datetime``.
        from datetime import timezone

        stamped = dict(entry)
        # Take an aware UTC "now" (``utcnow()`` is deprecated), then drop the
        # tzinfo so ``isoformat()`` keeps the original "...Z" shape instead of
        # emitting "+00:00".
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        stamped["timestamp"] = now_utc.isoformat() + "Z"
        stamped["crew_id"] = self.crew_id
        return stamped

    def _log(self, entry: Dict[str, Any]) -> None:
        """Send one event to Parseable; failures are printed, never raised."""
        payload = self._enrich(entry)
        # Tolerate a base URL configured with a trailing slash.
        url = self.parseable_url.rstrip("/") + "/api/v1/ingest"
        try:
            response = requests.post(
                url,
                json=[payload],
                auth=self.auth,
                headers={"X-P-Stream": self.dataset},
                timeout=5,
            )
            # Without this, 4xx/5xx answers (bad credentials, unknown dataset)
            # would be dropped silently.
            response.raise_for_status()
        except Exception as e:
            # Deliberately broad: this is the logging boundary and a logging
            # problem must not take the crew run down with it.
            print(f"Logging failed: {e}")

    def log_crew_start(self, crew_name: str, agents: List[str], tasks: List[str]) -> None:
        """Record the start of a crew run with its agents and tasks."""
        self._log({
            "event": "crew_start",
            "crew_name": crew_name,
            "agents": agents,
            "tasks": tasks,
            "agent_count": len(agents),
            "task_count": len(tasks),
        })

    def log_crew_end(self, crew_name: str, result: str, duration_ms: float) -> None:
        """Record the end of a crew run with a 500-character result preview."""
        self._log({
            "event": "crew_end",
            "crew_name": crew_name,
            "result_preview": self._preview(result, 500),
            "duration_ms": duration_ms,
        })

    def log_task_start(self, task_description: str, agent_name: str) -> None:
        """Record that a task was handed to an agent."""
        self._log({
            "event": "task_start",
            "task_description": self._preview(task_description, 200),
            "assigned_agent": agent_name,
        })

    def log_task_end(self, task_description: str, agent_name: str, output: str, duration_ms: float) -> None:
        """Record that a task finished, with a 500-character output preview."""
        self._log({
            "event": "task_end",
            "task_description": self._preview(task_description, 200),
            "agent": agent_name,
            "output_preview": self._preview(output, 500),
            "duration_ms": duration_ms,
        })

    def log_agent_action(self, agent_name: str, action: str, thought: Optional[str] = None) -> None:
        """Record one agent action and, when given, the thought behind it."""
        self._log({
            "event": "agent_action",
            "agent": agent_name,
            "action": self._preview(action, 200),
            "thought": self._preview(thought, 200) if thought else None,
        })

    def log_tool_use(self, agent_name: str, tool_name: str, input_data: str, output: str, duration_ms: float) -> None:
        """Record one tool call with 200-character input and output previews."""
        self._log({
            "event": "tool_use",
            "agent": agent_name,
            "tool": tool_name,
            "input": self._preview(input_data, 200),
            "output": self._preview(output, 200),
            "duration_ms": duration_ms,
        })

    def log_llm_call(self, agent_name: str, model: str, tokens: Optional[Dict[str, int]] = None) -> None:
        """Record one LLM call; ``tokens`` entries become top-level fields.

        The ``event``, ``agent`` and ``model`` fields always win over
        same-named keys in ``tokens`` so a stray key cannot relabel the event.
        """
        entry: Dict[str, Any] = dict(tokens) if tokens else {}
        entry.update({
            "event": "llm_call",
            "agent": agent_name,
            "model": model,
        })
        self._log(entry)

    def new_crew(self) -> None:
        """Start a new correlation id for the next crew run."""
        self.crew_id = str(uuid.uuid4())

Wrapped Crew

class LoggedCrew:
    """Wrap a CrewAI crew so that every ``kickoff`` is logged.

    A run produces a ``crew_start`` event, then either ``crew_end`` (success)
    or ``crew_error`` (the crew raised). Both closing events carry the crew
    name and the elapsed time in milliseconds.
    """

    def __init__(self, crew: "Crew", logger: "ParseableCrewLogger"):
        """Remember the crew to run and the logger to report to."""
        self.crew = crew
        self.logger = logger

    def kickoff(self, inputs: Optional[Dict] = None):
        """Run the wrapped crew and log its start, end and any failure.

        Args:
            inputs: Passed through unchanged to ``crew.kickoff(inputs=...)``.

        Returns:
            Whatever the wrapped crew's ``kickoff`` returns.

        Raises:
            Exception: Any exception raised by the crew is logged as a
                ``crew_error`` event and then re-raised unchanged.
        """
        # Fresh correlation id so this run's events can be told apart.
        self.logger.new_crew()

        # Resolve the name once; also covers a crew whose ``name`` is None.
        crew_name = getattr(self.crew, 'name', None) or 'unnamed_crew'

        self.logger.log_crew_start(
            crew_name=crew_name,
            agents=[agent.role for agent in self.crew.agents],
            tasks=[task.description[:100] for task in self.crew.tasks],
        )

        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments while the crew is running.
        started = time.perf_counter()

        try:
            result = self.crew.kickoff(inputs=inputs)
        except Exception as e:
            self.logger._log({
                "event": "crew_error",
                "crew_name": crew_name,
                "error": str(e),
                "error_type": type(e).__name__,
                "duration_ms": (time.perf_counter() - started) * 1000,
            })
            raise
        else:
            # Kept outside the ``try`` so a problem while logging success is
            # not reported as a crew failure.
            self.logger.log_crew_end(
                crew_name=crew_name,
                result=str(result),
                duration_ms=(time.perf_counter() - started) * 1000,
            )
            return result

Usage

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool

# Create the logger that ships events to the "crewai-logs" dataset
logger = ParseableCrewLogger(
    parseable_url="http://parseable:8000",
    dataset="crewai-logs",
    username="admin",
    password="admin"
)

# Define agents
researcher = Agent(
    role='Senior Research Analyst',
    goal='Uncover cutting-edge developments in AI',
    backstory='You are an expert researcher...',
    tools=[SerperDevTool()],
    verbose=True
)

writer = Agent(
    role='Tech Content Strategist',
    goal='Craft compelling content on tech advancements',
    backstory='You are a renowned content strategist...',
    verbose=True
)

# Define tasks.
# The {topic} placeholder is filled from the `inputs` dict given to kickoff();
# without a placeholder the 'topic' input would never reach the agents.
research_task = Task(
    description='Research the latest trends in {topic}',
    expected_output='A comprehensive report on {topic}',
    agent=researcher
)

write_task = Task(
    description='Write a blog post about {topic}',
    expected_output='A 500-word blog post',
    agent=writer
)

# Create crew (tasks run one after another, in the order listed)
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential
)

# Wrap with logger
logged_crew = LoggedCrew(crew, logger)

# Execute; 'topic' is interpolated into the task descriptions above
result = logged_crew.kickoff(inputs={'topic': 'AI in 2024'})

Callback-Based Logging

from crewai.utilities.callbacks import CrewCallbackHandler

class ParseableCrewCallback(CrewCallbackHandler):
    """Forward task and tool callbacks to a ``ParseableCrewLogger``.

    Task durations are measured between ``on_task_start`` and ``on_task_end``
    so that ``task_end`` events carry a real ``duration_ms`` instead of 0.

    NOTE(review): confirm that ``CrewCallbackHandler`` and the hook names and
    signatures used here match the CrewAI version you have installed.
    """

    def __init__(self, logger: ParseableCrewLogger):
        """Keep the logger and prepare the per-task start-time table."""
        self.logger = logger
        # id(task) -> perf_counter() reading taken in on_task_start.
        self._task_started_at = {}

    @staticmethod
    def _agent_name(agent) -> str:
        """Return the agent's role, or ``"unknown"`` when there is no agent/role."""
        return getattr(agent, "role", None) or "unknown"

    def on_task_start(self, task):
        """Log a ``task_start`` event and remember when the task began."""
        self._task_started_at[id(task)] = time.perf_counter()
        self.logger.log_task_start(
            task_description=task.description,
            agent_name=self._agent_name(getattr(task, "agent", None))
        )

    def on_task_end(self, task, output):
        """Log a ``task_end`` event with the measured task duration."""
        started = self._task_started_at.pop(id(task), None)
        # Fall back to 0 when no matching on_task_start was seen for this task.
        duration_ms = (time.perf_counter() - started) * 1000 if started is not None else 0
        self.logger.log_task_end(
            task_description=task.description,
            agent_name=self._agent_name(getattr(task, "agent", None)),
            output=str(output),
            duration_ms=duration_ms
        )

    def on_tool_use(self, agent, tool, input_data, output):
        """Log a ``tool_use`` event for a tool call that has already finished."""
        self.logger.log_tool_use(
            agent_name=self._agent_name(agent),
            tool_name=tool.name,
            input_data=str(input_data),
            output=str(output),
            # This hook only fires after the call, so no duration is available.
            duration_ms=0
        )

Querying CrewAI Logs

-- Crew execution summary
-- agent_count and task_count are only written on crew_start events, while
-- duration_ms is only written on crew_end events, so both event types are
-- folded into one row per crew_id. The HAVING clause keeps finished runs only.
SELECT 
  crew_id,
  MAX(crew_name) AS crew_name,
  MAX(agent_count) AS agent_count,
  MAX(task_count) AS task_count,
  MAX(duration_ms) / 1000 AS duration_seconds
FROM "crewai-logs"
WHERE event IN ('crew_start', 'crew_end')
GROUP BY crew_id
HAVING MAX(duration_ms) IS NOT NULL
ORDER BY MAX(timestamp) DESC

-- Task performance by agent: finished-task count and mean task duration
SELECT agent,
       COUNT(*)         AS tasks_completed,
       AVG(duration_ms) AS avg_duration_ms
FROM "crewai-logs"
WHERE event = 'task_end'
GROUP BY agent
ORDER BY tasks_completed DESC

-- Tool usage analysis: call count and mean duration per (tool, agent) pair
SELECT tool,
       agent,
       COUNT(*)         AS usage_count,
       AVG(duration_ms) AS avg_duration
FROM "crewai-logs"
WHERE event = 'tool_use'
GROUP BY tool, agent
ORDER BY usage_count DESC

-- Error tracking: failed crew runs, most recent first
SELECT timestamp, crew_id, error_type, error
FROM "crewai-logs"
WHERE event = 'crew_error'
ORDER BY timestamp DESC

Best Practices

  1. Track Crew IDs - Correlate all events in a crew run
  2. Log All Tasks - Monitor task assignments and completions
  3. Monitor Tools - Track tool usage and performance
  4. Measure Duration - Track time spent on each task

Next Steps

Was this page helpful?

On this page