Parseable

NATS

Stream logs from NATS to Parseable


Stream logs from NATS messaging system to Parseable.

Overview

Integrate NATS with Parseable to:

  • Cloud Native Messaging - Lightweight, high-performance messaging
  • Pub/Sub Logs - Subscribe to log topics
  • JetStream - Persistent message streaming
  • Low Latency - Sub-millisecond message delivery

Prerequisites

  • NATS server running
  • Parseable instance accessible
  • NATS client library

Method 1: Custom Subscriber

Build a subscriber to forward messages to Parseable.

Node.js Subscriber

const { connect, StringCodec } = require('nats');
const axios = require('axios');

const NATS_URL = process.env.NATS_URL || 'localhost:4222';
const SUBJECT = process.env.NATS_SUBJECT || 'logs.>';
const PARSEABLE_URL = process.env.PARSEABLE_URL;
const PARSEABLE_AUTH = process.env.PARSEABLE_AUTH;

const sc = StringCodec();

async function subscribe() {
  const nc = await connect({ servers: NATS_URL });
  console.log(`Connected to NATS at ${NATS_URL}`);
  
  const batch = [];
  const BATCH_SIZE = 100;
  const BATCH_TIMEOUT = 5000;
  
  let timeout = null;
  
  const sendBatch = async () => {
    if (batch.length === 0) return;
    
    const logs = batch.splice(0, batch.length);
    
    try {
      await axios.post(`${PARSEABLE_URL}/api/v1/ingest`, logs, {
        headers: {
          'Authorization': `Basic ${PARSEABLE_AUTH}`,
          'X-P-Stream': 'nats-logs',
          'Content-Type': 'application/json'
        }
      });
      console.log(`Sent ${logs.length} logs to Parseable`);
    } catch (error) {
      console.error('Error sending to Parseable:', error.message);
    }
  };
  
  const sub = nc.subscribe(SUBJECT);
  console.log(`Subscribed to: ${SUBJECT}`);
  
  for await (const msg of sub) {
    try {
      const data = sc.decode(msg.data);
      const log = JSON.parse(data);
      log.timestamp = log.timestamp || new Date().toISOString();
      log.subject = msg.subject;
      batch.push(log);
      
      if (batch.length >= BATCH_SIZE) {
        clearTimeout(timeout);
        await sendBatch();
      } else if (!timeout) {
        timeout = setTimeout(async () => {
          await sendBatch();
          timeout = null;
        }, BATCH_TIMEOUT);
      }
    } catch (error) {
      console.error('Error processing message:', error);
    }
  }
}

subscribe().catch(console.error);

Go Subscriber

package main

import (
    "bytes"
    "encoding/json"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/nats-io/nats.go"
)

type LogEntry struct {
    Timestamp string                 `json:"timestamp"`
    Subject   string                 `json:"subject"`
    Data      map[string]interface{} `json:"data,inline"`
}

func main() {
    natsURL := os.Getenv("NATS_URL")
    if natsURL == "" {
        natsURL = "nats://localhost:4222"
    }
    
    parseableURL := os.Getenv("PARSEABLE_URL")
    parseableAuth := os.Getenv("PARSEABLE_AUTH")
    
    nc, err := nats.Connect(natsURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()
    
    log.Printf("Connected to NATS at %s", natsURL)
    
    _, err = nc.Subscribe("logs.>", func(msg *nats.Msg) {
        var data map[string]interface{}
        if err := json.Unmarshal(msg.Data, &data); err != nil {
            data = map[string]interface{}{"message": string(msg.Data)}
        }
        
        entry := LogEntry{
            Timestamp: time.Now().UTC().Format(time.RFC3339),
            Subject:   msg.Subject,
            Data:      data,
        }
        
        payload, _ := json.Marshal([]LogEntry{entry})
        
        req, _ := http.NewRequest("POST", parseableURL+"/api/v1/ingest", bytes.NewBuffer(payload))
        req.Header.Set("Authorization", "Basic "+parseableAuth)
        req.Header.Set("X-P-Stream", "nats-logs")
        req.Header.Set("Content-Type", "application/json")
        
        client := &http.Client{Timeout: 10 * time.Second}
        resp, err := client.Do(req)
        if err != nil {
            log.Printf("Error sending to Parseable: %v", err)
            return
        }
        defer resp.Body.Close()
    })
    
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println("Subscribed to logs.>")
    select {}
}

Method 2: JetStream Consumer

For persistent message streaming with JetStream:

const { connect, StringCodec, AckPolicy, DeliverPolicy } = require('nats');
const axios = require('axios');

async function consumeJetStream() {
  const nc = await connect({ servers: process.env.NATS_URL });
  const js = nc.jetstream();
  const sc = StringCodec();
  
  // Create or get consumer
  const consumer = await js.consumers.get('LOGS', 'parseable-consumer');
  
  const messages = await consumer.consume();
  
  for await (const msg of messages) {
    try {
      const log = JSON.parse(sc.decode(msg.data));
      log.timestamp = log.timestamp || new Date().toISOString();
      
      await axios.post(`${process.env.PARSEABLE_URL}/api/v1/ingest`, [log], {
        headers: {
          'Authorization': `Basic ${process.env.PARSEABLE_AUTH}`,
          'X-P-Stream': 'nats-logs',
          'Content-Type': 'application/json'
        }
      });
      
      msg.ack();
    } catch (error) {
      console.error('Error:', error);
      msg.nak();
    }
  }
}

consumeJetStream().catch(console.error);

Docker Compose

version: '3.8'
services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"
      - "8222:8222"
    command: ["--jetstream"]
  
  subscriber:
    build: .
    environment:
      - NATS_URL=nats://nats:4222
      - NATS_SUBJECT=logs.>
      - PARSEABLE_URL=http://parseable:8000
      - PARSEABLE_AUTH=YWRtaW46YWRtaW4=
    depends_on:
      - nats
      - parseable

Configuration Options

ParameterDescription
NATS_URLNATS server URL
NATS_SUBJECTSubject pattern to subscribe
PARSEABLE_URLParseable endpoint
PARSEABLE_AUTHBase64 encoded credentials

Best Practices

  1. Use JetStream - For persistent, reliable delivery
  2. Batch Messages - Reduce HTTP overhead
  3. Handle Backpressure - Implement flow control
  4. Monitor Consumers - Track consumer lag
  5. Use Wildcards - Subscribe to subject hierarchies

Troubleshooting

Connection Issues

  1. Verify NATS server is running
  2. Check network connectivity
  3. Verify authentication if enabled

Message Loss

  1. Use JetStream for persistence
  2. Implement acknowledgments
  3. Configure appropriate retention

Next Steps

  • Configure Kafka for high-throughput
  • Set up alerts for streaming metrics
  • Create dashboards for monitoring

Was this page helpful?

On this page