NATS
Stream logs from NATS to Parseable
Stream logs from the NATS messaging system to Parseable.
Overview
Integrating NATS with Parseable gives you:
- Cloud Native Messaging - Lightweight, high-performance messaging
- Pub/Sub Logs - Subscribe to log topics
- JetStream - Persistent message streaming
- Low Latency - Sub-millisecond message delivery
Prerequisites
- NATS server running
- Parseable instance accessible
- NATS client library
Method 1: Custom Subscriber
Build a subscriber to forward messages to Parseable.
Node.js Subscriber
const { connect, StringCodec } = require('nats');
const axios = require('axios');
// NATS server address; falls back to a local server when NATS_URL is unset or empty.
const NATS_URL = process.env.NATS_URL || 'localhost:4222';
// Subject pattern to subscribe to; falls back to 'logs.>' when NATS_SUBJECT is unset or empty.
const SUBJECT = process.env.NATS_SUBJECT || 'logs.>';
// Base URL of the Parseable instance; '/api/v1/ingest' is appended when posting. No default.
const PARSEABLE_URL = process.env.PARSEABLE_URL;
// Credentials placed after "Basic " in the Authorization header. No default.
const PARSEABLE_AUTH = process.env.PARSEABLE_AUTH;
// Codec used to decode raw message payloads into strings before JSON parsing.
const sc = StringCodec();
/**
 * Converts the decoded text of one NATS message into a log record.
 *
 * A JSON object payload is used as-is. Anything else (non-JSON text, or JSON
 * that is not an object, such as a number, string, null or array) is wrapped
 * as `{ message: <text> }` so that every batch entry is an object.
 *
 * @param {string} text - Decoded message payload.
 * @param {string} subject - Subject the message was published on.
 * @returns {object} Record with `timestamp` (kept if the payload supplied a
 *   truthy one, otherwise the current time in ISO-8601) and `subject` set.
 */
function toLogRecord(text, subject) {
  let record;
  try {
    const parsed = JSON.parse(text);
    const isPlainObject =
      parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed);
    record = isPlainObject ? parsed : { message: text };
  } catch (parseError) {
    // Not JSON at all: keep the raw text instead of dropping the message.
    record = { message: text };
  }
  record.timestamp = record.timestamp || new Date().toISOString();
  record.subject = subject;
  return record;
}

/**
 * Connects to NATS, subscribes to SUBJECT and forwards messages to Parseable
 * in batches.
 *
 * A batch is sent when it reaches BATCH_SIZE records, or BATCH_TIMEOUT
 * milliseconds after the first record of a partial batch was buffered.
 * Records still buffered when the subscription ends are sent before returning.
 *
 * @returns {Promise<void>} Resolves once the subscription has ended and the
 *   final batch was attempted. Rejects if connecting to NATS fails.
 */
async function subscribe() {
  const nc = await connect({ servers: NATS_URL });
  console.log(`Connected to NATS at ${NATS_URL}`);

  const BATCH_SIZE = 100;
  const BATCH_TIMEOUT = 5000;
  const batch = [];
  let timeout = null;

  // Sends everything currently buffered. Never rejects: a failed batch is
  // logged and dropped (it has already been removed from the buffer).
  const sendBatch = async () => {
    if (batch.length === 0) return;
    const logs = batch.splice(0, batch.length);
    try {
      await axios.post(`${PARSEABLE_URL}/api/v1/ingest`, logs, {
        headers: {
          'Authorization': `Basic ${PARSEABLE_AUTH}`,
          'X-P-Stream': 'nats-logs',
          'Content-Type': 'application/json'
        }
      });
      console.log(`Sent ${logs.length} logs to Parseable`);
    } catch (error) {
      console.error('Error sending to Parseable:', error.message);
    }
  };

  // Cancels the pending flush AND resets the handle. Without the reset the
  // stale handle stays truthy and no flush timer is ever scheduled again.
  const cancelFlushTimer = () => {
    if (timeout !== null) {
      clearTimeout(timeout);
      timeout = null;
    }
  };

  const sub = nc.subscribe(SUBJECT);
  console.log(`Subscribed to: ${SUBJECT}`);

  for await (const msg of sub) {
    try {
      batch.push(toLogRecord(sc.decode(msg.data), msg.subject));

      if (batch.length >= BATCH_SIZE) {
        cancelFlushTimer();
        await sendBatch();
      } else if (timeout === null) {
        timeout = setTimeout(() => {
          // Reset before sending so records that arrive while the request
          // is in flight get a timer of their own.
          timeout = null;
          void sendBatch();
        }, BATCH_TIMEOUT);
      }
    } catch (error) {
      console.error('Error processing message:', error);
    }
  }

  // Subscription closed: flush whatever is still buffered.
  cancelFlushTimer();
  await sendBatch();
}
subscribe().catch(console.error);
Go Subscriber
package main
import (
"bytes"
"encoding/json"
"log"
"net/http"
"os"
"time"
"github.com/nats-io/nats.go"
)
type LogEntry struct {
Timestamp string `json:"timestamp"`
Subject string `json:"subject"`
Data map[string]interface{} `json:"data,inline"`
}
func main() {
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = "nats://localhost:4222"
}
parseableURL := os.Getenv("PARSEABLE_URL")
parseableAuth := os.Getenv("PARSEABLE_AUTH")
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
log.Printf("Connected to NATS at %s", natsURL)
_, err = nc.Subscribe("logs.>", func(msg *nats.Msg) {
var data map[string]interface{}
if err := json.Unmarshal(msg.Data, &data); err != nil {
data = map[string]interface{}{"message": string(msg.Data)}
}
entry := LogEntry{
Timestamp: time.Now().UTC().Format(time.RFC3339),
Subject: msg.Subject,
Data: data,
}
payload, _ := json.Marshal([]LogEntry{entry})
req, _ := http.NewRequest("POST", parseableURL+"/api/v1/ingest", bytes.NewBuffer(payload))
req.Header.Set("Authorization", "Basic "+parseableAuth)
req.Header.Set("X-P-Stream", "nats-logs")
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
log.Printf("Error sending to Parseable: %v", err)
return
}
defer resp.Body.Close()
})
if err != nil {
log.Fatal(err)
}
log.Println("Subscribed to logs.>")
select {}
}Method 2: JetStream Consumer
For persistent message streaming with JetStream:
const { connect, StringCodec, AckPolicy, DeliverPolicy } = require('nats');
const axios = require('axios');
/**
 * Consumes messages from the JetStream consumer 'parseable-consumer' on
 * stream 'LOGS' and forwards each one to Parseable.
 *
 * A message is acknowledged only after Parseable accepted it. If the request
 * fails, the message is negatively acknowledged with a delay so it is
 * redelivered later rather than immediately.
 *
 * Payloads that are not a JSON object (non-JSON text, numbers, strings, null,
 * arrays) are wrapped as `{ message: <text> }`. Previously they failed on
 * every delivery and were nak'd again each time.
 *
 * @returns {Promise<void>} Resolves when the message iterator ends. Rejects
 *   if connecting or looking up the consumer fails.
 */
