AWS Kinesis
Stream logs from AWS Kinesis Data Streams and Kinesis Data Firehose to Parseable.
Overview
Integrate AWS Kinesis with Parseable for:
- Real-time Streaming - Ingest logs as they arrive
- High Throughput - Handle massive log volumes
- AWS Integration - Collect logs from AWS services
- Buffering - Use Firehose for batched delivery
Prerequisites
- AWS account with Kinesis access
- Parseable instance accessible from AWS
- IAM permissions for Kinesis
Method 1: Kinesis Data Firehose
Use Firehose for managed delivery to Parseable's HTTP endpoint.
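The console steps below can also be scripted. A minimal sketch of the destination configuration as passed to the Firehose CreateDeliveryStream API (field names follow the Firehose API reference; the URL, access key, role, and bucket ARNs are placeholders you must replace):

```json
{
  "DeliveryStreamName": "parseable-logs",
  "HttpEndpointDestinationConfiguration": {
    "EndpointConfiguration": {
      "Url": "https://your-parseable/api/v1/ingest",
      "Name": "parseable",
      "AccessKey": "YOUR_PARSEABLE_CREDENTIALS"
    },
    "RequestConfiguration": { "ContentEncoding": "GZIP" },
    "BufferingHints": { "SizeInMBs": 5, "IntervalInSeconds": 60 },
    "S3BackupMode": "FailedDataOnly",
    "S3Configuration": {
      "RoleARN": "arn:aws:iam::123456789012:role/firehose-backup",
      "BucketARN": "arn:aws:s3:::your-backup-bucket"
    }
  }
}
```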
Create Delivery Stream
- Go to Kinesis → Data Firehose
- Click Create delivery stream
- Configure source (Direct PUT or Kinesis Data Stream)
- Select HTTP Endpoint as destination
- Configure endpoint:
| Field | Value |
|---|---|
| HTTP endpoint URL | https://your-parseable/api/v1/ingest |
| Content encoding | GZIP (recommended) |
| Access key | Your Parseable credentials |
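Parseable authenticates with HTTP Basic auth, so the credential value is the base64 encoding of `username:password`. A quick sketch (admin:admin is the default demo credential used in the OpenTelemetry example later on this page):

```javascript
// Build a Parseable Basic auth token from a username and password.
const toBasicToken = (user, pass) =>
  Buffer.from(`${user}:${pass}`).toString('base64');

console.log(toBasicToken('admin', 'admin')); // YWRtaW46YWRtaW4=
console.log(`Authorization: Basic ${toBasicToken('admin', 'admin')}`);
```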
Configure Buffering
Buffer size: 5 MB
Buffer interval: 60 seconds
IAM Role
This policy grants producers permission to put records into the Firehose delivery stream:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": "arn:aws:firehose:*:*:deliverystream/parseable-*"
    }
  ]
}
Method 2: Lambda Consumer
Use Lambda to consume from Kinesis and forward to Parseable.
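Kinesis delivers record payloads base64-encoded, so the function below decodes each record and wraps anything that is not valid JSON in a message object. That decode step in isolation, exercised with a fabricated sample event:

```javascript
// Decode one Kinesis record payload; wrap non-JSON payloads in an object.
function decodeRecord(record) {
  const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
  try {
    return JSON.parse(payload);
  } catch {
    return { message: payload, timestamp: new Date().toISOString() };
  }
}

// Fabricated sample event: one JSON payload, one plain-text payload.
const event = {
  Records: [
    { kinesis: { data: Buffer.from('{"level":"info","msg":"ok"}').toString('base64') } },
    { kinesis: { data: Buffer.from('plain text line').toString('base64') } }
  ]
};

console.log(event.Records.map(decodeRecord));
// The first entry parses as JSON; the second becomes { message, timestamp }.
```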
Lambda Function
// index.js
const https = require('https');

const PARSEABLE_URL = process.env.PARSEABLE_URL;
const PARSEABLE_AUTH = process.env.PARSEABLE_AUTH;
const STREAM_NAME = process.env.STREAM_NAME || 'kinesis-logs';

exports.handler = async (event) => {
  // Kinesis record data is base64-encoded; fall back to wrapping the raw
  // payload when it is not valid JSON.
  const logs = event.Records.map(record => {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    try {
      return JSON.parse(payload);
    } catch {
      return { message: payload, timestamp: new Date().toISOString() };
    }
  });

  const url = new URL(PARSEABLE_URL);
  const options = {
    hostname: url.hostname,
    port: url.port || 443,
    path: '/api/v1/ingest',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Basic ${PARSEABLE_AUTH}`,
      'X-P-Stream': STREAM_NAME
    }
  };

  return new Promise((resolve, reject) => {
    const req = https.request(options, (res) => {
      res.resume(); // drain the response so the socket is released
      if (res.statusCode >= 200 && res.statusCode < 300) {
        resolve({ statusCode: 200, body: 'Success' });
      } else {
        reject(new Error(`HTTP ${res.statusCode}`));
      }
    });
    req.on('error', reject);
    req.write(JSON.stringify(logs));
    req.end();
  });
};
Lambda Configuration
# serverless.yml
functions:
  kinesisConsumer:
    handler: index.handler
    events:
      - stream:
          type: kinesis
          arn: arn:aws:kinesis:region:account:stream/your-stream
          batchSize: 100
          startingPosition: LATEST
    environment:
      PARSEABLE_URL: https://your-parseable.com
      PARSEABLE_AUTH: ${ssm:/parseable/auth}
      STREAM_NAME: kinesis-logs
Method 3: OpenTelemetry Collector
Use the OpenTelemetry Collector with the AWS Kinesis receiver.
Configuration
receivers:
  awskinesis:
    stream_name: your-kinesis-stream
    region: us-east-1
    encoding: json

exporters:
  otlphttp/parseable:
    endpoint: "http://parseable:8000"
    headers:
      Authorization: "Basic YWRtaW46YWRtaW4="
      X-P-Stream: "kinesis-logs"
      X-P-Log-Source: "otel-logs"

service:
  pipelines:
    logs:
      receivers: [awskinesis]
      exporters: [otlphttp/parseable]
Best Practices
- Use Firehose - For managed, reliable delivery
- Enable Compression - Reduce data transfer costs
- Configure Retries - Handle transient failures
- Monitor Metrics - Track delivery success rates
- Set Buffer Sizes - Balance latency vs efficiency
Troubleshooting
Delivery Failures
- Check Parseable endpoint is accessible
- Verify authentication credentials
- Check CloudWatch logs for errors
- Verify IAM permissions
Data Loss
- Enable S3 backup in Firehose
- Configure dead letter queue
- Monitor iterator age
Next Steps
- Set up the AWS CloudWatch integration to stream CloudWatch logs
- Configure alerts for streaming metrics
- Create dashboards for monitoring