Temporal
Ship Temporal workflow and activity execution events to Parseable as OpenTelemetry logs and traces
The @parseable/temporal plugin is a Temporal middleware that ships workflow and activity execution events to Parseable as OpenTelemetry logs and traces. Drop it into a worker's plugins array and every workflow run produces:
- A flame-graph trace of the run, including child workflows and activity calls.
- A flat, queryable log schema with workflow_id, activity_name, attempt, duration_ms, status, error, etc.
- Custom domain events via a replay-safe workflowEvent() helper, useful for AI agents and multi-step orchestrators that want to emit "tool called", "plan chosen", "step started" alongside Temporal's built-in lifecycle events.
Overview
The plugin captures:
- Workflow lifecycle - started, completed, failed records with duration_ms, workflow_id, run_id, workflow_name.
- Activity lifecycle - per-attempt started/completed/failed records with activity_name, activity_id, attempt, duration_ms. Retries produce one record per attempt.
- Signals, queries, updates - inbound message records.
- Child workflows, outbound signals, continue-as-new - outbound message records (direction: 'outbound').
- Custom user events via workflowEvent(name, data).
- OpenTelemetry trace spans - RunWorkflow:*, StartActivity:*, RunActivity:* emitted by Temporal's official OpenTelemetryPlugin, sanitized for OTLP-strict ingest.
All workflow-side emission is replay-safe: the plugin uses Temporal sinks configured with callDuringReplay: false, so workflow records and user events are not duplicated when Temporal replays history (worker recovery, cache eviction, manual replay). Verified by an automated test using Worker.runReplayHistory().
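The effect of callDuringReplay: false can be illustrated with a small, dependency-free model. This is only a sketch of the idea: the dispatch function, the SinkCall type, and the replay flag below are hypothetical stand-ins, not Temporal's or the plugin's actual internals.

```typescript
// Hypothetical model of a sink dispatcher; names are illustrative only.
type SinkCall = { sinkName: string; args: unknown[] };

interface SinkFunction {
  fn: (...args: unknown[]) => void;
  // Mirrors the option named in the text: when false, the function is
  // skipped while the workflow is replaying history.
  callDuringReplay: boolean;
}

function dispatch(
  sinks: Record<string, SinkFunction>,
  calls: SinkCall[],
  isReplaying: boolean,
): void {
  for (const call of calls) {
    const sink = sinks[call.sinkName];
    if (!sink) continue; // unknown sink: nothing to do
    if (isReplaying && !sink.callDuringReplay) continue; // suppress on replay
    sink.fn(...call.args);
  }
}

const emitted: string[] = [];
const sinks: Record<string, SinkFunction> = {
  emit: {
    fn: (event) => {
      emitted.push(String(event));
    },
    callDuringReplay: false,
  },
};

const recordedCalls: SinkCall[] = [
  { sinkName: 'emit', args: ['workflow.started'] },
  { sinkName: 'emit', args: ['agent.plan.chosen'] },
];

dispatch(sinks, recordedCalls, false); // first execution: both records are emitted
dispatch(sinks, recordedCalls, true); // replay of the same calls: nothing is emitted
```

After both passes, emitted holds each record once, which is the behaviour the replay test is described as verifying.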
Prerequisites
- Node.js 20+
- A Temporal worker built on @temporalio/worker v1.13+ (plugin API based)
- A reachable Parseable instance
Installation
npm install @parseable/temporal
Configuration
import { Worker } from '@temporalio/worker';
import { ParseablePlugin } from '@parseable/temporal';
// Assumes your activity implementations are exported from ./activities.
import * as activities from './activities';

const worker = await Worker.create({
  taskQueue: 'hello-world',
  workflowsPath: require.resolve('./workflows'),
  activities,
  plugins: [
    new ParseablePlugin({
      serviceName: 'temporal-worker',
      endpoint: process.env.PARSEABLE_URL, // e.g. https://parseable.example:8000
      auth: {
        username: process.env.PARSEABLE_USERNAME,
        password: process.env.PARSEABLE_PASSWORD,
      },
      // optional, defaults shown:
      logs: { stream: 'temporal-logs' },
      traces: { stream: 'temporal-traces' },
    }),
  ],
});

await worker.run();

Logs are POSTed to ${endpoint}/v1/logs; traces to ${endpoint}/v1/traces. The two pipelines are independently configurable - pass logs: false or traces: false to disable either layer.
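As a rough illustration of how the two export URLs relate to the configured endpoint, the helper below joins the base URL with the OTLP path for each signal. The function name otlpUrl is hypothetical, and the trailing-slash trimming is an assumption for the sake of the example, not documented plugin behaviour.

```typescript
type OtlpSignal = 'logs' | 'traces';

// Hypothetical helper: builds `${endpoint}/v1/<signal>` as described above.
// Trimming trailing slashes is an assumption made to avoid a double slash.
function otlpUrl(endpoint: string, signal: OtlpSignal): string {
  const base = endpoint.replace(/\/+$/, '');
  return `${base}/v1/${signal}`;
}

const logsUrl = otlpUrl('https://parseable.example:8000', 'logs');
const tracesUrl = otlpUrl('https://parseable.example:8000/', 'traces');
```

Here logsUrl is https://parseable.example:8000/v1/logs and tracesUrl is https://parseable.example:8000/v1/traces.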
Options
| Option | Type | Default | Notes |
|---|---|---|---|
| serviceName | string | required | Becomes the service.name resource attribute and service_name log field. |
| endpoint | string | required if logs/traces enabled | Parseable host base URL. |
| auth | { username, password } | required if logs/traces enabled | HTTP basic auth. |
| logs | false \| { stream?: string } | { stream: 'temporal-logs' } | Set to false to disable log emission. |
| traces | false \| { stream?: string } | { stream: 'temporal-traces' } | Set to false to disable trace emission. |
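The table's defaults and "required if" rules can be read as the following sketch. The PluginOptionsSketch type and the resolveOptions function are hypothetical names that restate the table in code; the plugin's real types and validation may differ.

```typescript
type StreamOption = false | { stream?: string };

// Hypothetical restatement of the options table, not the plugin's exported type.
interface PluginOptionsSketch {
  serviceName: string;
  endpoint?: string;
  auth?: { username: string; password: string };
  logs?: StreamOption;
  traces?: StreamOption;
}

interface ResolvedOptions {
  serviceName: string;
  logsStream: string | null; // null means the layer is disabled
  tracesStream: string | null;
}

function resolveStream(option: StreamOption | undefined, fallback: string): string | null {
  if (option === false) return null;
  return option?.stream ?? fallback;
}

function resolveOptions(options: PluginOptionsSketch): ResolvedOptions {
  if (!options.serviceName) throw new Error('serviceName is required');
  const logsStream = resolveStream(options.logs, 'temporal-logs');
  const tracesStream = resolveStream(options.traces, 'temporal-traces');
  // endpoint and auth only matter when at least one layer is exporting.
  const exporting = logsStream !== null || tracesStream !== null;
  if (exporting && (!options.endpoint || !options.auth)) {
    throw new Error('endpoint and auth are required when logs or traces are enabled');
  }
  return { serviceName: options.serviceName, logsStream, tracesStream };
}

const logsOnly = resolveOptions({
  serviceName: 'temporal-worker',
  endpoint: 'https://parseable.example:8000',
  auth: { username: 'user', password: 'secret' },
  traces: false,
});
```

With traces: false, logsOnly keeps the default temporal-logs stream and reports the trace layer as disabled.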
Custom domain events from workflow code
Workflows can emit replay-safe domain events via the workflowEvent helper. Each call lands as a log record with type: 'user_event', event_name, and an arbitrary serializable event_data payload.
import { workflowEvent } from '@parseable/temporal/workflow';
import { proxyActivities } from '@temporalio/workflow';
// Assumes your activity implementations are exported from ./activities.
// AgentInput and AgentResult are application types defined elsewhere.
import type * as activities from './activities';

const { planActivity, runStep } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

export async function agentWorkflow(input: AgentInput): Promise<AgentResult> {
  workflowEvent('agent.started', { user_id: input.userId });
  const plan = await planActivity(input);
  workflowEvent('agent.plan.chosen', { steps: plan.steps.length });
  for (const step of plan.steps) {
    workflowEvent('agent.step.start', { tool: step.tool });
    await runStep(step);
  }
  workflowEvent('agent.completed', { total_steps: plan.steps.length });
  return { ok: true };
}

The events flow through the same sink as the built-in workflow lifecycle events, so they are deduplicated on replay.
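The text above describes event_data as a serializable payload without spelling out what qualifies. One conservative reading is "plain JSON-compatible values only". The guard below is a sketch under that assumption; isJsonSerializable is a hypothetical helper, not part of the plugin.

```typescript
// Hypothetical guard: accepts only null, strings, booleans, finite numbers,
// arrays, and plain objects built from those. Class instances (for example
// Date), functions, symbols, bigint and undefined are rejected.
function isJsonSerializable(value: unknown): boolean {
  if (value === null) return true;
  switch (typeof value) {
    case 'string':
    case 'boolean':
      return true;
    case 'number':
      return isFinite(value); // rejects NaN and +/-Infinity
    case 'object': {
      if (Array.isArray(value)) return value.every((item) => isJsonSerializable(item));
      const proto = Object.getPrototypeOf(value);
      if (proto !== Object.prototype && proto !== null) return false;
      const obj = value as Record<string, unknown>;
      return Object.keys(obj).every((key) => isJsonSerializable(obj[key]));
    }
    default:
      return false;
  }
}

const okPayload = isJsonSerializable({ steps: 3, tools: ['search', 'summarize'] });
const badPayload = isJsonSerializable({ onDone: () => undefined });
```

The check is pure and deterministic, so calling it from workflow code before workflowEvent should not affect replay.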
Log schema
The plugin writes to one log stream (default temporal-logs). Records share a common envelope with fields specialized by type:
| Field | Type | Notes |
|---|---|---|
| type | 'activity' \| 'workflow' \| 'user_event' \| 'signal' \| 'query' \| 'update' \| 'child_workflow' \| 'continue_as_new' | discriminator |
| status | 'started' \| 'completed' \| 'failed' | omitted on user_event |
| service_name | string | from plugin config |
| timestamp | ISO 8601 string | event time |
| workflow_id | string | |
| run_id | string | |
| workflow_name | string | |
| activity_name | string | activity records only |
| activity_id | string | activity records only |
| attempt | number | activity records only |
| duration_ms | number | on completed/failed |
| error | string | on failed |
| direction | 'inbound' \| 'outbound' | message records only |
| message_name | string | message records only |
| target_workflow_id | string | outbound signals/child workflows |
| event_name | string | user events only |
| event_data | object | user events only |
All logs and traces carry a parseable.plugin.version resource attribute so consumers can correlate behaviour with plugin releases.
Trace spans are emitted by Temporal's OpenTelemetryPlugin - see the Temporal TypeScript observability docs for the span schema.
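For consumers that read the log stream from TypeScript, the schema table can be modeled as a discriminated union on type. The sketch below covers only workflow, activity, and user-event records (message records are left out for brevity). The interface names are hypothetical, and it assumes the envelope fields are present on every record, which the table implies but does not state outright.

```typescript
type RecordStatus = 'started' | 'completed' | 'failed';

// Assumed common envelope, taken from the fields without a "only" qualifier.
interface Envelope {
  service_name: string;
  timestamp: string; // ISO 8601
  workflow_id: string;
  run_id: string;
  workflow_name: string;
}

interface WorkflowRecord extends Envelope {
  type: 'workflow';
  status: RecordStatus;
  duration_ms?: number; // on completed/failed
  error?: string; // on failed
}

interface ActivityRecord extends Envelope {
  type: 'activity';
  status: RecordStatus;
  activity_name: string;
  activity_id: string;
  attempt: number;
  duration_ms?: number;
  error?: string;
}

interface UserEventRecord extends Envelope {
  type: 'user_event';
  event_name: string;
  event_data: Record<string, unknown>;
}

type LogRecord = WorkflowRecord | ActivityRecord | UserEventRecord;

// Narrowing on `type` gives access to the fields specific to each record kind.
function summarize(entry: LogRecord): string {
  switch (entry.type) {
    case 'workflow':
      return `${entry.workflow_name} ${entry.status}`;
    case 'activity':
      return `${entry.activity_name} attempt ${entry.attempt} ${entry.status}`;
    case 'user_event':
      return `event ${entry.event_name}`;
  }
}

const sample: LogRecord = {
  type: 'activity',
  status: 'failed',
  service_name: 'temporal-worker',
  timestamp: '2024-01-01T00:00:00.000Z',
  workflow_id: 'wf-1',
  run_id: 'run-1',
  workflow_name: 'agentWorkflow',
  activity_name: 'runStep',
  activity_id: '1',
  attempt: 2,
  duration_ms: 150,
  error: 'timeout',
};
const summary = summarize(sample);
```

The sample values are made up for illustration; summary evaluates to "runStep attempt 2 failed".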
Example queries
Failure rate per activity over the last hour:
SELECT activity_name,
       COUNT(*) AS attempts,
       SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
       1.0 * SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) / COUNT(*) AS failure_rate
FROM "temporal-logs"
WHERE type = 'activity'
  AND status IN ('completed','failed')
  AND p_timestamp > now() - INTERVAL '1 hour'
GROUP BY activity_name
ORDER BY failure_rate DESC;

p95 workflow duration by workflow type:
SELECT workflow_name,
       APPROX_PERCENTILE_CONT(duration_ms, 0.95) AS p95_ms
FROM "temporal-logs"
WHERE type = 'workflow'
  AND status = 'completed'
GROUP BY workflow_name;

Trace a single workflow run (correlate logs with the trace stream):
SELECT timestamp, type, status, activity_name, attempt, duration_ms, error
FROM "temporal-logs"
WHERE workflow_id = '<workflow_id>'
ORDER BY timestamp;

Caveats
- Throw ApplicationFailure for graceful handler failures. Signal/update handlers that throw a plain Error are treated by Temporal as a workflow-task failure: the task is retried until it succeeds, and the plugin emits one started + failed record pair per retry. To fail an update (or any handler) cleanly without retry storms, throw ApplicationFailure.create({ message, nonRetryable: true }) from @temporalio/workflow. The interceptor then records exactly one failed event and the error propagates to the client as an update failure rather than a task failure.
- child_workflow completion is tracked from the child, not the start RPC. The outbound interceptor wraps the result promise returned by next(input) so status: 'completed' (or failed) fires when the child actually finishes - not when the start call returns. Start-time RPC errors and run-time child failures are reported with distinct failed records.
- OTel SDK pinned to 1.x. Temporal's OpenTelemetryPlugin pins @opentelemetry/sdk-trace-base@^1.25.1, so the plugin rides the 1.x line until Temporal moves to 2.x.
- Empty-body warning on OTLP success is benign. Parseable returns HTTP 200 with an empty body for accepted OTLP payloads. OTel's deserializer logs "Export succeeded but could not deserialize response - is the response specification compliant?" only at DiagLogLevel.DEBUG.
Links
- Plugin source: github.com/parseablehq/parseable-temporal-plugin
- npm package: npmjs.com/package/@parseable/temporal
- Temporal TypeScript observability docs: docs.temporal.io/develop/typescript/observability