Kafka subprocess orchestration framework with pluggable output sinks. Consumes messages from Kafka, runs CPU-intensive external binaries in a managed subprocess pool, and delivers results to any combination of Kafka, PostgreSQL, MongoDB, Redis, HTTP, and filesystem.
Workers are the Drakkars, executors are the Vikings.
Kafka source topic
        |
        v
[ Drakkar Worker ]
        |
        +-- poll messages (per-partition pipelines)
        +-- arrange() -> executor tasks (user hook)
        +-- run external binary via subprocess pool
        +-- on_task_complete() -> sink payloads (user hook)
        +-- on_message_complete() -> aggregate per source message (user hook, optional)
        +-- deliver to configured sinks (Kafka, Postgres, Mongo, Redis, HTTP, files)
        +-- commit offsets (watermark-based, only after all sinks confirm)
        |
        v
Configured sinks (any combination)
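The watermark-based commit step in the diagram can be illustrated with a small tracker. This is a minimal sketch of the general technique, not Drakkar's implementation: the class name `OffsetWatermark` and its methods are hypothetical, and it assumes offsets are tracked in poll order for a single partition.

```python
from collections import deque
from typing import Optional


class OffsetWatermark:
    """Hypothetical per-partition tracker: commit only below the lowest unconfirmed offset."""

    def __init__(self) -> None:
        self._order = deque()    # offsets in the order they were polled
        self._confirmed = set()  # offsets whose sink deliveries all succeeded

    def track(self, offset: int) -> None:
        """Register an offset as in flight."""
        self._order.append(offset)

    def confirm(self, offset: int) -> None:
        """Mark an offset as fully delivered to every sink."""
        self._confirmed.add(offset)

    def advance(self) -> Optional[int]:
        """Return the next offset to commit (last contiguous confirmed + 1), or None."""
        last = None
        while self._order and self._order[0] in self._confirmed:
            last = self._order.popleft()
            self._confirmed.discard(last)
        return None if last is None else last + 1
```

Completing offset 12 before 10 and 11 does not move the watermark; the commit position advances only once every earlier offset has been confirmed, so a crash never skips an unprocessed message.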
- Per-partition independent pipelines with offset watermark tracking
- Pluggable sinks -- configure any combination of Kafka, PostgreSQL, MongoDB, Redis, HTTP, filesystem; third-party sinks register via entry points (see docs/sinks.md#custom-sinks-plugin-api)
- Dead letter queue -- failed deliveries go to a DLQ Kafka topic with error metadata
- Cooperative-sticky rebalancing -- non-revoked partitions continue without interruption
- Backpressure via Kafka pause/resume -- memory stays bounded regardless of consumer lag
- Subprocess executor pool with semaphore-based concurrency limiting
- Typed message models -- define Pydantic schemas for input/output, get auto-deserialization
- Cache (optional) -- self.cache key/value store with memory + write-behind SQLite + eventually-consistent peer sync across workers; pluggable backends conform to the public CacheLike protocol (docs)
- Webapp pipeline (optional) -- opt-in synchronous HTTP endpoint exposing the same handler pipeline used for Kafka, with multi-tenant bearer-token auth, per-client rpm caps, opt-in sinks delivery, and graceful-shutdown semantics. Webapp users declare four type parameters on BaseDrakkarHandler (Kafka in/out + HTTP request/response). See docs/webapp.md.
- Built-in debug UI (FastAPI) with executor timeline, partition lag, message tracing
- Flight recorder -- SQLite event log with retention and rotation
- Prometheus metrics -- pipeline, executor, per-sink, and shutdown / drain metrics
- Structured JSON logging -- ECS-compatible, ready for Elastic
- Kubernetes-ready -- unauthenticated /healthz and /readyz probes plus reference manifests in deploy/k8s/; see docs/deployment.md
- Crash detection -- watchdog file distinguishes clean restarts from SIGKILL/OOM-kill on the next startup, with drakkar_suspected_oom_kills_total and a structured warning (docs/observability.md)
- Perf extras -- pip install "py-drakkar[perf]" enables the orjson fast path for recorder JSON encoding
- DLQ replay -- scripts/replay_dlq.py reads dead-lettered records and republishes them to a target topic (see docs/sinks.md#dlq-replay)
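The "subprocess executor pool with semaphore-based concurrency limiting" feature can be pictured with plain asyncio. The following is a sketch of the general pattern only; `run_limited`, `run_all`, and `demo` are hypothetical names, and Drakkar's real pool adds windows, retries, and metrics that are not shown here.

```python
import asyncio
import sys


async def run_limited(semaphore, argv, timeout):
    """Run one external command, holding a semaphore slot for its whole lifetime."""
    async with semaphore:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout)
        except asyncio.TimeoutError:
            proc.kill()        # do not leave a runaway child behind
            await proc.wait()
            raise
        return proc.returncode, stdout.decode()


async def run_all(commands, max_executors=2, timeout=30.0):
    """Run every command, with at most max_executors subprocesses alive at once."""
    semaphore = asyncio.Semaphore(max_executors)
    return await asyncio.gather(
        *(run_limited(semaphore, argv, timeout) for argv in commands)
    )


def demo():
    # Four tiny Python child processes stand in for the CPU-intensive binary.
    commands = [[sys.executable, "-c", f"print({i} * 2)"] for i in range(4)]
    return asyncio.run(run_all(commands))


if __name__ == "__main__":
    print(demo())
```

The semaphore size plays the role that executor.max_executors plays in the configuration: queued tasks wait for a free slot instead of spawning more processes than the host can handle.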
uv init my-processor && cd my-processor
uv add py-drakkar
# models.py
from pydantic import BaseModel


class InputMessage(BaseModel):
    request_id: str
    data: str
    priority: int = 1


class ProcessedResult(BaseModel):
    request_id: str
    result: str
    processed: bool


class ResultSummary(BaseModel):
    request_id: str
    status: str
# handler.py
import structlog
from prometheus_client import Counter

from drakkar import (
    BaseDrakkarHandler, CollectResult, DeliveryAction, DeliveryError,
    ErrorAction, ExecutorTask, KafkaPayload, PostgresPayload,
    RedisPayload, make_task_id,
)
from models import InputMessage, ProcessedResult, ResultSummary

logger = structlog.get_logger()

# custom Prometheus metric (user-defined)
items_processed = Counter('app_items_processed_total', 'Total processed items')


class MyHandler(BaseDrakkarHandler[InputMessage, ProcessedResult]):
    async def arrange(self, messages, pending):
        tasks = []
        for msg in messages:
            # msg.payload is an InputMessage instance (auto-deserialized)
            tasks.append(ExecutorTask(
                task_id=make_task_id("proc"),
                args=["--input", msg.payload.data],
                source_offsets=[msg.offset],
                metadata={"request_id": msg.payload.request_id},
            ))
        return tasks

    async def on_task_complete(self, result):
        output = ProcessedResult(
            request_id=result.task.metadata["request_id"],
            result=result.stdout.strip(),
            processed=result.exit_code == 0,
        )
        summary = ResultSummary(
            request_id=output.request_id,
            status="done" if output.processed else "failed",
        )
        # custom Prometheus metric
        items_processed.inc()
        # async structured logging
        await logger.ainfo(
            "item_processed",
            category="handler",
            request_id=output.request_id,
            processed=output.processed,
        )
        # route to sinks based on business logic
        sinks = CollectResult(
            kafka=[KafkaPayload(data=output, key=output.request_id.encode())],
            postgres=[PostgresPayload(table="results", data=summary)],
        )
        # conditional: cache successful results in Redis
        if output.processed:
            sinks.redis.append(
                RedisPayload(key=f"result:{output.request_id}", data=summary, ttl=3600)
            )
        return sinks

    async def on_error(self, task, error):
        await logger.awarning(
            "task_failed", category="handler",
            task_id=task.task_id, exit_code=error.exit_code,
        )
        return ErrorAction.RETRY

    async def on_delivery_error(self, error: DeliveryError):
        await logger.awarning(
            "delivery_failed", category="handler",
            sink=error.sink_name, error=error.error,
        )
        # retry transient failures, DLQ for permanent ones
        if error.sink_type in ("http", "redis"):
            return DeliveryAction.RETRY
        return DeliveryAction.DLQ
# drakkar.yaml
kafka:
  brokers: "localhost:9092"
  source_topic: "input-events"
  consumer_group: "my-workers"
executor:
  binary_path: "/usr/local/bin/my-processor"
  max_executors: 8
  task_timeout_seconds: 120
  window_size: 20
sinks:
  kafka:
    results:
      topic: "output-results"
  postgres:
    main:
      dsn: "postgresql://user:pass@localhost:5432/mydb"
  redis:
    cache:
      url: "redis://localhost:6379/0"
      key_prefix: "app:"
dlq:
  topic: "" # auto-derived: input-events_dlq
metrics:
  port: 9090
debug:
  port: 8080
# main.py
from drakkar import DrakkarApp
from handler import MyHandler

app = DrakkarApp(
    handler=MyHandler(),
    config_path="drakkar.yaml",
)
app.run()
Worker name is read from the WORKER_ID environment variable by default (configurable via worker_name_env in config).
Drakkar has an explicit trust model that operators should understand before production deployment. These assumptions are inherent to the framework's architecture and aren't weaknesses per se -- they're the trust boundaries.
1. Handler binary is fully trusted. executor.binary_path is operator-configured; message bytes flow to the binary's stdin without sanitization. The binary runs with the worker's privileges (plus any env overrides from ExecutorConfig.env or per-task env).
2. Peer workers sharing db_dir are fully trusted. The cache and recorder peer-sync mechanisms have no cryptographic authentication of peer writes. Anyone who can write to the shared directory can inject cache entries or event rows that your workers will read. Treat db_dir as a shared-trust boundary.
3. The debug UI is an operator tool, not a public surface. Authentication is opt-in: debug.auth_token is empty out of the box, the UI runs unauthenticated, and a structured warning (debug_ui_unauthenticated) fires at startup naming the host:port and the two opt-in paths (debug.auth_token in YAML or the DK_DEBUG__AUTH_TOKEN env var). To require auth, set the token to a 32+ character random value (e.g. python -c "import secrets; print(secrets.token_urlsafe(32))"); protected endpoints (database download, merge, message probe) and the WebSocket live-event stream then require Authorization: Bearer <token> (or ?token=<token>). When a token is set, the WebSocket also validates the Origin header (against debug.allowed_ws_origins when configured, otherwise against the request's Host header). Even with auth, the debug UI exposes subprocess stdout/stderr, per-task env (after redaction), cache contents, and live event streams; restrict access to operators only.
   Why is auth off by default? The debug UI is read-only by design -- no endpoint stops a running worker, replays Kafka messages, mutates configured sinks, or fakes pipeline data. The Message Probe runs the handler against pasted input but is enforced (with tests) to produce zero side effects: no sink writes, no offset commits, no recorder rows, no cache writes, no peer sync. Combined with Drakkar's expected deployment posture -- inside a private contour (VPC, internal cluster network, operator-only ingress) -- the framework treats "unauthenticated by default + structured startup warning" as the right balance between out-of-the-box ergonomics and operator visibility. Operators who deploy in any non-private context (public internet, multi-tenant host, hostile network segment) should treat the warning as a prompt to set auth_token before the worker accepts traffic.
4. Kafka producers are trusted for availability, not correctness. Drakkar deserializes message payloads via handler.deserialize_message; parse errors silently set msg.payload=None rather than DLQ-ing the message or raising. A malicious producer cannot execute code in the worker, but can cause handlers to see unexpected None payloads unless your handler validates.
5. Per-task env is redacted before it reaches the recorder. Handler-written values in task.env are sanitized on the way to the recorder DB: names matching *PASSWORD*/*SECRET*/*TOKEN*/*_KEY/*API_KEY*/*CREDENTIAL*/*_DSN become ***; other URL-shaped values have embedded credentials stripped. Non-matching names pass through, so rename or avoid per-task env for secrets whose names don't trigger a pattern. ExecutorConfig.env (framework-level) is never written to the recorder at all -- it only reaches the subprocess environment, making it the safer slot for stable credentials.
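The redaction rule in item 5 can be approximated as follows. This is a sketch built only from the patterns listed above: the function names are hypothetical, and case-insensitive name matching is an assumption, not confirmed Drakkar behavior.

```python
from fnmatch import fnmatchcase
from urllib.parse import urlsplit

# Name patterns taken from the list above; matching on upper-cased names is an assumption.
SECRET_NAME_PATTERNS = (
    "*PASSWORD*", "*SECRET*", "*TOKEN*", "*_KEY", "*API_KEY*", "*CREDENTIAL*", "*_DSN",
)


def strip_url_credentials(value: str) -> str:
    """Drop an embedded user:password@ part from URL-shaped values."""
    try:
        parts = urlsplit(value)
    except ValueError:
        return value
    if parts.scheme and "@" in parts.netloc:
        host = parts.netloc.rpartition("@")[2]
        return parts._replace(netloc=host).geturl()
    return value


def redact_env(env: dict) -> dict:
    """Return a copy of env that is safe to persist (hypothetical helper)."""
    redacted = {}
    for name, value in env.items():
        if any(fnmatchcase(name.upper(), p) for p in SECRET_NAME_PATTERNS):
            redacted[name] = "***"
        else:
            redacted[name] = strip_url_credentials(value)
    return redacted
```

Note that a name such as DB_PASS matches none of the patterns and passes through unchanged, which is exactly the case the item above warns about.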
See the FAQ for deeper discussion of each assumption and links to the implementation.
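For the debug UI token described in item 3, a client-side sketch might look like this. The helper names and the example URL are hypothetical; only secrets.token_urlsafe(32) and the Authorization: Bearer header come from the text above, and no request is actually sent.

```python
import secrets
import urllib.request


def new_debug_token() -> str:
    """Generate a random token suitable for debug.auth_token (43 URL-safe characters)."""
    return secrets.token_urlsafe(32)


def authorized_request(url: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a request carrying the bearer token."""
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


if __name__ == "__main__":
    token = new_debug_token()
    # The URL is a placeholder; substitute a protected endpoint of your worker.
    request = authorized_request("http://localhost:8080/debug", token)
    print(request.get_header("Authorization"))
```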
| Hook | When | Purpose |
|---|---|---|
| on_startup(config) | Before components start | Modify config (e.g., auto-detect CPU count) |
| on_ready(config, db_pool) | After sinks connected | Initialize state from DB, run migrations |
| arrange(messages, pending) | Window of messages received | Transform messages into executor tasks |
| on_task_complete(result) | Each task completes | Process per-task result into sink payloads |
| on_message_complete(group) | All tasks for one source message finish | Aggregate per-message results (fan-out → fan-in) |
| on_window_complete(results, messages) | All tasks in a window done | Aggregate results across a window |
| on_error(task, error) | Task fails | Return RETRY, SKIP, or replacement tasks |
| on_delivery_error(error) | Sink delivery fails | Return DLQ (default), RETRY, or SKIP |
| on_assign(partitions) | Partitions assigned | Initialize per-partition state |
| on_revoke(partitions) | Partitions revoked | Cleanup per-partition state |
See docs/handler.md for hook semantics and docs/fan-out.md for the fan-out → fan-in pattern with on_message_complete.
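The fan-out → fan-in idea behind on_message_complete can be sketched independently of the framework. `FanInTracker` below is a hypothetical illustration of the bookkeeping, not the structure Drakkar uses internally: it remembers how many tasks each source message produced and releases the group once the last one finishes.

```python
from collections import defaultdict


class FanInTracker:
    """Hypothetical bookkeeping: group task results by the source message offset."""

    def __init__(self) -> None:
        self._expected = {}                # source offset -> number of tasks created
        self._results = defaultdict(list)  # source offset -> results collected so far

    def expect(self, source_offset: int, task_count: int) -> None:
        """Record how many tasks one source message fanned out into."""
        self._expected[source_offset] = task_count

    def record(self, source_offset: int, result):
        """Store one result; return the full group when the message is complete, else None."""
        self._results[source_offset].append(result)
        if len(self._results[source_offset]) < self._expected[source_offset]:
            return None
        del self._expected[source_offset]
        return self._results.pop(source_offset)
```

A handler following this pattern would build its per-message aggregate only when the group is returned, which corresponds to the moment the table above describes as "all tasks for one source message finish".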
Use the @periodic decorator to schedule recurring background coroutines on the handler. They run in the same async loop alongside the poll loop, start after on_ready(), and are cancelled on shutdown. Overlapping runs are prevented -- the next interval starts only after the current invocation finishes.
from drakkar import BaseDrakkarHandler, periodic


class MyHandler(BaseDrakkarHandler):
    async def on_ready(self, config, db_pool):
        self.db_pool = db_pool

    @periodic(seconds=60)
    async def refresh_cache(self):
        async with self.db_pool.acquire() as conn:
            self.cache = await conn.fetch("SELECT * FROM lookup")

    @periodic(seconds=30, on_error="stop")
    async def health_ping(self):
        # http_post is a placeholder for your own HTTP client call
        await http_post("https://health.example.com/ping")

| Parameter | Type | Default | Description |
|---|---|---|---|
| seconds | float | required | Interval between runs |
| on_error | "continue" \| "stop" | "continue" | "continue" logs and retries next interval; "stop" logs and cancels the task |
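The scheduling semantics described above (no overlapping runs, and the two on_error modes) can be sketched with a plain asyncio loop. `run_periodic` is a hypothetical stand-in, not the decorator's implementation; whether the first run happens before or after the first interval is an assumption here.

```python
import asyncio
import logging

log = logging.getLogger("periodic-sketch")


async def run_periodic(func, seconds: float, on_error: str = "continue") -> None:
    """Call func repeatedly; the next sleep starts only after the current call returns."""
    while True:
        await asyncio.sleep(seconds)  # assumption: wait one interval before the first run
        try:
            await func()
        except Exception:
            log.exception("periodic task failed")
            if on_error == "stop":
                return  # "stop": give up after the first failure
            # "continue": fall through and try again after the next interval


async def demo():
    calls = []

    async def flaky():
        calls.append(len(calls))
        if len(calls) == 3:
            raise RuntimeError("boom")

    await run_periodic(flaky, seconds=0.01, on_error="stop")
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the loop awaits the call before sleeping again, a slow invocation delays the next one instead of running concurrently with it.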
Configure any combination in the sinks: section. Each type supports multiple named instances.
| Sink | Payload | Serialization |
|---|---|---|
| KafkaPayload | data: BaseModel, key: bytes | data.model_dump_json().encode() -> value |
| PostgresPayload | data: BaseModel, table: str | data.model_dump() -> column mapping |
| MongoPayload | data: BaseModel, collection: str | data.model_dump() -> BSON document |
| HttpPayload | data: BaseModel | data.model_dump_json() -> POST body |
| RedisPayload | data: BaseModel, key: str, ttl: int? | data.model_dump_json() -> string value |
| FilePayload | data: BaseModel, path: str | data.model_dump_json() + "\n" -> JSONL line |
Routing: if you have multiple sinks of the same type, set sink="name" on the payload. With a single sink per type, the framework routes automatically.
Error handling: on delivery failure, on_delivery_error() is called. Default action: write to DLQ. The DLQ topic is auto-derived as {source_topic}_dlq.
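The routing rule and the DLQ topic derivation can be expressed in a few lines. Both functions are hypothetical illustrations; in particular, raising an error when several instances exist and no name is given is an assumption about the ambiguous case, which the text above does not specify.

```python
def resolve_sink(instances: dict, requested=None) -> str:
    """Pick the sink instance name for one payload of a given sink type."""
    if requested is not None:
        if requested not in instances:
            raise KeyError(f"unknown sink instance: {requested!r}")
        return requested
    if len(instances) == 1:
        # A single configured instance is selected automatically.
        return next(iter(instances))
    # Assumption: an unnamed payload with several candidates is treated as an error.
    raise ValueError("multiple sinks of this type configured; set sink=<name>")


def dlq_topic(source_topic: str, configured: str = "") -> str:
    """Use the configured DLQ topic, or derive {source_topic}_dlq when it is empty."""
    return configured or f"{source_topic}_dlq"
```

With the quick-start configuration, `dlq_topic("input-events")` gives `input-events_dlq`, matching the comment in drakkar.yaml.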
Define Pydantic models for your input/output and use them as type parameters:
class MyHandler(BaseDrakkarHandler[InputModel, OutputModel]):
    async def arrange(self, messages, pending):
        for msg in messages:
            msg.payload  # InputModel instance, auto-deserialized
            msg.value    # raw bytes, always available as fallback

Non-generic BaseDrakkarHandler (no type params) works too -- you get raw bytes in msg.value.
Webapp users declare four type parameters (InputT, OutputT, HttpRequestT, HttpResponseT) and override arrange_http_request / on_http_request_complete to expose the pipeline as a synchronous HTTP endpoint. See docs/webapp.md.
Run multiple instances with the same consumer_group. Kafka's cooperative-sticky rebalancing distributes partitions across workers.
WORKER_ID=worker-1 python main.py
WORKER_ID=worker-2 python main.py

All config fields support environment variable override with DK_ prefix and __ for nesting:
DK_KAFKA__BROKERS=kafka:9092
DK_EXECUTOR__MAX_EXECUTORS=16
DK_DEBUG__PORT=8081

Enabled by default at :8080. Pages:
- / -- dashboard with partition tiles, pool utilization, event counters
- /partitions -- per-partition stats
- /live -- tabbed live view: Arrange (with filter, progress bars, and a right-side batch-detail sidebar), Executors (timeline), Collect
- /debug -- tabbed tools: Metrics, Periodic Tasks, Message Trace, Cache (when enabled), Databases, Message Probe. Deep-link #trace/<partition>/<offset> opens the Trace tab pre-filled.
- /history -- filterable event browser with partition and event type toggles
- /task/{task_id} -- task detail with PID, duration, CLI command, stdout/stderr
Optional: set kafka.ui_url and kafka.ui_cluster_name in config to render a small Kafka-UI icon next to every <partition:offset> link; clicking the icon opens the corresponding message in Kafka-UI (provectus).
The Message Probe tab on /debug lets you paste a raw message value and run it end-to-end through the live handler's full pipeline -- arrange -> subprocess executor -> on_task_complete -> on_message_complete -> on_window_complete -- without touching any production state. The report shows the parsed SourceMessage, every generated task with stdin/stdout/stderr/exit code/duration, each hook's returned CollectResult, the sink payloads that would have been produced (grouped by sink type), every cache call made during the run, a timeline waterfall, and any exceptions with full tracebacks. Click any task row to open a right-side sidebar with the full scrollable stdin/stdout/stderr (handles 15k+ lines).
Safety guarantees (enforced by tests): no sink writes, no offset commits, no event-recorder rows, no cache writes, no peer sync -- zero footprint on the live system, even when production Kafka traffic is processed concurrently with the probe. The cache swap is gated by an asyncio contextvar so concurrent production tasks (poll loop, partition processors, @periodic background tasks) keep hitting the real cache directly while the probe sees its own isolated suppression-and-logging layer. Cache reads are opt-in for the probe via the Use cache (read-only) checkbox; when enabled, the probe forwards reads to the live cache but still silently suppresses writes within the probe's own pipeline. The handler.cache swap is always restored in a finally block, even if a hook raises.
Paste your message, click Run, and see the full behavior instead of inferring it from flight-recorder rows.
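The contextvar-gated cache swap described above can be illustrated with a small asyncio sketch. All class and function names here are hypothetical and the real probe does considerably more; the point is only that a value set in one task's context is invisible to concurrently running tasks, and that the override is reset in a finally block.

```python
import asyncio
import contextvars

# Holds the probe's cache wrapper for the current task only; None means "use the real cache".
_probe_cache = contextvars.ContextVar("probe_cache", default=None)


class DictCache:
    """Stand-in for the real cache."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value


class ProbeCache:
    """Logs every call, suppresses writes, and forwards reads only when allowed."""

    def __init__(self, real, allow_reads):
        self.real, self.allow_reads, self.calls = real, allow_reads, []

    def get(self, key):
        self.calls.append(("get", key))
        return self.real.get(key) if self.allow_reads else None

    def set(self, key, value):
        self.calls.append(("set", key))  # recorded, never written


class Handler:
    def __init__(self):
        self._real_cache = DictCache()

    @property
    def cache(self):
        override = _probe_cache.get()
        return override if override is not None else self._real_cache


async def run_probe(handler, hook, allow_reads=False):
    probe = ProbeCache(handler._real_cache, allow_reads)
    token = _probe_cache.set(probe)
    try:
        await hook(handler)
    finally:
        _probe_cache.reset(token)  # always restored, even if the hook raises
    return probe


async def demo():
    handler = Handler()

    async def production():
        await asyncio.sleep(0)
        handler.cache.set("prod", 1)   # reaches the real cache

    async def probe_hook(h):
        h.cache.set("probe", 2)        # suppressed by ProbeCache
        await asyncio.sleep(0)

    probe, _ = await asyncio.gather(run_probe(handler, probe_hook), production())
    return handler, probe
```

Each task created by asyncio.gather runs in its own copy of the context, so the production coroutine keeps seeing the real cache while the probe's writes are only logged.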
Exposed at :9090/metrics. Key metrics:
- drakkar_messages_consumed_total{partition}
- drakkar_executor_tasks_total{status}, drakkar_executor_duration_seconds
- drakkar_sink_payloads_delivered_total{sink_type, sink_name}
- drakkar_sink_deliver_errors_total{sink_type, sink_name}
- drakkar_sink_deliver_duration_seconds{sink_type, sink_name}
- drakkar_sink_dlq_messages_total
- drakkar_backpressure_active, drakkar_total_queued
- drakkar_offset_lag{partition}, drakkar_assigned_partitions
- drakkar_handler_duration_seconds{hook}
- drakkar_worker_info (worker_id, version, consumer_group)
- drakkar_uncommitted_offsets_at_stop, drakkar_inflight_at_stop -- shutdown snapshots
- drakkar_drain_timeout_hit_total -- drain exceeded executor.drain_timeout_seconds
- drakkar_suspected_oom_kills_total -- watchdog detected the previous run did not exit cleanly
JSON to stderr, ECS-compatible. Every log line includes service_name, worker_id, consumer_group, category, and timestamp.
Use structlog.get_logger() for async logging in your handlers:
import structlog
logger = structlog.get_logger()
# in any async hook
await logger.ainfo("my_event", category="handler", custom_field="value")

A full docker-compose example lives in integration/ with all 6 sink types:
cd integration
docker compose up --build

Services and web UIs:
| URL | Service |
|---|---|
| http://localhost:8081 | Worker 1 debug UI (primary workers, shared consumer group) |
| http://localhost:8082 | Worker 2 debug UI |
| http://localhost:8083 | Worker 3 debug UI |
| http://localhost:8084 | Fast-worker 1 debug UI (separate consumer group, on_window_complete aggregation) |
| http://localhost:8085 | Fast-worker 2 debug UI |
| http://localhost:8087 | Redis Commander |
| http://localhost:8088 | Kafka UI |
| http://localhost:8089 | MongoDB Express |
| http://localhost:9099 | Prometheus |
The integration scenario:
- 3 primary workers consuming from a 50-partition topic (main pipeline)
- 2 fast-workers on a separate consumer group demonstrating on_window_complete window aggregation
- Each result goes to Kafka + Postgres + MongoDB + Redis (always)
- Two Postgres sink instances (postgres.main + postgres.hot) showing multiple-sinks-of-same-type routing
- Framework cache (self.cache) is enabled with scope=CLUSTER -- peer sync propagates entries between primary workers (see the Cache tab in /debug)
- High-match results (>20) trigger HTTP webhook
- Very high-match results (>50) write to JSONL file
- 5% simulated executor failures with retry via on_error()
- Failed deliveries route to DLQ or retry based on sink type
Debug recorder databases and per-worker cache databases both live in integration/shared/, which is mounted into every worker container as /shared. Recorder files are per-worker timestamped (worker-1-2026-03-23__14_55_00.db with a {worker}-live.db symlink). Cache files are single per-worker (worker-1-cache.db.actual with a {worker}-cache.db symlink used for peer discovery). See integration/shared/README.md for details.
uv sync --extra=dev
uv run pytest --cov=drakkar
uvx ruff check drakkar/ tests/
uv run ty check drakkar/

MIT