SIEM Export
Export logs from Parseable to Security Information and Event Management (SIEM) platforms.
Overview
Export Parseable logs to SIEM platforms for:
- Security Analysis - Correlate with other security data
- Compliance - Meet regulatory requirements
- Threat Detection - Leverage SIEM detection rules
- Incident Response - Unified security view
Supported SIEM Platforms
- Splunk
- IBM QRadar
- Microsoft Sentinel
- Elastic Security
- Sumo Logic
- LogRhythm
Method 1: Scheduled Export
Export logs on a schedule using scripts or automation.
Export Script
#!/usr/bin/env python3
import requests
import json
from datetime import datetime, timedelta
import os

PARSEABLE_URL = os.getenv('PARSEABLE_URL')
PARSEABLE_AUTH = (os.getenv('PARSEABLE_USER'), os.getenv('PARSEABLE_PASS'))
SIEM_URL = os.getenv('SIEM_URL')
SIEM_TOKEN = os.getenv('SIEM_TOKEN')

def export_to_siem(dataset, start_time, end_time):
    # Query Parseable using the Query API
    response = requests.post(
        f"{PARSEABLE_URL}/api/v1/query",
        auth=PARSEABLE_AUTH,
        json={
            "query": f"SELECT * FROM {dataset}",
            "startTime": start_time,
            "endTime": end_time,
            "streamName": dataset
        }
    )
    response.raise_for_status()
    logs = response.json()
    if not logs:
        return 0

    # Transform for SIEM
    siem_events = []
    for log in logs:
        event = {
            "timestamp": log.get('p_timestamp'),
            "source": "parseable",
            "dataset": dataset,
            "event": log
        }
        siem_events.append(event)

    # Send to SIEM (Splunk HEC example): newline-delimited JSON,
    # one HEC envelope per event
    batch_payload = "\n".join(
        json.dumps({"event": event}) for event in siem_events
    )
    siem_response = requests.post(
        f"{SIEM_URL}/services/collector/event",
        headers={
            "Authorization": f"Splunk {SIEM_TOKEN}",
            "Content-Type": "application/json"
        },
        data=batch_payload
    )
    siem_response.raise_for_status()
    return len(siem_events)

# Export the last hour
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)

datasets = ['security-logs', 'audit-logs', 'access-logs']
for dataset in datasets:
    count = export_to_siem(
        dataset,
        start_time.strftime('%Y-%m-%d %H:%M:%S.%f'),
        end_time.strftime('%Y-%m-%d %H:%M:%S.%f')
    )
    print(f"Exported {count} events from {dataset}")

Kubernetes CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: siem-export
spec:
  schedule: "0 * * * *"  # Every hour
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: exporter
            image: python:3.11-slim
            # python:3.11-slim does not bundle requests, so install it before running
            command: ["sh", "-c", "pip install requests && python /scripts/export.py"]
            env:
            - name: PARSEABLE_URL
              value: "http://parseable:8000"
            - name: PARSEABLE_USER
              valueFrom:
                secretKeyRef:
                  name: parseable-creds
                  key: username
            - name: PARSEABLE_PASS
              valueFrom:
                secretKeyRef:
                  name: parseable-creds
                  key: password
            - name: SIEM_URL
              value: "https://siem.example.com"
            - name: SIEM_TOKEN
              valueFrom:
                secretKeyRef:
                  name: siem-creds
                  key: token
            volumeMounts:
            - name: scripts
              mountPath: /scripts
          volumes:
          - name: scripts
            configMap:
              name: siem-export-script
          restartPolicy: OnFailure

Method 2: Real-time Streaming
Stream logs in real-time using Fluent Bit.
Fluent Bit Configuration
service:
  flush: 5
  log_level: info

pipeline:
  inputs:
    - name: http
      listen: 0.0.0.0
      port: 8888
      tag: parseable

  outputs:
    # Splunk HEC
    - name: splunk
      match: '*'
      host: splunk.example.com
      port: 8088
      splunk_token: ${SPLUNK_TOKEN}
      tls: On
      tls.verify: Off

    # Elastic
    - name: es
      match: '*'
      host: elastic.example.com
      port: 9200
      index: security-logs
      http_user: ${ES_USER}
      http_passwd: ${ES_PASS}

Configure Parseable Webhook
Set up Parseable alerts to forward to Fluent Bit:
{
  "name": "SIEM Forward",
  "dataset": "security-logs",
  "alertType": "threshold",
  "condition": {
    "field": "level",
    "operator": "in",
    "value": ["error", "critical", "security"]
  },
  "threshold": 1,
  "duration": "1m",
  "webhook": {
    "url": "http://fluent-bit:8888",
    "method": "POST"
  }
}

SIEM-Specific Configurations
Splunk
SPLUNK_URL = os.getenv('SPLUNK_URL')
SPLUNK_TOKEN = os.getenv('SPLUNK_TOKEN')

def send_to_splunk(events):
    # Splunk HEC accepts individual events or a newline-delimited batch
    for event in events:
        payload = {
            "time": event.get('timestamp'),
            "host": "parseable",
            "source": "parseable",
            "sourcetype": "_json",
            "event": event
        }
        requests.post(
            f"{SPLUNK_URL}/services/collector/event",
            headers={"Authorization": f"Splunk {SPLUNK_TOKEN}"},
            json=payload
        )

# Batch sending (more efficient for high volume)
def send_to_splunk_batch(events):
    # Splunk HEC batch format: newline-delimited JSON
    batch_payload = "\n".join([
        json.dumps({
            "time": event.get('timestamp'),
            "host": "parseable",
            "source": "parseable",
            "sourcetype": "_json",
            "event": event
        }) for event in events
    ])
    requests.post(
        f"{SPLUNK_URL}/services/collector/event",
        headers={"Authorization": f"Splunk {SPLUNK_TOKEN}"},
        data=batch_payload
    )

Microsoft Sentinel
import hashlib
import hmac
import base64

def send_to_sentinel(events):
    workspace_id = os.getenv('SENTINEL_WORKSPACE_ID')
    shared_key = os.getenv('SENTINEL_SHARED_KEY')
    log_type = 'ParseableLogs'
    body = json.dumps(events)

    # Build signature
    date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    content_length = len(body)
    string_to_hash = f"POST\n{content_length}\napplication/json\nx-ms-date:{date}\n/api/logs"
    decoded_key = base64.b64decode(shared_key)
    encoded_hash = base64.b64encode(
        hmac.new(decoded_key, string_to_hash.encode('utf-8'), hashlib.sha256).digest()
    ).decode()
    signature = f"SharedKey {workspace_id}:{encoded_hash}"

    requests.post(
        f"https://{workspace_id}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
        headers={
            "Authorization": signature,
            "Content-Type": "application/json",
            "Log-Type": log_type,
            "x-ms-date": date
        },
        data=body
    )

IBM QRadar
import socket

QRADAR_HOST = os.getenv('QRADAR_HOST')

def send_to_qradar(events):
    for event in events:
        # Format as syslog (PRI 14 = user-level facility, informational severity)
        syslog_msg = f"<14>{event.get('timestamp')} parseable {json.dumps(event)}"
        # Send via UDP syslog to QRadar
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(syslog_msg.encode(), (QRADAR_HOST, 514))
        sock.close()

Data Mapping
Map Parseable fields to SIEM fields:
| Parseable | Splunk | Sentinel | QRadar |
|---|---|---|---|
| p_timestamp | _time | TimeGenerated | deviceTime |
| level | severity | SeverityLevel | severity |
| message | _raw | Message | payload |
| source | source | SourceSystem | logSourceId |
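A minimal field-mapping helper based on the Splunk column of this table; FIELD_MAP and map_fields are illustrative names, not part of Parseable or Splunk, and the target field names should follow your SIEM's schema:

# Hypothetical helper: rename Parseable fields per the mapping table above (Splunk column)
FIELD_MAP = {
    "p_timestamp": "_time",
    "level": "severity",
    "message": "_raw",
    "source": "source",
}

def map_fields(log: dict) -> dict:
    """Return a copy of the log with Parseable field names mapped to SIEM field names."""
    mapped = {}
    for key, value in log.items():
        mapped[FIELD_MAP.get(key, key)] = value
    return mapped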
Best Practices
- Filter First - Only export security-relevant logs
- Normalize Data - Map to SIEM schema
- Handle Failures - Implement retry logic (a minimal retry sketch follows this list)
- Monitor Lag - Track export latency
- Deduplicate - Prevent duplicate events
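As referenced above, a minimal retry-with-backoff wrapper; send_with_retry and its parameters are illustrative, and send_to_splunk_batch refers to the example earlier on this page:

import time

def send_with_retry(send_fn, events, max_attempts=3, backoff_seconds=2):
    """Call send_fn(events), retrying on failure with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            send_fn(events)
            return True
        except Exception as exc:
            if attempt == max_attempts:
                print(f"Giving up after {attempt} attempts: {exc}")
                return False
            # Wait 2s, 4s, 8s, ... before the next attempt
            time.sleep(backoff_seconds * 2 ** (attempt - 1))

# Example: send_with_retry(send_to_splunk_batch, siem_events)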
Troubleshooting
Export Failures
- Check SIEM connectivity (a minimal Splunk HEC health-check sketch follows this list)
- Verify authentication tokens
- Check rate limits
- Verify data format
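For Splunk HEC specifically, a quick connectivity probe against the HEC health endpoint can look like the sketch below; the URL and token handling assume a standard HEC setup and may differ in your environment:

import requests

def check_splunk_hec(siem_url, token):
    """Probe the Splunk HEC health endpoint; True if HEC is reachable and enabled."""
    resp = requests.get(
        f"{siem_url}/services/collector/health",
        headers={"Authorization": f"Splunk {token}"},
        timeout=5
    )
    print(resp.status_code, resp.text)
    return resp.status_code == 200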
Missing Events
- Check time range queries (see the count-query sketch after this list)
- Verify dataset names
- Check export schedule
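To confirm that a dataset actually has events in the window you are exporting, a quick count query against the same Query API used in the export script can help; this sketch reuses the PARSEABLE_URL and PARSEABLE_AUTH values defined there, and the dataset name and window are placeholders:

import requests

def count_events(dataset, start_time, end_time):
    """Return the event count for the dataset over the given time range."""
    resp = requests.post(
        f"{PARSEABLE_URL}/api/v1/query",
        auth=PARSEABLE_AUTH,
        json={
            "query": f"SELECT COUNT(*) AS cnt FROM {dataset}",
            "startTime": start_time,
            "endTime": end_time,
            "streamName": dataset
        },
        timeout=30
    )
    resp.raise_for_status()
    print(resp.json())
    return resp.json()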
Next Steps
- Set up alerts for security events
- Configure Falco for runtime security
- Create dashboards for security monitoring