English | 简体中文
A lightweight service for collecting and persisting time-series data. It consumes BSON payloads from NATS JetStream work-queue streams and batch writes them to MongoDB.
- Push-based message consumption from NATS JetStream
- Batch writes to MongoDB with configurable buffer
- Dynamic stream management via JetStream KeyValue
- Auto-subscribe on KV PUT, auto-unsubscribe on KV DELETE
- Graceful shutdown with final buffer flush
- Cloud-native design: multiple streams per collector, scale horizontally
- NATS JetStream cluster
- MongoDB 5.0+ with Time Series Collections
Recommendation: Use MongoDB Time Series Collections for optimal storage efficiency and query performance on time-series data. Create your collections with
timeseriesoption before sending data.
Create config/values.yml:
mode: debug
namespace: alpha
batch_size: 1000
flush_interval: 5s
nats_hosts:
- nats://127.0.0.1:4222
nats_token: your-token
mongo_url: mongodb://localhost:27017
mongo_database: example| Field | Description |
|---|---|
mode |
debug or release |
namespace |
Application namespace for stream/KV naming |
batch_size |
Flush buffer when reaching this count |
flush_interval |
Flush buffer at this interval |
nats_hosts |
NATS server addresses |
nats_token |
NATS authentication token |
mongo_url |
MongoDB connection URL |
mongo_database |
MongoDB database name |
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ App Service │ │ App Service │ │ App Service │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────────┼───────────────────┘
│ Send BSON
▼
┌─────────────────┐
│ Transfer SDK │
│ Publish BSON │
└────────┬────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ NATS JetStream │
│ Stream: {namespace}_{key} │
│ Subject: {namespace}.{key} │
│ Consumer: default (WorkQueue) │
│ KV Bucket: {namespace} │
└───────────────────────────┬─────────────────────────────────┘
│ Consume()
▼
┌─────────────────────────────────────────────────────────────┐
│ Collector Pod │
│ │
│ Message ──► Buffer ──► Flush ──► InsertMany │
│ │ │
│ (batch_size OR flush_interval) │
└───────────────────────────┬─────────────────────────────────┘
│ Success: ACK / Fail: NAK
▼
┌─────────────────────────┐
│ MongoDB │
└─────────────────────────┘
Client SDK for managing streams and publishing BSON payloads.
import (
"context"
"time"
"github.com/kainonly/collector/v3/transfer"
"github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/v2/bson"
)
func main() {
ctx := context.Background()
nc, _ := nats.Connect("nats://127.0.0.1:4222", nats.Token("your-token"))
// Create client
t, _ := transfer.New(ctx, "alpha", nc)
// Register stream
t.Add(ctx, transfer.Option{
Key: "metrics",
Collection: "metrics",
Description: "Metrics stream",
})
// Publish BSON data
t.Send("metrics", bson.M{
"ts": time.Now(),
"cpu": 0.42,
})
// Query collector state
option, _ := t.Get(ctx, "metrics")
fmt.Println(option.BufferSize)
// Remove stream
t.Remove(ctx, "metrics")
}cp config/values.example.yml config/values.yml
go run .Container image: ghcr.io/kainonly/collector:latest
Kubernetes deployment example:
apiVersion: apps/v1
kind: Deployment
metadata:
name: collector
spec:
selector:
matchLabels:
app: collector
template:
metadata:
labels:
app: collector
spec:
containers:
- name: collector
image: ghcr.io/kainonly/collector:latest
imagePullPolicy: Always
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: collector-configmain.go- Entry pointbootstrap/- Configuration and dependency setupapp/- Collector runtime (KV watch, buffer, MongoDB writes)transfer/- Producer SDK (stream/KV management + publish)common/- Shared types and loggerconfig/- Configuration examples
