RabbitMQ

Stream logs from RabbitMQ message queues to Parseable.

Overview

Integrate RabbitMQ with Parseable to:

  • Message Queue Logs - Collect logs from RabbitMQ queues
  • Application Events - Stream application events via RabbitMQ
  • Decoupled Architecture - Buffer logs through message queues
  • Reliable Delivery - Leverage RabbitMQ's delivery guarantees

Prerequisites

  • A running RabbitMQ server
  • An accessible Parseable instance
  • Fluent Bit or a custom consumer application

Method 1: Fluent Bit Consumer

Use Fluent Bit to consume messages from RabbitMQ.

Fluent Bit Configuration

service:
  flush: 5
  log_level: info

pipeline:
  inputs:
    - name: rabbitmq
      host: rabbitmq
      port: 5672
      user: guest
      password: guest
      queue: logs
      vhost: /

  outputs:
    - name: http
      match: '*'
      host: parseable
      port: 8000
      uri: /api/v1/ingest
      format: json
      header:
        - Authorization Basic YWRtaW46YWRtaW4=
        - X-P-Stream rabbitmq-logs
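
To verify the pipeline end to end, publish a test event into the queue and confirm it arrives in the rabbitmq-logs stream. A minimal publisher sketch using pika, assuming RabbitMQ on localhost with the default guest credentials and the logs queue from the configuration above (the event fields are illustrative):

import json
import pika

# Connect to the default vhost with the same credentials as above
params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Match the durable queue the consumers assert
channel.queue_declare(queue='logs', durable=True)

# Publish one JSON-encoded log event
event = {'level': 'info', 'message': 'test event'}
channel.basic_publish(
    exchange='',
    routing_key='logs',
    body=json.dumps(event),
    properties=pika.BasicProperties(delivery_mode=2),  # persistent
)
connection.close()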

Method 2: Custom Consumer

Build a consumer application to forward messages to Parseable.

Node.js Consumer

const amqp = require('amqplib');
const axios = require('axios');

const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://localhost';
const QUEUE_NAME = process.env.QUEUE_NAME || 'logs';
const PARSEABLE_URL = process.env.PARSEABLE_URL;
const PARSEABLE_AUTH = process.env.PARSEABLE_AUTH;

async function consume() {
  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();
  
  await channel.assertQueue(QUEUE_NAME, { durable: true });
  
  const batch = [];
  const BATCH_SIZE = 100;
  const BATCH_TIMEOUT = 5000;
  
  let timeout = null;
  
  const sendBatch = async () => {
    if (batch.length === 0) return;
    
    const logs = batch.splice(0, batch.length);
    
    try {
      await axios.post(`${PARSEABLE_URL}/api/v1/ingest`, logs, {
        headers: {
          'Authorization': `Basic ${PARSEABLE_AUTH}`,
          'X-P-Stream': 'rabbitmq-logs',
          'Content-Type': 'application/json'
        }
      });
      console.log(`Sent ${logs.length} logs to Parseable`);
    } catch (error) {
      console.error('Error sending to Parseable:', error.message);
      // Return failed logs to the in-memory batch for the next flush.
      // They were already acked in RabbitMQ, so this retry is lost if
      // the process exits before the next successful send.
      logs.forEach(log => batch.push(log));
    }
  };
  
  channel.consume(QUEUE_NAME, async (msg) => {
    if (msg) {
      try {
        const log = JSON.parse(msg.content.toString());
        log.timestamp = log.timestamp || new Date().toISOString();
        batch.push(log);
        channel.ack(msg);
        
        if (batch.length >= BATCH_SIZE) {
          // Flush immediately and reset the timer handle so the next
          // partial batch can schedule a fresh flush
          clearTimeout(timeout);
          timeout = null;
          await sendBatch();
        } else if (!timeout) {
          timeout = setTimeout(async () => {
            timeout = null;
            await sendBatch();
          }, BATCH_TIMEOUT);
        }
      } catch (error) {
        console.error('Error processing message:', error);
        channel.nack(msg, false, false);
      }
    }
  });
  
  console.log(`Consuming from queue: ${QUEUE_NAME}`);
}

consume().catch(console.error);
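
Two caveats with this consumer: messages are acked as soon as they enter the in-memory batch, so a crash between the ack and the flush can lose up to BATCH_SIZE events; move the ack after a successful sendBatch() if you need at-least-once delivery into Parseable. If you do defer acks, bound the in-flight deliveries with amqplib's channel.prefetch().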

Python Consumer

import pika
import requests
import json
import os
from datetime import datetime, timezone

RABBITMQ_URL = os.getenv('RABBITMQ_URL', 'amqp://localhost')
QUEUE_NAME = os.getenv('QUEUE_NAME', 'logs')
PARSEABLE_URL = os.getenv('PARSEABLE_URL')
PARSEABLE_AUTH = os.getenv('PARSEABLE_AUTH')

def send_to_parseable(logs):
    headers = {
        'Authorization': f'Basic {PARSEABLE_AUTH}',
        'X-P-Stream': 'rabbitmq-logs',
        'Content-Type': 'application/json'
    }
    response = requests.post(
        f'{PARSEABLE_URL}/api/v1/ingest',
        json=logs,
        headers=headers
    )
    response.raise_for_status()

def callback(ch, method, properties, body):
    try:
        log = json.loads(body)
        log['timestamp'] = log.get('timestamp', datetime.now(timezone.utc).isoformat())
        send_to_parseable([log])
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f'Error: {e}')
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
print(f'Consuming from queue: {QUEUE_NAME}')
channel.start_consuming()
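
The callback above posts one event per HTTP request, which is simple but chatty. A batched variant sketch, assuming the same imports, environment variables, and send_to_parseable helper as above; register this callback in place of the one above, before start_consuming(). For brevity it only flushes on size, so a production version would also flush on a timer as the Node.js consumer does:

BATCH_SIZE = 100
batch = []

def batched_callback(ch, method, properties, body):
    try:
        log = json.loads(body)
        log['timestamp'] = log.get('timestamp', datetime.now(timezone.utc).isoformat())
    except Exception as e:
        print(f'Error: {e}')
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    batch.append(log)
    if len(batch) >= BATCH_SIZE:
        send_to_parseable(batch)
        batch.clear()
        # Ack every delivery up to and including this one in a single call
        ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)

# Bound the number of unacked deliveries held by this consumer
channel.basic_qos(prefetch_count=BATCH_SIZE)
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=batched_callback)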

Docker Compose

A minimal stack wiring RabbitMQ, the consumer, and Parseable together. The parseable service shown here is an assumption for local testing: it runs Parseable in local-store mode with the default admin/admin credentials, matching the PARSEABLE_AUTH value used above.

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"

  parseable:
    image: parseable/parseable:latest
    command: parseable local-store
    ports:
      - "8000:8000"
    environment:
      - P_USERNAME=admin
      - P_PASSWORD=admin

  consumer:
    build: .
    environment:
      - RABBITMQ_URL=amqp://rabbitmq
      - QUEUE_NAME=logs
      - PARSEABLE_URL=http://parseable:8000
      - PARSEABLE_AUTH=YWRtaW46YWRtaW4=
    depends_on:
      - rabbitmq
      - parseable
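
Start the stack with docker compose up -d. The RabbitMQ management UI is then available at http://localhost:15672 (default login guest / guest) and is the quickest way to confirm the consumer is attached and the queue is draining.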

Best Practices

  1. Use Batching - Batch messages before sending to cut HTTP overhead
  2. Handle Failures - Implement retry logic for failed deliveries
  3. Monitor Queues - Watch queue depth to catch consumer lag early
  4. Use Dead-Letter Exchanges - Route messages that repeatedly fail to a DLX (see the sketch after this list)
  5. Acknowledge Properly - Ack only after the message is safely handled
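
For point 4, a minimal sketch with pika, assuming hypothetical names logs-dlx and logs-dead for the dead-letter exchange and queue. Note that RabbitMQ rejects a queue_declare whose arguments differ from the existing queue's, so the x-dead-letter-exchange argument must be present when logs is first declared, and the consumers above would need the same arguments in their declarations:

import pika

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()

# Exchange and queue that collect dead-lettered messages
channel.exchange_declare(exchange='logs-dlx', exchange_type='fanout')
channel.queue_declare(queue='logs-dead', durable=True)
channel.queue_bind(queue='logs-dead', exchange='logs-dlx')

# Main queue: messages nacked with requeue=False (as in the consumers
# above) are routed to logs-dlx instead of being dropped
channel.queue_declare(
    queue='logs',
    durable=True,
    arguments={'x-dead-letter-exchange': 'logs-dlx'},
)
connection.close()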

Troubleshooting

Messages Not Consumed

  1. Verify RabbitMQ connection
  2. Check queue exists
  3. Verify consumer is running
  4. Check RabbitMQ management UI
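
From the command line, rabbitmqctl list_queues name messages consumers shows each queue's depth and attached consumer count.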

Delivery Failures

  1. Check Parseable endpoint
  2. Verify authentication
  3. Check network connectivity
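
To isolate the failure, bypass RabbitMQ and post a test event directly with the same credentials and stream header, e.g. curl -X POST http://parseable:8000/api/v1/ingest -H 'Authorization: Basic YWRtaW46YWRtaW4=' -H 'X-P-Stream: rabbitmq-logs' -H 'Content-Type: application/json' -d '[{"message":"test"}]'. If that succeeds, the problem is on the consumer or queue side.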
