Parseable

LlamaIndex

Trace LlamaIndex applications with Parseable


Trace LlamaIndex queries, retrievals, and LLM calls with Parseable for RAG observability.

Overview

Integrate LlamaIndex with Parseable to:

  • Query Tracing - Track query engine executions
  • Retrieval Monitoring - Monitor document retrieval
  • LLM Tracking - Log LLM calls and token usage
  • Index Operations - Track indexing and embedding
  • RAG Debugging - Debug retrieval-augmented generation

Prerequisites

  • LlamaIndex installed
  • Parseable instance accessible
  • Python application with the `requests` package installed (the handler uses it to send events)

Custom Callback Handler

import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import requests
from llama_index.core.callbacks import CallbackManager, CBEventType
from llama_index.core.callbacks.base_handler import BaseCallbackHandler

class ParseableCallbackHandler(BaseCallbackHandler):
    """LlamaIndex callback handler that ships start/end events to Parseable.

    Each event is posted as a single JSON record to the Parseable ingest
    endpoint. Delivery is best effort: a failure to log is reported on
    stdout and never propagates into the traced application.

    Args:
        parseable_url: Base URL of the Parseable instance; a trailing slash
            is tolerated.
        dataset: Name of the dataset (stream) that receives the records.
        username: Username for HTTP basic authentication.
        password: Password for HTTP basic authentication.
    """

    def __init__(self, parseable_url: str, dataset: str, username: str, password: str):
        # Strip a trailing slash so the ingest URL never contains "//api".
        self.parseable_url = parseable_url.rstrip("/")
        self.dataset = dataset
        self.auth = (username, password)
        # event_id -> monotonic start time, used to compute durations.
        self.event_starts: Dict[str, float] = {}
        # event_id -> model name seen on the LLM start event, so the matching
        # end event (which carries the token counts) can be grouped by model.
        self._llm_models: Dict[str, Any] = {}
        # Set by start_trace(); attached to every record once known.
        self.trace_id: Optional[str] = None
        super().__init__(
            event_starts_to_ignore=[],
            event_ends_to_ignore=[]
        )

    @staticmethod
    def _field(obj: Any, name: str) -> Any:
        """Read ``name`` from a mapping or an object, returning None if absent."""
        if obj is None:
            return None
        if isinstance(obj, dict):
            return obj.get(name)
        return getattr(obj, name, None)

    def _log(self, entry: Dict[str, Any]):
        """Stamp ``entry`` with time and trace id, then post it to Parseable.

        Args:
            entry: The record to send; it is modified in place.
        """
        # Timezone-aware "now", rendered with the same trailing "Z" as before.
        entry["timestamp"] = (
            datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        )
        if self.trace_id is not None:
            entry["trace_id"] = self.trace_id
        try:
            response = requests.post(
                f"{self.parseable_url}/api/v1/ingest",
                json=[entry],
                auth=self.auth,
                headers={"X-P-Stream": self.dataset},
                timeout=5
            )
            # Surface HTTP-level failures (bad credentials, unknown dataset)
            # instead of silently dropping the record.
            response.raise_for_status()
        except Exception as e:
            # Deliberately broad: tracing must never break the traced code.
            print(f"Logging failed: {e}")

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs
    ) -> str:
        """Record the start of an event and log a "start" record.

        Args:
            event_type: Kind of event being started.
            payload: Optional event data; only a few known keys are read.
            event_id: Identifier used to pair this event with its end.
            parent_id: Identifier of the enclosing event, logged as-is.
            **kwargs: Ignored.

        Returns:
            The ``event_id`` that was passed in.
        """
        # Monotonic clock: durations are unaffected by wall-clock adjustments.
        self.event_starts[event_id] = time.monotonic()

        log_entry = {
            "event": "start",
            "event_type": event_type.value,
            "event_id": event_id,
            "parent_id": parent_id
        }

        if payload:
            if event_type in (CBEventType.QUERY, CBEventType.RETRIEVE):
                log_entry["query"] = str(payload.get("query_str", ""))[:500]
            elif event_type == CBEventType.LLM:
                # "or []" also covers a key that is present but set to None.
                messages = payload.get("messages") or []
                log_entry["message_count"] = len(messages)
                model = payload.get("model_name")
                if not model:
                    # NOTE(review): some LlamaIndex versions appear to expose
                    # the model only inside the serialized LLM config --
                    # confirm against the installed version.
                    serialized = payload.get("serialized")
                    if isinstance(serialized, dict):
                        model = serialized.get("model")
                log_entry["model"] = model or "unknown"
                self._llm_models[event_id] = log_entry["model"]
            elif event_type == CBEventType.EMBEDDING:
                log_entry["chunk_count"] = len(payload.get("chunks") or [])

        self._log(log_entry)
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs
    ) -> None:
        """Log an "end" record with the duration and a result summary.

        Args:
            event_type: Kind of event that finished.
            payload: Optional event data; only a few known keys are read.
            event_id: Identifier given to the matching ``on_event_start``.
            **kwargs: Ignored.
        """
        started_at = self.event_starts.pop(event_id, None)
        # Always pop so the bookkeeping dict cannot grow without bound.
        model = self._llm_models.pop(event_id, None)

        duration_ms = None
        if started_at is not None:
            duration_ms = (time.monotonic() - started_at) * 1000

        log_entry = {
            "event": "end",
            "event_type": event_type.value,
            "event_id": event_id,
            "duration_ms": duration_ms
        }

        if event_type == CBEventType.LLM and model is not None:
            # Repeat the model here so token counts can be grouped by model
            # using end records alone.
            log_entry["model"] = model

        if payload:
            if event_type == CBEventType.QUERY:
                response = payload.get("response")
                if response:
                    log_entry["response_preview"] = str(response)[:500]
            elif event_type == CBEventType.RETRIEVE:
                nodes = payload.get("nodes") or []
                log_entry["nodes_retrieved"] = len(nodes)
                # Only the first five scores are kept to bound record size.
                log_entry["scores"] = [
                    getattr(node, "score", None) for node in nodes[:5]
                ]
            elif event_type == CBEventType.LLM:
                response = payload.get("response")
                if response:
                    log_entry["response_preview"] = str(response)[:200]
                    # Token usage if available. The raw provider response and
                    # its usage section may be objects or plain dicts, and may
                    # lack these fields entirely, so read them defensively.
                    usage = self._field(self._field(response, "raw"), "usage")
                    for key in ("prompt_tokens", "completion_tokens"):
                        count = self._field(usage, key)
                        if count is not None:
                            log_entry[key] = count
            elif event_type == CBEventType.EMBEDDING:
                log_entry["embeddings_created"] = len(
                    payload.get("embeddings") or []
                )

        self._log(log_entry)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Remember the trace id, generating a UUID when none is given.

        Args:
            trace_id: Identifier for the trace that is starting.
        """
        self.trace_id = trace_id or str(uuid.uuid4())

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None
    ) -> None:
        """Do nothing; every event is already logged as it happens.

        Args:
            trace_id: Unused.
            trace_map: Unused.
        """
        return None

Usage

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.callbacks import CallbackManager

# Point the handler at the Parseable instance and the target dataset.
handler = ParseableCallbackHandler(
    parseable_url="http://parseable:8000",
    dataset="llamaindex-traces",
    username="admin",
    password="admin",
)

# Wrap the handler in a callback manager to hand to LlamaIndex components.
callback_manager = CallbackManager([handler])

# Read the source documents from the local "data" directory.
reader = SimpleDirectoryReader("data")
documents = reader.load_data()

# Build the index, passing the callback manager along.
index = VectorStoreIndex.from_documents(documents, callback_manager=callback_manager)

# Create a query engine with the same callback manager and run a query.
query_engine = index.as_query_engine(callback_manager=callback_manager)
response = query_engine.query("What is the main topic?")

Global Handler Setup

from llama_index.core import Settings

# Set global callback manager.
# NOTE: `handler` and `CallbackManager` are not defined in this snippet; they
# come from the Usage snippet above (the handler instance and the import from
# llama_index.core.callbacks).
Settings.callback_manager = CallbackManager([handler])

# All LlamaIndex operations will now be traced

Querying LlamaIndex Traces

-- Query performance
-- Latest 50 completed queries with their duration and a response preview.
-- nodes_retrieved is not selected here: the handler writes it on 'retrieve'
-- end events only, so it would always be NULL for 'query' rows.
SELECT
  event_id,
  duration_ms,
  response_preview
FROM "llamaindex-traces"
WHERE event_type = 'query' AND event = 'end'
ORDER BY timestamp DESC
LIMIT 50

-- Retrieval quality
-- Average number of retrieved nodes and average of the first logged score,
-- computed over completed retrieval events.
SELECT
  AVG(nodes_retrieved) AS avg_nodes,
  AVG(scores[1]) AS avg_top_score
FROM "llamaindex-traces"
WHERE event_type = 'retrieve'
  AND event = 'end'

-- LLM token usage
-- The handler writes `model` on the LLM 'start' record and the token counts
-- on the matching 'end' record, so grouping 'end' rows by model directly puts
-- everything in a single NULL group. Fold both records of a call into one row
-- per event_id first, then aggregate per model.
SELECT
  model,
  SUM(prompt_tokens) AS total_prompt,
  SUM(completion_tokens) AS total_completion,
  COUNT(*) AS calls
FROM (
  SELECT
    event_id,
    MAX(model) AS model,
    SUM(prompt_tokens) AS prompt_tokens,
    SUM(completion_tokens) AS completion_tokens
  FROM "llamaindex-traces"
  WHERE event_type = 'llm'
  GROUP BY event_id
) AS per_call
GROUP BY model

-- Embedding operations
-- Hourly totals of embeddings created and of completed embedding events,
-- most recent hour first.
SELECT
  DATE_TRUNC('hour', timestamp) AS hour,
  SUM(embeddings_created) AS embeddings,
  COUNT(*) AS operations
FROM "llamaindex-traces"
WHERE event_type = 'embedding'
  AND event = 'end'
GROUP BY hour
ORDER BY hour DESC

Best Practices

  1. Track All Events - Monitor queries, retrievals, and LLM calls
  2. Log Retrieval Scores - Analyze retrieval quality
  3. Monitor Token Usage - Track costs across operations
  4. Use Parent IDs - Correlate nested events

Next Steps

Was this page helpful?

On this page