Parseable

Python

Install and configure the temporal-parseable plugin for Temporal Python workers


The temporal-parseable plugin is a Temporal middleware that ships workflow and activity execution events to Parseable. See the Temporal overview for the full schema, example queries, and shared caveats.

Prerequisites

  • Python 3.9+
  • A Temporal worker built on temporalio with plugin support
  • A reachable Parseable instance

Installation

pip install temporal-parseable

Quick start

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from temporal_parseable import ParseablePlugin, ParseableConfig

config = ParseableConfig(
    service_name="my-worker",
    endpoint="https://parseable.example.com",
    username="admin",
    password="secret",
)
plugin = ParseablePlugin(config)

client = await Client.connect("localhost:7233", plugins=[plugin])

async with Worker(
    client,
    task_queue="my-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    plugins=[plugin],
):
    await asyncio.Event().wait()

Register the plugin on both the Client and the Worker. The client-side registration installs trace context propagation; the worker-side registration installs the interceptors and emitter.

Configuration

All settings fall back to environment variables with the PARSEABLE_ prefix:

ArgumentEnvironment variableDefault
endpointPARSEABLE_URLhttp://localhost:8000
usernamePARSEABLE_USERNAMEadmin
passwordPARSEABLE_PASSWORDadmin
service_namePARSEABLE_SERVICE_NAMEtemporal-worker
logs.streamPARSEABLE_LOGS_STREAMtemporal-logs
traces.streamPARSEABLE_TRACES_STREAMtemporal-traces

Pass logs=None or traces=None to disable either pipeline.

Custom domain events from workflow code

Emit replay-safe domain events from inside workflow code:

from temporalio import workflow
from temporal_parseable.workflow import workflow_event

@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, input: AgentInput) -> AgentResult:
        workflow_event("agent.started", {"user_id": input.user_id})

        plan = await workflow.execute_activity(
            plan_activity, input, start_to_close_timeout=timedelta(minutes=1)
        )
        workflow_event("agent.plan.chosen", {"steps": len(plan.steps)})

        for step in plan.steps:
            workflow_event("agent.step.start", {"tool": step.tool})
            await workflow.execute_activity(
                run_step, step, start_to_close_timeout=timedelta(minutes=1)
            )

        workflow_event("agent.completed", {"total_steps": len(plan.steps)})
        return AgentResult(ok=True)

Each call emits a record with type: "user_event", event_name, and event_data. Records are replay-safe - never duplicated during Temporal history replay (guarded with workflow.unsafe.is_replaying()).

Sandbox passthrough

temporal_parseable must be declared as a passthrough module in the SandboxedWorkflowRunner so the workflow isolate does not try to import OTel / HTTP libraries inside the sandbox. The plugin handles this automatically when registered, but if you build a custom runner pass temporal_parseable (and temporal_parseable.workflow) explicitly.

Python-specific notes

  • SanitizingSpanExporter flattens Temporal's non-primitive span attributes (nested dicts, datetime, None) before OTLP serialization. Without it, Parseable's strict OTLP parser rejects payloads with 400 Invalid data for Value. Wired automatically.
  • X-P-Log-Source headers are set automatically: otel-logs for the log pipeline and otel-traces for traces, as required by Parseable's OTLP ingestor.
  • OTel pinning. opentelemetry-sdk>=1.25,<2 because Temporal's SDK rides the 1.x line.

Was this page helpful?

On this page