SIEM Export

Export logs from Parseable to Security Information and Event Management (SIEM) platforms.

Overview

Export Parseable logs to SIEM platforms for:

  • Security Analysis - Correlate with other security data
  • Compliance - Meet regulatory requirements
  • Threat Detection - Leverage SIEM detection rules
  • Incident Response - Unified security view

Supported SIEM Platforms

  • Splunk
  • IBM QRadar
  • Microsoft Sentinel
  • Elastic Security
  • Sumo Logic
  • LogRhythm

Method 1: Scheduled Export

Export logs on a schedule using a script driven by cron or a Kubernetes CronJob.

Export Script

#!/usr/bin/env python3
import requests
import json
from datetime import datetime, timedelta
import os

PARSEABLE_URL = os.getenv('PARSEABLE_URL')
PARSEABLE_AUTH = (os.getenv('PARSEABLE_USER'), os.getenv('PARSEABLE_PASS'))
SIEM_URL = os.getenv('SIEM_URL')
SIEM_TOKEN = os.getenv('SIEM_TOKEN')

def export_to_siem(dataset, start_time, end_time):
    # Query Parseable using the Query API (RFC3339 startTime/endTime)
    response = requests.post(
        f"{PARSEABLE_URL}/api/v1/query",
        auth=PARSEABLE_AUTH,
        json={
            # Quote the dataset name so hyphenated names parse as one identifier
            "query": f'SELECT * FROM "{dataset}"',
            "startTime": start_time,
            "endTime": end_time
        }
    )
    response.raise_for_status()
    
    logs = response.json()
    
    if not logs:
        return 0
    
    # Transform for SIEM
    siem_events = []
    for log in logs:
        event = {
            "timestamp": log.get('p_timestamp'),
            "source": "parseable",
            "dataset": dataset,
            "event": log
        }
        siem_events.append(event)
    
    # Send to SIEM (Splunk HEC example). This posts the whole batch as one HEC
    # event; see the Splunk section below for per-event and batch formats.
    siem_response = requests.post(
        f"{SIEM_URL}/services/collector/event",
        headers={
            "Authorization": f"Splunk {SIEM_TOKEN}",
            "Content-Type": "application/json"
        },
        json={"event": siem_events}
    )
    siem_response.raise_for_status()
    
    return len(siem_events)

# Export last hour
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)

datasets = ['security-logs', 'audit-logs', 'access-logs']
for dataset in datasets:
    count = export_to_siem(
        dataset,
        start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),  # RFC3339, as Parseable expects
        end_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    )
    print(f"Exported {count} events from {dataset}")

Kubernetes CronJob

apiVersion: batch/v1
kind: CronJob
metadata:
  name: siem-export
spec:
  schedule: "0 * * * *"  # Every hour
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: exporter
              image: python:3.11-slim
              # The slim image ships without requests; install it before running the script
              command: ["sh", "-c", "pip install --quiet requests && python /scripts/export.py"]
              env:
                - name: PARSEABLE_URL
                  value: "http://parseable:8000"
                - name: PARSEABLE_USER
                  valueFrom:
                    secretKeyRef:
                      name: parseable-creds
                      key: username
                - name: PARSEABLE_PASS
                  valueFrom:
                    secretKeyRef:
                      name: parseable-creds
                      key: password
                - name: SIEM_URL
                  value: "https://siem.example.com"
                - name: SIEM_TOKEN
                  valueFrom:
                    secretKeyRef:
                      name: siem-creds
                      key: token
              volumeMounts:
                - name: scripts
                  mountPath: /scripts
          volumes:
            - name: scripts
              configMap:
                name: siem-export-script
          restartPolicy: OnFailure

Method 2: Real-time Streaming

Stream logs to the SIEM in near real time by forwarding Parseable alert webhooks through Fluent Bit.

Fluent Bit Configuration

service:
  flush: 5
  log_level: info

pipeline:
  inputs:
    - name: http
      listen: 0.0.0.0
      port: 8888
      tag: parseable

  outputs:
    # Splunk HEC
    - name: splunk
      match: '*'
      host: splunk.example.com
      port: 8088
      splunk_token: ${SPLUNK_TOKEN}
      tls: On
      tls.verify: Off

    # Elastic
    - name: es
      match: '*'
      host: elastic.example.com
      port: 9200
      index: security-logs
      http_user: ${ES_USER}
      http_passwd: ${ES_PASS}

Configure Parseable Webhook

Set up a Parseable alert whose webhook targets the Fluent Bit HTTP input defined above:

{
  "name": "SIEM Forward",
  "dataset": "security-logs",
  "alertType": "threshold",
  "condition": {
    "field": "level",
    "operator": "in",
    "value": ["error", "critical", "security"]
  },
  "threshold": 1,
  "duration": "1m",
  "webhook": {
    "url": "http://fluent-bit:8888",
    "method": "POST"
  }
}

SIEM-Specific Configurations

Splunk

def send_to_splunk(events):
    # Splunk HEC expects individual events or batch format
    for event in events:
        payload = {
            "time": event.get('timestamp'),
            "host": "parseable",
            "source": "parseable",
            "sourcetype": "_json",
            "event": event
        }
        
        requests.post(
            f"{SPLUNK_URL}/services/collector/event",
            headers={"Authorization": f"Splunk {SPLUNK_TOKEN}"},
            json=payload
        )

# For batch sending (more efficient for high volume)
def send_to_splunk_batch(events):
    # Splunk HEC batch format: newline-delimited JSON
    batch_payload = "\n".join([
        json.dumps({
            "time": event.get('timestamp'),
            "host": "parseable",
            "source": "parseable",
            "sourcetype": "_json",
            "event": event
        }) for event in events
    ])
    
    requests.post(
        f"{SPLUNK_URL}/services/collector/event",
        headers={"Authorization": f"Splunk {SPLUNK_TOKEN}"},
        data=batch_payload
    )

Microsoft Sentinel

import hashlib
import hmac
import base64

def send_to_sentinel(events):
    workspace_id = os.getenv('SENTINEL_WORKSPACE_ID')
    shared_key = os.getenv('SENTINEL_SHARED_KEY')
    log_type = 'ParseableLogs'
    
    body = json.dumps(events)
    
    # Build signature
    date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    content_length = len(body)
    string_to_hash = f"POST\n{content_length}\napplication/json\nx-ms-date:{date}\n/api/logs"
    
    decoded_key = base64.b64decode(shared_key)
    encoded_hash = base64.b64encode(
        hmac.new(decoded_key, string_to_hash.encode('utf-8'), hashlib.sha256).digest()
    ).decode()
    
    signature = f"SharedKey {workspace_id}:{encoded_hash}"
    
    requests.post(
        f"https://{workspace_id}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
        headers={
            "Authorization": signature,
            "Content-Type": "application/json",
            "Log-Type": log_type,
            "x-ms-date": date
        },
        data=body
    )

IBM QRadar

import json
import os
import socket

QRADAR_HOST = os.getenv('QRADAR_HOST')  # QRadar event collector address

def send_to_qradar(events):
    # Send each event as an RFC 3164-style syslog message over UDP (port 514)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for event in events:
        syslog_msg = f"<14>{event.get('timestamp')} parseable {json.dumps(event)}"
        sock.sendto(syslog_msg.encode(), (QRADAR_HOST, 514))
    sock.close()

Data Mapping

Map Parseable fields to SIEM fields:

Parseable      Splunk      Sentinel        QRadar
p_timestamp    _time       TimeGenerated   deviceTime
level          severity    SeverityLevel   severity
message        _raw        Message         payload
source         source      SourceSystem    logSourceId
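
These mappings can be applied in the export script before events are sent. A minimal sketch, assuming the flat record shape produced by the export script above; the FIELD_MAP and normalize names are illustrative, not part of Parseable or any SIEM API:

# Field mappings from the table above, keyed by target platform
FIELD_MAP = {
    "splunk":   {"p_timestamp": "_time", "level": "severity", "message": "_raw", "source": "source"},
    "sentinel": {"p_timestamp": "TimeGenerated", "level": "SeverityLevel", "message": "Message", "source": "SourceSystem"},
    "qradar":   {"p_timestamp": "deviceTime", "level": "severity", "message": "payload", "source": "logSourceId"},
}

def normalize(log, target):
    # Rename mapped fields for the target SIEM; pass all other fields through unchanged
    mapping = FIELD_MAP[target]
    return {mapping.get(key, key): value for key, value in log.items()}

# normalize({"p_timestamp": "2024-01-01T00:00:00Z", "level": "error"}, "sentinel")
# -> {"TimeGenerated": "2024-01-01T00:00:00Z", "SeverityLevel": "error"}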

Best Practices

  1. Filter First - Only export security-relevant logs
  2. Normalize Data - Map to SIEM schema
  3. Handle Failures - Implement retry logic (a retry and dedup sketch follows this list)
  4. Monitor Lag - Track export latency
  5. Deduplicate - Prevent duplicate events
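
A minimal sketch of the retry and deduplication practices, assuming a requests-based sender like the ones above; the backoff parameters and the hash-based dedup key are illustrative choices, not part of any SIEM API:

import hashlib
import json
import time

import requests

_seen = set()  # in-memory dedup cache; use a persistent store across runs

def send_with_retry(url, headers, payload, retries=3, backoff=2):
    # POST with exponential backoff; re-raise after the final attempt
    for attempt in range(retries):
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()
            return response
        except requests.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(backoff ** attempt)

def is_duplicate(event):
    # Key each event on a hash of its sorted JSON serialization
    key = hashlib.sha256(json.dumps(event, sort_keys=True).encode()).hexdigest()
    if key in _seen:
        return True
    _seen.add(key)
    return False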

Troubleshooting

Export Failures

  1. Check SIEM connectivity
  2. Verify authentication tokens (a quick check is sketched below)
  3. Check rate limits
  4. Verify data format
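
A quick way to isolate connectivity and authentication problems is to call both endpoints directly from the export host. A sketch, reusing the environment variables and Splunk HEC endpoint from the export script; the test query and test event are illustrative:

import os
from datetime import datetime, timedelta

import requests

PARSEABLE_URL = os.getenv('PARSEABLE_URL')
PARSEABLE_AUTH = (os.getenv('PARSEABLE_USER'), os.getenv('PARSEABLE_PASS'))
SIEM_URL = os.getenv('SIEM_URL')
SIEM_TOKEN = os.getenv('SIEM_TOKEN')

end = datetime.utcnow()
start = end - timedelta(minutes=5)

# 1 & 2: can we reach and authenticate against Parseable?
r = requests.post(
    f"{PARSEABLE_URL}/api/v1/query",
    auth=PARSEABLE_AUTH,
    json={
        "query": 'SELECT * FROM "security-logs" LIMIT 1',
        "startTime": start.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
        "endTime": end.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
    },
)
print("Parseable:", r.status_code, r.text[:200])

# 1 & 2: can we reach and authenticate against the SIEM? (Splunk HEC shown)
r = requests.post(
    f"{SIEM_URL}/services/collector/event",
    headers={"Authorization": f"Splunk {SIEM_TOKEN}"},
    json={"event": "parseable connectivity test"},
)
print("SIEM:", r.status_code, r.text[:200])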

Missing Events

  1. Check time range queries (see the count check below)
  2. Verify dataset names
  3. Check export schedule
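
To confirm that the time range and dataset name actually return events, run a count query against Parseable for the exact window the exporter used and compare it with what the SIEM received. A sketch, assuming the PARSEABLE_URL and PARSEABLE_AUTH settings from the export script:

import requests

def count_events(dataset, start_time, end_time):
    # Count rows Parseable holds for this dataset and window (RFC3339 timestamps)
    response = requests.post(
        f"{PARSEABLE_URL}/api/v1/query",
        auth=PARSEABLE_AUTH,
        json={
            "query": f'SELECT COUNT(*) AS total FROM "{dataset}"',
            "startTime": start_time,
            "endTime": end_time,
        },
    )
    response.raise_for_status()
    return response.json()

# Example: count_events('security-logs', '2024-01-01T00:00:00.000Z', '2024-01-01T01:00:00.000Z')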

Next Steps

  • Set up alerts for security events
  • Configure Falco for runtime security
  • Create dashboards for security monitoring
