LangChain

Trace LangChain chains, agents, and tool calls with Parseable for comprehensive LLM observability.

Overview

Integrate LangChain with Parseable to:

  • Chain Tracing - Track entire chain executions
  • Agent Monitoring - Monitor agent decisions and tool usage
  • Token Tracking - Aggregate token usage across chains
  • Latency Analysis - Identify slow components
  • Error Debugging - Trace failures through chains

Prerequisites

  • LangChain installed
  • A Parseable instance reachable from your application
  • A Python application (the examples below also use the requests library)

Custom Callback Handler

Basic Handler

from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import LLMResult
import requests
from datetime import datetime
from typing import Dict, Any, List
import uuid

class ParseableCallbackHandler(BaseCallbackHandler):
    """LangChain callback handler that ships run events to a Parseable dataset."""

    def __init__(self, parseable_url: str, dataset: str, username: str, password: str):
        self.parseable_url = parseable_url
        self.dataset = dataset
        self.auth = (username, password)
        self.run_id = str(uuid.uuid4())  # correlates all events from this handler instance
        self.chain_stack = []  # tracks nested chain executions (LIFO)
    
    def _log(self, entry: Dict[str, Any]):
        entry["run_id"] = self.run_id
        entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
        try:
            # One synchronous POST per event; see the buffered variant
            # below if this adds too much latency to your chains
            requests.post(
                f"{self.parseable_url}/api/v1/ingest",
                json=[entry],
                auth=self.auth,
                headers={"X-P-Stream": self.dataset},
                timeout=5
            )
        except Exception as e:
            print(f"Logging failed: {e}")
    
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
        chain_id = str(uuid.uuid4())
        name = serialized.get("name", "unknown")
        self.chain_stack.append({
            "chain_id": chain_id,
            "start_time": datetime.utcnow(),
            "name": name
        })
        
        self._log({
            "event": "chain_start",
            "chain_id": chain_id,
            "chain_name": name,
            "inputs": str(inputs)[:500]
        })
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
        if self.chain_stack:
            chain = self.chain_stack.pop()
            duration = (datetime.utcnow() - chain["start_time"]).total_seconds() * 1000
            
            self._log({
                "event": "chain_end",
                "chain_id": chain["chain_id"],
                "chain_name": chain["name"],
                "duration_ms": duration,
                "outputs": str(outputs)[:500]
            })
    
    def on_chain_error(self, error: Exception, **kwargs):
        if self.chain_stack:
            chain = self.chain_stack.pop()
            
            self._log({
                "event": "chain_error",
                "chain_id": chain["chain_id"],
                "chain_name": chain["name"],
                "error": str(error),
                "error_type": type(error).__name__
            })
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
        self._log({
            "event": "llm_start",
            "model": serialized.get("name", "unknown"),
            "prompt_count": len(prompts),
            "prompt_preview": prompts[0][:200] if prompts else None
        })
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        usage = {}
        if response.llm_output:
            token_usage = response.llm_output.get("token_usage", {})
            usage = {
                "prompt_tokens": token_usage.get("prompt_tokens"),
                "completion_tokens": token_usage.get("completion_tokens"),
                "total_tokens": token_usage.get("total_tokens")
            }
        
        self._log({
            "event": "llm_end",
            "generations": len(response.generations),
            **usage
        })
    
    def on_llm_error(self, error: Exception, **kwargs):
        self._log({
            "event": "llm_error",
            "error": str(error),
            "error_type": type(error).__name__
        })
    
    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
        self._log({
            "event": "tool_start",
            "tool_name": serialized.get("name", "unknown"),
            "input": input_str[:500]
        })
    
    def on_tool_end(self, output: str, **kwargs):
        self._log({
            "event": "tool_end",
            "output": output[:500]
        })
    
    def on_tool_error(self, error: Exception, **kwargs):
        self._log({
            "event": "tool_error",
            "error": str(error),
            "error_type": type(error).__name__
        })
    
    def on_agent_action(self, action, **kwargs):
        self._log({
            "event": "agent_action",
            "tool": action.tool,
            "tool_input": str(action.tool_input)[:500],
            "log": action.log[:500]
        })
    
    def on_agent_finish(self, finish, **kwargs):
        self._log({
            "event": "agent_finish",
            "output": str(finish.return_values)[:500]
        })
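
The handler above issues one synchronous HTTP POST per event, which puts network latency on the chain's execution path. Below is a minimal sketch of a buffered variant; the BufferedParseableCallbackHandler name, the batch size, and the atexit flush are illustrative choices, not part of any LangChain or Parseable API:

import atexit

class BufferedParseableCallbackHandler(ParseableCallbackHandler):
    """Buffers events and ships them to Parseable in batches."""

    def __init__(self, *args, batch_size: int = 20, **kwargs):
        super().__init__(*args, **kwargs)
        self.batch_size = batch_size
        self._buffer = []
        atexit.register(self.flush)  # best-effort flush at interpreter exit

    def _log(self, entry: Dict[str, Any]):
        entry["run_id"] = self.run_id
        entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
        self._buffer.append(entry)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        try:
            # Parseable's ingest endpoint accepts a JSON array, so one
            # request can carry the whole batch
            requests.post(
                f"{self.parseable_url}/api/v1/ingest",
                json=batch,
                auth=self.auth,
                headers={"X-P-Stream": self.dataset},
                timeout=5
            )
        except Exception as e:
            print(f"Batch logging failed: {e}")

Call flush() explicitly after a run if you need events to appear in Parseable immediately.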

Usage with Chains

from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

# Create handler
handler = ParseableCallbackHandler(
    parseable_url="http://parseable:8000",
    dataset="langchain-traces",
    username="admin",
    password="admin"
)

# Create chain with callback (chain-level callbacks propagate to the LLM call)
llm = ChatOpenAI(model="gpt-4")
prompt = PromptTemplate.from_template("Tell me about {topic}")
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler])

# Run chain (on newer LangChain versions, use chain.invoke({"topic": "machine learning"}))
result = chain.run(topic="machine learning")

Usage with Agents

from langchain.agents import initialize_agent, Tool, AgentType
from langchain_openai import ChatOpenAI

# Define tools
tools = [
    Tool(
        name="Calculator",
        func=lambda x: eval(x),  # demo only: never eval untrusted input in production
        description="Useful for math calculations"
    )
]

# Create agent with callback
llm = ChatOpenAI(model="gpt-4")
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    callbacks=[handler],
    verbose=True
)

# Run agent
result = agent.run("What is 25 * 4 + 10?")

LCEL Integration

from langchain_core.runnables import RunnableConfig

# Create config with callbacks
config = RunnableConfig(callbacks=[handler])

# Use with LCEL chains
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
model = ChatOpenAI()
chain = prompt | model

# Invoke with config
result = chain.invoke({"topic": "AI"}, config=config)
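
Instead of passing config on every call, you can bind the callbacks to the chain once with with_config (part of the standard Runnable interface), as sketched below:

# Bind callbacks once; all subsequent invoke/stream/batch calls are traced
traced_chain = (prompt | model).with_config({"callbacks": [handler]})
result = traced_chain.invoke({"topic": "AI"})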

Querying LangChain Traces

-- Chain execution times
SELECT 
  chain_name,
  AVG(duration_ms) as avg_duration,
  COUNT(*) as executions
FROM "langchain-traces"
WHERE event = 'chain_end'
GROUP BY chain_name
ORDER BY avg_duration DESC

-- Token usage by run
SELECT 
  run_id,
  SUM(total_tokens) as total_tokens,
  COUNT(CASE WHEN event = 'tool_start' THEN 1 END) as tool_calls
FROM "langchain-traces"
GROUP BY run_id
ORDER BY total_tokens DESC

-- Agent tool usage
SELECT 
  tool,
  COUNT(*) as usage_count
FROM "langchain-traces"
WHERE event = 'agent_action'
GROUP BY tool
ORDER BY usage_count DESC

-- Error analysis
SELECT 
  event,
  error_type,
  error,
  COUNT(*) as count
FROM "langchain-traces"
WHERE event LIKE '%error%'
GROUP BY event, error_type, error
ORDER BY count DESC
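
Averages can hide tail latency. Parseable's query engine is DataFusion-based; if your deployment exposes DataFusion's approx_percentile_cont aggregate, a sketch like the following surfaces p95 chain latency (verify the function is available on your version):

-- p95 chain latency (assumes the approx_percentile_cont aggregate is available)
SELECT 
  chain_name,
  approx_percentile_cont(duration_ms, 0.95) as p95_duration_ms,
  COUNT(*) as executions
FROM "langchain-traces"
WHERE event = 'chain_end'
GROUP BY chain_name
ORDER BY p95_duration_ms DESC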

Best Practices

  1. Use Run IDs - Correlate all events from a single execution (see the example query after this list)
  2. Truncate Content - Cap logged prompts and responses (as the handler's [:500] slices do) rather than storing them in full
  3. Track All Events - Log starts, ends, and errors
  4. Monitor Agents - Pay attention to tool usage patterns
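
For example, to reconstruct a single execution end to end, filter on its run_id (substitute an actual ID from your data):

-- All events from a single run, in order
SELECT timestamp, event, chain_name, tool, duration_ms, error
FROM "langchain-traces"
WHERE run_id = '<run-id>'
ORDER BY timestamp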
