AWS Kinesis

Stream logs from AWS Kinesis Data Streams and Kinesis Data Firehose to Parseable.

Overview

Integrate AWS Kinesis with Parseable for:

  • Real-time Streaming - Ingest logs as they arrive
  • High Throughput - Handle massive log volumes
  • AWS Integration - Collect logs from AWS services
  • Buffering - Use Firehose for batched delivery

Prerequisites

  • AWS account with Kinesis access
  • Parseable instance accessible from AWS
  • IAM permissions for Kinesis

Method 1: Kinesis Data Firehose

Use Firehose for managed delivery to Parseable's HTTP endpoint.
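Firehose wraps each batch in its HTTP endpoint delivery request format: a JSON body carrying a requestId, a timestamp, and a records array whose data fields are base64-encoded. Parseable performs this decoding server-side, but the shape is useful to know when debugging delivery failures. A minimal sketch (the helper name and sample payload are illustrative):

```javascript
// Sketch: decode a Firehose HTTP-endpoint delivery body. Field names follow
// the Firehose HTTP endpoint delivery request format; Parseable does this
// decoding for you -- this is only for inspecting payloads while debugging.
function decodeFirehoseBody(body) {
  const { requestId, records } = JSON.parse(body);
  const events = records.map(r =>
    Buffer.from(r.data, 'base64').toString('utf-8'));
  return { requestId, events };
}

// Example delivery body containing one JSON log record:
const body = JSON.stringify({
  requestId: 'req-1',
  timestamp: Date.now(),
  records: [{ data: Buffer.from('{"level":"info"}').toString('base64') }]
});
console.log(decodeFirehoseBody(body).events[0]); // {"level":"info"}
```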

Create Delivery Stream

  1. Go to Kinesis Data Firehose
  2. Click Create delivery stream
  3. Configure source (Direct PUT or Kinesis Data Stream)
  4. Select HTTP Endpoint as destination
  5. Configure endpoint:
  • HTTP endpoint URL: https://your-parseable/api/v1/ingest
  • Content encoding: GZIP (recommended)
  • Access key: Your Parseable credentials
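The Access key field carries Parseable's basic-auth credentials. The header value is the base64 encoding of username:password; a quick sketch (admin/admin is Parseable's default and only a placeholder here):

```javascript
// Build an HTTP Basic Authorization value from Parseable credentials.
// 'admin:admin' is the Parseable default, shown only as a placeholder.
function basicAuth(username, password) {
  return 'Basic ' + Buffer.from(`${username}:${password}`).toString('base64');
}

console.log(basicAuth('admin', 'admin')); // Basic YWRtaW46YWRtaW4=
```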

Configure Buffering

Buffer size: 5 MB
Buffer interval: 60 seconds
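Firehose flushes a buffer when either threshold is reached, whichever comes first; smaller values lower latency but increase request count. A sketch of the flush condition, using the values above:

```javascript
// Firehose flushes when either the size or the interval threshold is hit,
// whichever comes first. Values mirror the suggested configuration above.
const BUFFER_BYTES = 5 * 1024 * 1024; // 5 MB
const BUFFER_SECONDS = 60;

function shouldFlush(bufferedBytes, bufferAgeSeconds) {
  return bufferedBytes >= BUFFER_BYTES || bufferAgeSeconds >= BUFFER_SECONDS;
}

console.log(shouldFlush(6 * 1024 * 1024, 10)); // true  (size threshold hit)
console.log(shouldFlush(1024, 61));            // true  (interval threshold hit)
console.log(shouldFlush(1024, 10));            // false
```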

IAM Role

Producers writing into the delivery stream need permission to put records:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": "arn:aws:firehose:*:*:deliverystream/parseable-*"
    }
  ]
}

Method 2: Lambda Consumer

Use Lambda to consume from Kinesis and forward to Parseable.

Lambda Function

// index.js
const https = require('https');

const PARSEABLE_URL = process.env.PARSEABLE_URL;
const PARSEABLE_AUTH = process.env.PARSEABLE_AUTH;
const STREAM_NAME = process.env.STREAM_NAME || 'kinesis-logs';

exports.handler = async (event) => {
  const logs = event.Records.map(record => {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    try {
      return JSON.parse(payload);
    } catch {
      return { message: payload, timestamp: new Date().toISOString() };
    }
  });

  const url = new URL(PARSEABLE_URL);
  const options = {
    hostname: url.hostname,
    port: url.port || 443,
    path: '/api/v1/ingest',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Basic ${PARSEABLE_AUTH}`,
      'X-P-Stream': STREAM_NAME
    }
  };

  return new Promise((resolve, reject) => {
    const req = https.request(options, (res) => {
      res.resume(); // drain the response body so the socket is released
      if (res.statusCode >= 200 && res.statusCode < 300) {
        resolve({ statusCode: 200, body: 'Success' });
      } else {
        reject(new Error(`HTTP ${res.statusCode}`));
      }
    });
    req.on('error', reject);
    req.write(JSON.stringify(logs));
    req.end();
  });
};
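For a local sanity check of the decode step, you can feed the same logic a mock event: Kinesis delivers each record's payload base64-encoded under record.kinesis.data, mirroring what the handler above receives (the mock payload is illustrative):

```javascript
// Local sanity check: a Kinesis event carries each payload base64-encoded
// under record.kinesis.data, exactly as decoded in the handler above.
const mockEvent = {
  Records: [{
    kinesis: { data: Buffer.from('{"msg":"hello"}').toString('base64') }
  }]
};

const decoded = mockEvent.Records.map(r =>
  JSON.parse(Buffer.from(r.kinesis.data, 'base64').toString('utf-8')));
console.log(decoded[0].msg); // hello
```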

Lambda Configuration

# serverless.yml
functions:
  kinesisConsumer:
    handler: index.handler
    events:
      - stream:
          type: kinesis
          arn: arn:aws:kinesis:region:account:stream/your-stream
          batchSize: 100
          startingPosition: LATEST
    environment:
      PARSEABLE_URL: https://your-parseable.com
      PARSEABLE_AUTH: ${ssm:/parseable/auth}
      STREAM_NAME: kinesis-logs

Method 3: OpenTelemetry Collector

Use the OTel Collector with the Kinesis receiver.

Configuration

receivers:
  awskinesis:
    stream_name: your-kinesis-stream
    region: us-east-1
    encoding: json

exporters:
  otlphttp/parseable:
    endpoint: "http://parseable:8000"
    headers:
      Authorization: "Basic YWRtaW46YWRtaW4="
      X-P-Stream: "kinesis-logs"
      X-P-Log-Source: "otel-logs"

service:
  pipelines:
    logs:
      receivers: [awskinesis]
      exporters: [otlphttp/parseable]

Best Practices

  1. Use Firehose - For managed, reliable delivery
  2. Enable Compression - Reduce data transfer costs
  3. Configure Retries - Handle transient failures
  4. Monitor Metrics - Track delivery success rates
  5. Set Buffer Sizes - Balance latency vs efficiency
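For practice #3, Firehose retries are configured on the delivery stream, while a Lambda consumer must retry itself before failing the batch. A minimal sketch of a capped exponential backoff schedule (function name and parameters are illustrative):

```javascript
// Sketch: capped exponential backoff delays for retrying transient
// delivery failures from a Lambda consumer. Parameters are illustrative.
function backoffDelays(retries, baseMs = 200, capMs = 5000) {
  return Array.from({ length: retries }, (_, i) =>
    Math.min(baseMs * 2 ** i, capMs));
}

console.log(backoffDelays(5)); // [ 200, 400, 800, 1600, 3200 ]
```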

Troubleshooting

Delivery Failures

  1. Check Parseable endpoint is accessible
  2. Verify authentication credentials
  3. Check CloudWatch logs for errors
  4. Verify IAM permissions

Data Loss

  1. Enable S3 backup in Firehose
  2. Configure dead letter queue
  3. Monitor iterator age