async function consumeJetStream() {
  // Delay before a failed message is redelivered.
  const NAK_DELAY_MS = 5000;

  const nc = await connect({ servers: process.env.NATS_URL });
  const js = nc.jetstream();
  const sc = StringCodec();

  // Turns decoded payload text into an object suitable for ingestion.
  const toLogRecord = (text) => {
    try {
      const parsed = JSON.parse(text);
      const isPlainObject =
        parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed);
      return isPlainObject ? parsed : { message: text };
    } catch (parseError) {
      return { message: text };
    }
  };

  // Look up the consumer. This only gets it: the stream and the consumer
  // must already exist, otherwise this call rejects.
  const consumer = await js.consumers.get('LOGS', 'parseable-consumer');
  const messages = await consumer.consume();

  for await (const msg of messages) {
    try {
      const log = toLogRecord(sc.decode(msg.data));
      log.timestamp = log.timestamp || new Date().toISOString();

      await axios.post(`${process.env.PARSEABLE_URL}/api/v1/ingest`, [log], {
        headers: {
          'Authorization': `Basic ${process.env.PARSEABLE_AUTH}`,
          'X-P-Stream': 'nats-logs',
          'Content-Type': 'application/json'
        }
      });
      msg.ack();
    } catch (error) {
      console.error('Error:', error);
      // Delay redelivery so an unavailable Parseable is not hit in a tight loop.
      msg.nak(NAK_DELAY_MS);
    }
  }
}
consumeJetStream().catch(console.error);
Docker Compose
# NATS (with JetStream), Parseable and the subscriber in one stack.
version: '3.8'
services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"
      - "8222:8222"
    # Overriding the command replaces the image's default config file, so the
    # monitoring endpoint published on 8222 must be enabled explicitly.
    command: ["--jetstream", "--http_port", "8222"]
  # The subscriber depends on this service, so it has to be defined here.
  parseable:
    image: parseable/parseable:latest
    command: ["parseable", "local-store"]
    ports:
      - "8000:8000"
    environment:
      # Must match the credentials encoded in PARSEABLE_AUTH below.
      - P_USERNAME=admin
      - P_PASSWORD=admin
  subscriber:
    build: .
    environment:
      - NATS_URL=nats://nats:4222
      - NATS_SUBJECT=logs.>
      - PARSEABLE_URL=http://parseable:8000
      # Base64 of "admin:admin"
      - PARSEABLE_AUTH=YWRtaW46YWRtaW4=
    depends_on:
      - nats
      - parseable
Configuration Options
| Parameter | Description |
|---|---|
| `NATS_URL` | NATS server URL |
| `NATS_SUBJECT` | Subject pattern to subscribe to |
| `PARSEABLE_URL` | Parseable endpoint |
| `PARSEABLE_AUTH` | Base64-encoded credentials |
Best Practices
- Use JetStream - For persistent, reliable delivery
- Batch Messages - Reduce HTTP overhead
- Handle Backpressure - Implement flow control
- Monitor Consumers - Track consumer lag
- Use Wildcards - Subscribe to subject hierarchies
Troubleshooting
Connection Issues
- Verify NATS server is running
- Check network connectivity
- Verify authentication if enabled
Message Loss
- Use JetStream for persistence
- Implement acknowledgments
- Configure appropriate retention
Next Steps
- Configure Kafka for high-throughput log streaming
- Set up alerts for streaming metrics
- Create dashboards for monitoring
Was this page helpful?