
OpenAI

Log OpenAI API calls and responses to Parseable


Log OpenAI API calls, responses, and token usage to Parseable for LLM observability.

Overview

Integrate OpenAI with Parseable to:

  • API Logging - Track all API calls and responses
  • Token Usage - Monitor token consumption and costs
  • Latency Tracking - Measure response times
  • Error Analysis - Debug failed requests
  • Prompt Engineering - Analyze prompt effectiveness

Prerequisites

  • OpenAI API key
  • A running Parseable instance reachable from your application (the log stream can be created up front, as sketched below)
  • Python or Node.js application
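
If the target dataset does not exist yet, it can be created before the first ingest. A minimal sketch, assuming your Parseable deployment exposes the standard log stream creation endpoint (PUT /api/v1/logstream/{name}) with basic auth:

import requests

PARSEABLE_URL = "http://parseable:8000"
PARSEABLE_AUTH = ("admin", "admin")
STREAM = "openai-logs"

# Create the "openai-logs" stream so ingestion requests have somewhere to land
response = requests.put(
    f"{PARSEABLE_URL}/api/v1/logstream/{STREAM}",
    auth=PARSEABLE_AUTH,
    timeout=5
)
print(response.status_code, response.text)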

Python Integration

Basic Wrapper

import openai
import requests
import time
from datetime import datetime
from functools import wraps

PARSEABLE_URL = "http://parseable:8000"
PARSEABLE_AUTH = ("admin", "admin")
STREAM = "openai-logs"

def log_to_parseable(log_entry):
    """Send a single log entry to the Parseable ingest API."""
    try:
        requests.post(
            f"{PARSEABLE_URL}/api/v1/ingest",
            json=[log_entry],
            auth=PARSEABLE_AUTH,
            headers={"X-P-Stream": STREAM},
            timeout=5
        )
    except Exception as e:
        print(f"Failed to log: {e}")

def log_openai_call(func):
    """Decorator that times an OpenAI API call and logs the outcome to Parseable."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        error = None
        response = None
        
        try:
            response = func(*args, **kwargs)
            return response
        except Exception as e:
            error = str(e)
            raise
        finally:
            duration = time.time() - start_time
            
            log_entry = {
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "model": kwargs.get("model", "unknown"),
                "endpoint": func.__name__,
                "duration_ms": round(duration * 1000, 2),
                "success": error is None,
                "error": error
            }
            
            if response:
                usage = getattr(response, "usage", None)
                if usage:
                    log_entry["prompt_tokens"] = usage.prompt_tokens
                    log_entry["completion_tokens"] = usage.completion_tokens
                    log_entry["total_tokens"] = usage.total_tokens
            
            log_to_parseable(log_entry)
    
    return wrapper

# Wrap OpenAI client
client = openai.OpenAI()

@log_openai_call
def chat_completion(**kwargs):
    return client.chat.completions.create(**kwargs)

# Usage
response = chat_completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
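
The log_to_parseable helper above posts to Parseable synchronously, so every OpenAI call pays an extra network round trip. A minimal sketch of a queue-based variant that keeps logging off the request path, assuming it is acceptable to drop entries when the queue is full (the helper name and queue size are illustrative):

import queue
import threading
import requests

_log_queue = queue.Queue(maxsize=1000)

def _log_worker():
    # Drain the queue from a single background thread
    while True:
        entry = _log_queue.get()
        try:
            requests.post(
                f"{PARSEABLE_URL}/api/v1/ingest",
                json=[entry],
                auth=PARSEABLE_AUTH,
                headers={"X-P-Stream": STREAM},
                timeout=5
            )
        except Exception as e:
            print(f"Failed to log: {e}")
        finally:
            _log_queue.task_done()

# Daemon thread: pending entries may be lost on process exit
threading.Thread(target=_log_worker, daemon=True).start()

def log_to_parseable_async(log_entry):
    try:
        _log_queue.put_nowait(log_entry)
    except queue.Full:
        pass  # drop the entry rather than block the caller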

Comprehensive Logger

import openai
import requests
import json
import hashlib
from datetime import datetime
from typing import Optional, Dict, Any

class OpenAILogger:
    def __init__(self, parseable_url: str, dataset: str, username: str, password: str):
        self.parseable_url = parseable_url
        self.dataset = dataset
        self.auth = (username, password)
        self.client = openai.OpenAI()
    
    def _log(self, entry: Dict[str, Any]):
        try:
            requests.post(
                f"{self.parseable_url}/api/v1/ingest",
                json=[entry],
                auth=self.auth,
                headers={"X-P-Stream": self.dataset},
                timeout=5
            )
        except Exception as e:
            print(f"Logging failed: {e}")
    
    def _hash_content(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()[:16]
    
    def chat(self, messages: list, model: str = "gpt-4", **kwargs) -> Any:
        start_time = datetime.utcnow()
        request_id = self._hash_content(json.dumps(messages) + str(start_time))
        
        log_entry = {
            "timestamp": start_time.isoformat() + "Z",
            "request_id": request_id,
            "type": "chat_completion",
            "model": model,
            "message_count": len(messages),
            "system_prompt": next((m["content"][:200] for m in messages if m["role"] == "system"), None),
            "user_prompt": next((m["content"][:500] for m in messages if m["role"] == "user"), None),
            **{k: v for k, v in kwargs.items() if k in ["temperature", "max_tokens", "top_p"]}
        }
        
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            
            end_time = datetime.utcnow()
            log_entry.update({
                "success": True,
                "duration_ms": (end_time - start_time).total_seconds() * 1000,
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
                "finish_reason": response.choices[0].finish_reason,
                "response_preview": response.choices[0].message.content[:200] if response.choices else None
            })
            
            self._log(log_entry)
            return response
            
        except Exception as e:
            log_entry.update({
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__
            })
            self._log(log_entry)
            raise

# Usage
logger = OpenAILogger(
    parseable_url="http://parseable:8000",
    dataset="openai-logs",
    username="admin",
    password="admin"
)

response = logger.chat(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"}
    ],
    model="gpt-4",
    temperature=0.7
)
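
The chat method above logs once the full response is available. For streamed responses, token usage is only reported if the stream is asked to include it. A hypothetical chat_stream method that could be added to OpenAILogger, assuming the OpenAI Python SDK's stream_options={"include_usage": True} option (the method name and logged fields are illustrative):

def chat_stream(self, messages: list, model: str = "gpt-4", **kwargs):
    start_time = datetime.utcnow()
    log_entry = {
        "timestamp": start_time.isoformat() + "Z",
        "type": "chat_completion_stream",
        "model": model,
        "message_count": len(messages)
    }
    try:
        stream = self.client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
            stream_options={"include_usage": True},
            **kwargs
        )
        usage = None
        for chunk in stream:
            # With include_usage, the final chunk carries token usage
            if getattr(chunk, "usage", None):
                usage = chunk.usage
            yield chunk
        log_entry["success"] = True
        log_entry["duration_ms"] = (datetime.utcnow() - start_time).total_seconds() * 1000
        if usage:
            log_entry["prompt_tokens"] = usage.prompt_tokens
            log_entry["completion_tokens"] = usage.completion_tokens
            log_entry["total_tokens"] = usage.total_tokens
    except Exception as e:
        log_entry["success"] = False
        log_entry["error"] = str(e)
        log_entry["error_type"] = type(e).__name__
        raise
    finally:
        self._log(log_entry)

# Usage
for chunk in logger.chat_stream(messages=[{"role": "user", "content": "Hello!"}]):
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")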

Node.js Integration

const OpenAI = require('openai');
const axios = require('axios');

const PARSEABLE_URL = process.env.PARSEABLE_URL || 'http://parseable:8000';
const PARSEABLE_AUTH = Buffer.from('admin:admin').toString('base64');

class OpenAILogger {
  constructor() {
    this.client = new OpenAI();
  }

  async log(entry) {
    try {
      await axios.post(`${PARSEABLE_URL}/api/v1/ingest`, [entry], {
        headers: {
          'Authorization': `Basic ${PARSEABLE_AUTH}`,
          'X-P-Stream': 'openai-logs',
          'Content-Type': 'application/json'
        }
      });
    } catch (error) {
      console.error('Logging failed:', error.message);
    }
  }

  async chat(messages, options = {}) {
    const startTime = Date.now();
    const model = options.model || 'gpt-4';
    
    const logEntry = {
      timestamp: new Date().toISOString(),
      type: 'chat_completion',
      model,
      message_count: messages.length
    };

    try {
      const response = await this.client.chat.completions.create({
        model,
        messages,
        ...options
      });

      logEntry.success = true;
      logEntry.duration_ms = Date.now() - startTime;
      logEntry.prompt_tokens = response.usage?.prompt_tokens;
      logEntry.completion_tokens = response.usage?.completion_tokens;
      logEntry.total_tokens = response.usage?.total_tokens;
      logEntry.finish_reason = response.choices[0]?.finish_reason;

      await this.log(logEntry);
      return response;

    } catch (error) {
      logEntry.success = false;
      logEntry.error = error.message;
      logEntry.error_type = error.constructor.name;
      await this.log(logEntry);
      throw error;
    }
  }
}

// Usage (inside an async function)
const logger = new OpenAILogger();
const response = await logger.chat([
  { role: 'user', content: 'Hello!' }
], { model: 'gpt-4' });

Querying OpenAI Logs

-- Token usage over time
SELECT 
  DATE_TRUNC('hour', timestamp) as hour,
  SUM(total_tokens) as total_tokens,
  SUM(prompt_tokens) as prompt_tokens,
  SUM(completion_tokens) as completion_tokens,
  COUNT(*) as request_count
FROM "openai-logs"
WHERE timestamp > NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour DESC

-- Average latency by model
SELECT 
  model,
  AVG(duration_ms) as avg_latency,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95_latency,
  COUNT(*) as requests
FROM "openai-logs"
WHERE success = true
GROUP BY model

-- Error rate
SELECT 
  DATE_TRUNC('hour', timestamp) as hour,
  COUNT(*) as total,
  SUM(CASE WHEN success = false THEN 1 ELSE 0 END) as errors,
  ROUND(SUM(CASE WHEN success = false THEN 1 ELSE 0 END)::float / COUNT(*) * 100, 2) as error_rate
FROM "openai-logs"
GROUP BY hour
ORDER BY hour DESC

-- Cost estimation (example GPT-4 rates: $0.03 / 1K prompt tokens, $0.06 / 1K completion tokens)
SELECT 
  model,
  SUM(prompt_tokens) / 1000.0 * 0.03 as prompt_cost,
  SUM(completion_tokens) / 1000.0 * 0.06 as completion_cost,
  SUM(prompt_tokens) / 1000.0 * 0.03 + SUM(completion_tokens) / 1000.0 * 0.06 as total_cost
FROM "openai-logs"
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY model
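
The same queries can be run programmatically, for example to feed a cost or usage alert. A minimal sketch, assuming Parseable's SQL query endpoint (POST /api/v1/query) accepts a JSON body with query, startTime, and endTime fields:

import requests
from datetime import datetime, timedelta, timezone

PARSEABLE_URL = "http://parseable:8000"
PARSEABLE_AUTH = ("admin", "admin")

def token_usage_by_model(hours: int = 24):
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    sql = 'SELECT model, SUM(total_tokens) AS total_tokens, COUNT(*) AS requests FROM "openai-logs" GROUP BY model'
    response = requests.post(
        f"{PARSEABLE_URL}/api/v1/query",
        json={
            "query": sql,
            "startTime": start.isoformat(),
            "endTime": end.isoformat()
        },
        auth=PARSEABLE_AUTH,
        timeout=10
    )
    response.raise_for_status()
    return response.json()

print(token_usage_by_model())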

Best Practices

  1. Hash Sensitive Data - Don't log full prompts if sensitive
  2. Track Request IDs - Correlate requests across systems
  3. Monitor Costs - Set up alerts for token usage
  4. Log Errors - Capture error details for debugging
  5. Sample High Volume - Consider sampling for high-traffic apps (see the sketch below)
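
For the sampling recommendation above, a minimal sketch built on the basic wrapper's log_to_parseable helper, assuming a fixed sample rate is acceptable (the rate and helper name are illustrative):

import random

SAMPLE_RATE = 0.1  # keep roughly 10% of successful calls

def log_sampled(entry):
    # Always keep failures; probabilistically sample successes
    if not entry.get("success", True) or random.random() < SAMPLE_RATE:
        log_to_parseable(entry)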
