A high-performance, type-safe CDC/Data Replication framework for Rust, focused on real-time data pipelines.
Rigatoni is a modern Change Data Capture (CDC) and data replication framework designed for speed, reliability, and developer experience. Leveraging Rust's type system and async/await, it provides production-ready data pipelines for real-time streaming workloads from databases to data lakes and other destinations.
Currently supporting:
- MongoDB Change Streams - Real-time change data capture from MongoDB
- S3 Destination - Export to AWS S3 with multiple formats (JSON, CSV, Parquet, Avro)
- Redis State Store - Distributed state management for multi-instance deployments
- Pipeline Orchestration - Multi-worker architecture with retry logic and state management
- Metrics & Observability - Prometheus metrics with Grafana dashboards
- Async-first design - Powered by Tokio for high throughput
- Type-safe transformations - Compile-time guarantees with Rust's type system
- Modular architecture - Extensible with feature flags
Rigatoni delivers exceptional performance for high-throughput CDC workloads:
- ~780ns per event - Core processing with linear scaling
- ~1.2µs per event - JSON serialization
- 7.65ms for 1000 events - S3 writes with ZSTD compression
- 10K-100K events/sec - Production-ready throughput
- Sub-millisecond - State store operations
See our detailed benchmarks for comprehensive performance analysis.
- High Performance: Async/await architecture with Tokio for concurrent processing
- Type Safety: Leverage Rust's type system for data transformation guarantees
- MongoDB CDC: Real-time change stream listening with resume token support
- S3 Integration: Multiple formats (JSON, CSV, Parquet, Avro) with compression (gzip, zstd)
- Distributed State: Redis-backed state store for multi-instance deployments
- Distributed Locking: Redis-based locking for horizontal scaling without duplicates
- Retry Logic: Exponential backoff with configurable limits
- Batching: Automatic batching based on size and time windows
- Composable Pipelines: Build data replication workflows from simple, testable components
- Metrics: Prometheus metrics for throughput, latency, errors, and health
- Observability: Comprehensive tracing, metrics, and Grafana dashboards
- Testable: Mock destinations and comprehensive test utilities
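The retry behavior above (exponential backoff with configurable limits) can be sketched with a small standalone function. The function name and constants here are illustrative, not Rigatoni's actual API:

```rust
use std::time::Duration;

/// Compute the delay before the nth retry attempt: base * 2^attempt,
/// capped at `max`. (Illustrative sketch only; Rigatoni's real retry
/// configuration may expose different knobs.)
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt)).min(max)
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(5);
    for attempt in 0..6 {
        // Delays grow 100ms, 200ms, 400ms, ... until the cap is hit.
        println!("retry {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```

Capping the delay keeps a long outage from producing unbounded waits, while the saturating arithmetic avoids overflow for large attempt counts.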
Rigatoni is organized as a workspace with three main crates:
```
rigatoni/
├── rigatoni-core/          # Core traits and pipeline orchestration
├── rigatoni-destinations/  # Destination implementations
└── rigatoni-stores/        # State store implementations
```
- Source: Extract data from systems (MongoDB change streams)
- Destination: Load data into target systems (S3 with multiple formats)
- Store: Manage pipeline state for reliability (in-memory, Redis)
- Pipeline: Orchestrate the entire data replication workflow with error handling
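To build intuition for how these pieces compose, here is a deliberately simplified, synchronous sketch. The trait names and signatures are illustrative stand-ins, not Rigatoni's actual API (the real traits are async and richer):

```rust
/// A change event pulled from a source (sketch).
#[derive(Debug, Clone, PartialEq)]
struct Event {
    collection: String,
    payload: String,
}

/// Where batches of events are written (e.g. S3) -- illustrative trait.
trait Destination {
    fn write_batch(&mut self, batch: &[Event]);
}

/// Where pipeline progress (e.g. resume tokens) is persisted -- illustrative trait.
trait Store {
    fn save_checkpoint(&mut self, token: &str);
    fn load_checkpoint(&self) -> Option<String>;
}

/// In-memory stand-ins, useful for tests.
#[derive(Default)]
struct MockDestination {
    written: Vec<Event>,
}

impl Destination for MockDestination {
    fn write_batch(&mut self, batch: &[Event]) {
        self.written.extend_from_slice(batch);
    }
}

#[derive(Default)]
struct InMemoryStore {
    token: Option<String>,
}

impl Store for InMemoryStore {
    fn save_checkpoint(&mut self, token: &str) {
        self.token = Some(token.to_string());
    }
    fn load_checkpoint(&self) -> Option<String> {
        self.token.clone()
    }
}

/// The pipeline orchestrates: write a batch, then checkpoint progress
/// so a restart can resume from the last processed event.
fn run_once<D: Destination, S: Store>(events: Vec<Event>, dest: &mut D, store: &mut S) {
    dest.write_batch(&events);
    store.save_checkpoint("resume-token-1");
}

fn main() {
    let mut dest = MockDestination::default();
    let mut store = InMemoryStore::default();
    let events = vec![Event { collection: "users".into(), payload: "{...}".into() }];
    run_once(events, &mut dest, &mut store);
    println!("wrote {} event(s)", dest.written.len());
}
```

Checkpointing after each successful write is what lets a pipeline resume from where it left off rather than reprocessing from the beginning.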
- Rust 1.88 or later
- AWS credentials configured for S3 access
Add Rigatoni to your `Cargo.toml`:

```toml
[dependencies]
rigatoni-core = "0.2.0"
rigatoni-destinations = { version = "0.2.0", features = ["s3"] }
rigatoni-stores = { version = "0.2.0", features = ["memory"] }
```

```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::memory::MemoryStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure state store (in-memory for simplicity)
    let store = MemoryStore::new();

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline - watch entire database
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_database() // Watch all collections in the database
        .batch_size(1000)
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```

Rigatoni supports three levels of change stream watching:
```rust
// Watch specific collections only
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_collections(vec!["users".to_string(), "orders".to_string()])
    .build()?;

// Watch all collections in a database (recommended)
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_database() // Automatically picks up new collections
    .build()?;

// Watch all databases in the deployment (cluster-wide)
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_deployment() // Requires MongoDB 4.0+ and cluster-wide permissions
    .build()?;
```

For multi-instance deployments, use Redis to share state across pipeline instances:
```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis state store
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .ttl(Duration::from_secs(7 * 24 * 60 * 60)) // 7 days
        .build()?;
    let store = RedisStore::new(redis_config).await?;

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline with Redis store
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_collections(vec!["users".to_string(), "orders".to_string()])
        .build()?;

    // Create and run pipeline with distributed state
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```

Rigatoni supports horizontal scaling with distributed locking to prevent duplicate event processing:
```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig, DistributedLockConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis store (required for distributed locking)
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .build()?;
    let store = RedisStore::new(redis_config).await?;

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline with distributed locking
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_collections(vec!["users".to_string(), "orders".to_string()])
        .distributed_lock(DistributedLockConfig {
            enabled: true,
            ttl: Duration::from_secs(30),              // Lock expires if holder crashes
            refresh_interval: Duration::from_secs(10), // Heartbeat interval
            retry_interval: Duration::from_secs(5),    // Retry claiming locks
        })
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```

How it works:
- Each collection is protected by a distributed lock (stored in Redis)
- Only one instance processes a collection at a time
- If an instance crashes, its locks expire after TTL (default 30s)
- Other instances automatically take over orphaned collections
- Total throughput scales linearly with number of instances
```
Instance 1           Instance 2           Instance 3
     |                    |                    |
     v                    v                    v
Acquires locks       Acquires locks       Acquires locks
   "users"             "orders"            "products"
     |                    |                    |
     v                    v                    v
Process events       Process events       Process events
(no duplicates!)     (no duplicates!)     (no duplicates!)
```
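The takeover behavior described above can be illustrated with a toy in-memory lock table that mimics Redis `SET key value NX PX ttl` semantics. This is a sketch for intuition only; Rigatoni's real implementation lives in the Redis store:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Toy lock table mimicking Redis `SET key value NX PX ttl` semantics.
struct LockTable {
    // collection -> (owner instance, expiry time)
    locks: HashMap<String, (String, Instant)>,
}

impl LockTable {
    fn new() -> Self {
        Self { locks: HashMap::new() }
    }

    /// Try to claim `collection` for `owner`. Succeeds if the lock is free,
    /// already held by `owner` (heartbeat refresh extends the TTL), or its
    /// previous holder's TTL has elapsed (crash takeover).
    fn try_acquire(&mut self, collection: &str, owner: &str, ttl: Duration, now: Instant) -> bool {
        match self.locks.get(collection) {
            // Held by someone else and not yet expired: rejected.
            Some((holder, expires)) if *expires > now && holder != owner => false,
            _ => {
                self.locks
                    .insert(collection.to_string(), (owner.to_string(), now + ttl));
                true
            }
        }
    }
}

fn main() {
    let mut table = LockTable::new();
    let ttl = Duration::from_secs(30);
    let t0 = Instant::now();

    // Instance 1 claims "users"; instance 2 is rejected while the lock is live.
    assert!(table.try_acquire("users", "instance-1", ttl, t0));
    assert!(!table.try_acquire("users", "instance-2", ttl, t0));

    // Instance 1 crashes and stops refreshing; once the TTL elapses,
    // instance 2 takes over the orphaned collection.
    let after_ttl = t0 + Duration::from_secs(31);
    assert!(table.try_acquire("users", "instance-2", ttl, after_ttl));
    println!("takeover succeeded");
}
```

The heartbeat refresh is what keeps a healthy holder's lock alive; a crashed holder simply stops refreshing, so the lock expires and another instance claims it.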
See Multi-Instance Deployment Guide for Kubernetes examples, configuration tuning, and failure handling.
See Getting Started for detailed tutorials and Redis Configuration Guide for production deployment.
Rigatoni includes comprehensive metrics for production observability:
```rust
use metrics_exporter_prometheus::PrometheusBuilder;
use rigatoni_core::metrics;
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize metrics
    metrics::init_metrics();

    // Start Prometheus exporter
    let addr: SocketAddr = ([0, 0, 0, 0], 9000).into();
    PrometheusBuilder::new()
        .with_http_listener(addr)
        .install()?;

    // Metrics now available at http://localhost:9000/metrics
    // ... configure and run pipeline ...

    Ok(())
}
```

Available Metrics:
- Counters: events processed, events failed, retries, batches written
- Histograms: batch size, batch duration, write latency, write bytes
- Gauges: active collections, pipeline status, queue size
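For a rough idea of what the exporter serves at `/metrics`, here is a fragment in Prometheus exposition format. The metric names below are illustrative placeholders, not Rigatoni's actual names; consult the Observability Guide for the real ones:

```
# TYPE rigatoni_events_processed_total counter
rigatoni_events_processed_total{collection="users"} 48211
# TYPE rigatoni_batch_write_duration_seconds histogram
rigatoni_batch_write_duration_seconds_bucket{collection="users",le="0.01"} 917
# TYPE rigatoni_active_collections gauge
rigatoni_active_collections 3
```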
See Observability Guide for Prometheus setup, Grafana dashboards, and alerting.
- Getting Started Guide - Installation, setup, and your first pipeline
- Examples - Runnable examples with complete setup instructions
- Local Development - Complete local environment with Docker Compose
- Architecture Guide - System design and core concepts
- Observability Guide - Metrics, monitoring, and Grafana dashboards
- User Guides - S3 configuration, Redis setup, production deployment
- API Reference - Complete API documentation
- Contributing Guide - How to contribute to Rigatoni
- CI/CD Guide - Development workflow and automation
```sh
# Build all workspace members
cargo build --workspace

# Build with all features
cargo build --workspace --all-features

# Run tests
cargo test --workspace --all-features
```

We provide a pre-push script to run all CI checks locally:

```sh
# Linux/macOS
./scripts/pre-push.sh

# Windows PowerShell
.\scripts\pre-push.ps1
```

This runs:
- All tests (default features, all features, no default features)
- Clippy linting with strict rules
- Rustfmt formatting checks
- Documentation builds
Rigatoni maintains high code quality standards:
- Automated CI: All code must pass comprehensive checks
- Security Scanning: Vulnerability detection with `cargo-audit`
- License Compliance: Enforced with `cargo-deny`
- Strict Linting: Clippy pedantic mode
- Automated Security Audits: Every commit is scanned for known vulnerabilities
- Dependency Review: All dependencies are vetted for license compliance
To report security vulnerabilities, please email: valeriouberti@icloud.com
We welcome contributions! Please see CONTRIBUTING.md for details.
Quick checklist:
- Fork the repository
- Create a feature branch
- Write tests for your changes
- Run `./scripts/pre-push.sh` to validate
- Submit a PR
Rigatoni is licensed under the Apache License 2.0.
- Author: Valerio Uberti
- Email: valeriouberti@icloud.com
- Repository: github.com/valeriouberti/rigatoni
