RabbitMQ
Stream logs from RabbitMQ to Parseable
Stream logs from RabbitMQ message queues to Parseable.
Overview
Integrate RabbitMQ with Parseable to:
- Message Queue Logs - Collect logs from RabbitMQ queues
- Application Events - Stream application events via RabbitMQ
- Decoupled Architecture - Buffer logs through message queues
- Reliable Delivery - Leverage RabbitMQ's delivery guarantees
Prerequisites
- RabbitMQ server running
- Parseable instance accessible
- Consumer application or Fluent Bit
Method 1: Fluent Bit Consumer
Use Fluent Bit to consume messages from RabbitMQ.
Fluent Bit Configuration
service:
flush: 5
log_level: info
pipeline:
inputs:
- name: rabbitmq
host: rabbitmq
port: 5672
user: guest
password: guest
queue: logs
vhost: /
outputs:
- name: http
match: '*'
host: parseable
port: 8000
uri: /api/v1/ingest
format: json
header: Authorization Basic YWRtaW46YWRtaW4=
header: X-P-Stream rabbitmq-logsMethod 2: Custom Consumer
Build a consumer application to forward messages to Parseable.
Node.js Consumer
const amqp = require('amqplib');
const axios = require('axios');
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://localhost';
const QUEUE_NAME = process.env.QUEUE_NAME || 'logs';
const PARSEABLE_URL = process.env.PARSEABLE_URL;
const PARSEABLE_AUTH = process.env.PARSEABLE_AUTH;
async function consume() {
const connection = await amqp.connect(RABBITMQ_URL);
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE_NAME, { durable: true });
const batch = [];
const BATCH_SIZE = 100;
const BATCH_TIMEOUT = 5000;
let timeout = null;
const sendBatch = async () => {
if (batch.length === 0) return;
const logs = batch.splice(0, batch.length);
try {
await axios.post(`${PARSEABLE_URL}/api/v1/ingest`, logs, {
headers: {
'Authorization': `Basic ${PARSEABLE_AUTH}`,
'X-P-Stream': 'rabbitmq-logs',
'Content-Type': 'application/json'
}
});
console.log(`Sent ${logs.length} logs to Parseable`);
} catch (error) {
console.error('Error sending to Parseable:', error.message);
// Re-queue failed messages
logs.forEach(log => batch.push(log));
}
};
channel.consume(QUEUE_NAME, async (msg) => {
if (msg) {
try {
const log = JSON.parse(msg.content.toString());
log.timestamp = log.timestamp || new Date().toISOString();
batch.push(log);
channel.ack(msg);
if (batch.length >= BATCH_SIZE) {
clearTimeout(timeout);
await sendBatch();
} else if (!timeout) {
timeout = setTimeout(async () => {
await sendBatch();
timeout = null;
}, BATCH_TIMEOUT);
}
} catch (error) {
console.error('Error processing message:', error);
channel.nack(msg, false, false);
}
}
});
console.log(`Consuming from queue: ${QUEUE_NAME}`);
}
consume().catch(console.error);Python Consumer
import pika
import requests
import json
import os
from datetime import datetime
RABBITMQ_URL = os.getenv('RABBITMQ_URL', 'localhost')
QUEUE_NAME = os.getenv('QUEUE_NAME', 'logs')
PARSEABLE_URL = os.getenv('PARSEABLE_URL')
PARSEABLE_AUTH = os.getenv('PARSEABLE_AUTH')
def send_to_parseable(logs):
headers = {
'Authorization': f'Basic {PARSEABLE_AUTH}',
'X-P-Stream': 'rabbitmq-logs',
'Content-Type': 'application/json'
}
response = requests.post(
f'{PARSEABLE_URL}/api/v1/ingest',
json=logs,
headers=headers
)
response.raise_for_status()
def callback(ch, method, properties, body):
try:
log = json.loads(body)
log['timestamp'] = log.get('timestamp', datetime.utcnow().isoformat() + 'Z')
send_to_parseable([log])
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f'Error: {e}')
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
print(f'Consuming from queue: {QUEUE_NAME}')
channel.start_consuming()Docker Compose
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
consumer:
build: .
environment:
- RABBITMQ_URL=amqp://rabbitmq
- QUEUE_NAME=logs
- PARSEABLE_URL=http://parseable:8000
- PARSEABLE_AUTH=YWRtaW46YWRtaW4=
depends_on:
- rabbitmq
- parseableBest Practices
- Use Batching - Batch messages for efficiency
- Handle Failures - Implement retry logic
- Monitor Queues - Watch queue depth
- Use Dead Letter - Configure DLX for failed messages
- Acknowledge Properly - Ack after successful send
Troubleshooting
Messages Not Consumed
- Verify RabbitMQ connection
- Check queue exists
- Verify consumer is running
- Check RabbitMQ management UI
Delivery Failures
- Check Parseable endpoint
- Verify authentication
- Check network connectivity
Next Steps
- Configure Kafka for high-throughput
- Set up alerts for queue metrics
- Create dashboards for monitoring
Was this page helpful?