Parseable

Temporal

Ship Temporal workflow and activity execution events to Parseable as OpenTelemetry logs and traces


The @parseable/temporal plugin is a Temporal middleware that ships workflow and activity execution events to Parseable as OpenTelemetry logs and traces. Drop it into a worker's plugins array and every workflow run produces:

  • A flame-graph trace of the run, including child workflows and activity calls.
  • A flat, queryable log schema with workflow_id, activity_name, attempt, duration_ms, status, error, etc.
  • Custom domain events via a replay-safe workflowEvent() helper - useful for AI agents and multi-step orchestrators that want to emit "tool called", "plan chosen", "step started" alongside Temporal's built-in lifecycle events.

Overview

The plugin captures:

  • Workflow lifecycle - started, completed, failed records with duration_ms, workflow_id, run_id, workflow_name.
  • Activity lifecycle - per-attempt started/completed/failed records with activity_name, activity_id, attempt, duration_ms. Retries produce one record per attempt.
  • Signals, queries, updates - inbound message records.
  • Child workflows, outbound signals, continue-as-new - outbound message records (direction: 'outbound').
  • Custom user events via workflowEvent(name, data).
  • OpenTelemetry trace spans - RunWorkflow:*, StartActivity:*, RunActivity:* emitted by Temporal's official OpenTelemetryPlugin, sanitized for OTLP-strict ingest.

All workflow-side emission is replay-safe: the plugin uses Temporal sinks configured with callDuringReplay: false, so workflow records and user events are not duplicated when Temporal replays history (worker recovery, cache eviction, manual replay). Verified by an automated test using Worker.runReplayHistory().

Prerequisites

  • Node.js 20+
  • A Temporal worker built on @temporalio/worker v1.13+ (plugin API based)
  • A reachable Parseable instance

Installation

npm install @parseable/temporal

Configuration

import { Worker } from '@temporalio/worker';
import { ParseablePlugin } from '@parseable/temporal';

const worker = await Worker.create({
  taskQueue: 'hello-world',
  workflowsPath: require.resolve('./workflows'),
  activities,
  // highlight-start
  plugins: [
    new ParseablePlugin({
      serviceName: 'temporal-worker',
      endpoint: process.env.PARSEABLE_URL, // e.g. https://parseable.example:8000
      auth: {
        username: process.env.PARSEABLE_USERNAME,
        password: process.env.PARSEABLE_PASSWORD,
      },
      // optional, defaults shown:
      logs: { stream: 'temporal-logs' },
      traces: { stream: 'temporal-traces' },
    }),
  ],
  // highlight-end
});

await worker.run();

Logs are POSTed to ${endpoint}/v1/logs; traces to ${endpoint}/v1/traces. The two pipelines are independently configurable - pass logs: false or traces: false to disable either layer.

Options

OptionTypeDefaultNotes
serviceNamestringrequiredBecomes the service.name resource attribute and service_name log field.
endpointstringrequired if logs/traces enabledParseable host base URL.
auth{ username, password }required if logs/traces enabledHTTP basic auth.
logsfalse | { stream?: string }{ stream: 'temporal-logs' }Set to false to disable log emission.
tracesfalse | { stream?: string }{ stream: 'temporal-traces' }Set to false to disable trace emission.

Custom domain events from workflow code

Workflows can emit replay-safe domain events via the workflowEvent helper. Each call lands as a log record with type: 'user_event', event_name, and an arbitrary serializable event_data payload.

import { workflowEvent } from '@parseable/temporal/workflow';
import { proxyActivities } from '@temporalio/workflow';

const { planActivity, runStep } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

export async function agentWorkflow(input: AgentInput): Promise<AgentResult> {
  workflowEvent('agent.started', { user_id: input.userId });

  const plan = await planActivity(input);
  workflowEvent('agent.plan.chosen', { steps: plan.steps.length });

  for (const step of plan.steps) {
    workflowEvent('agent.step.start', { tool: step.tool });
    await runStep(step);
  }

  workflowEvent('agent.completed', { total_steps: plan.steps.length });
  return { ok: true };
}

The events flow through the same sink as the built-in workflow lifecycle events, so they are deduplicated on replay.

Log schema

The plugin writes to one log stream (default temporal-logs). Records share a common envelope with fields specialized by type:

FieldTypeNotes
type'activity' | 'workflow' | 'user_event' | 'signal' | 'query' | 'update' | 'child_workflow' | 'continue_as_new'discriminator
status'started' | 'completed' | 'failed'omitted on user_event
service_namestringfrom plugin config
timestampISO 8601 stringevent time
workflow_idstring
run_idstring
workflow_namestring
activity_namestringactivity records only
activity_idstringactivity records only
attemptnumberactivity records only
duration_msnumberon completed/failed
errorstringon failed
direction'inbound' | 'outbound'message records only
message_namestringmessage records only
target_workflow_idstringoutbound signals/child workflows
event_namestringuser events only
event_dataobjectuser events only

All logs and traces carry a parseable.plugin.version resource attribute so consumers can correlate behaviour with plugin releases.

Trace spans are emitted by Temporal's OpenTelemetryPlugin - see the Temporal TypeScript observability docs for the span schema.

Example queries

Failure rate per activity over the last hour:

SELECT activity_name,
       COUNT(*)                                          AS attempts,
       SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
       1.0 * SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) / COUNT(*) AS failure_rate
FROM "temporal-logs"
WHERE type = 'activity'
  AND status IN ('completed','failed')
  AND p_timestamp > now() - INTERVAL '1 hour'
GROUP BY activity_name
ORDER BY failure_rate DESC;

p95 workflow duration by workflow type:

SELECT workflow_name,
       APPROX_PERCENTILE_CONT(duration_ms, 0.95) AS p95_ms
FROM "temporal-logs"
WHERE type = 'workflow'
  AND status = 'completed'
GROUP BY workflow_name;

Trace a single workflow run (correlate logs with the trace stream):

SELECT timestamp, type, status, activity_name, attempt, duration_ms, error
FROM "temporal-logs"
WHERE workflow_id = '<workflow_id>'
ORDER BY timestamp;

Caveats

  • Throw ApplicationFailure for graceful handler failures. Signal/update handlers that throw a plain Error are treated by Temporal as a workflow-task failure: the task is retried until it succeeds, and the plugin emits one started+failed record pair per retry. To fail an update (or any handler) cleanly without retry storms, throw ApplicationFailure.create({ message, nonRetryable: true }) from @temporalio/workflow. The interceptor then records exactly one failed event and the error propagates to the client as an update failure rather than a task failure.
  • child_workflow completion is tracked from the child, not the start RPC. The outbound interceptor wraps the result promise returned by next(input) so status: 'completed' (or failed) fires when the child actually finishes - not when the start call returns. Start-time RPC errors and run-time child failures are reported with distinct failed records.
  • OTel SDK pinned to 1.x. Temporal's OpenTelemetryPlugin pins @opentelemetry/sdk-trace-base@^1.25.1, so the plugin rides the 1.x line until Temporal moves to 2.x.
  • Empty-body warning on OTLP success is benign. Parseable returns HTTP 200 with an empty body for accepted OTLP payloads. OTel's deserializer logs Export succeeded but could not deserialize response - is the response specification compliant? only at DiagLogLevel.DEBUG.

Was this page helpful?

On this page