hellogravel/dlt

dlt Cloudflare Data Platform

This repo is the home for our dlt-* data ingestion platform: Cloudflare-hosted orchestration and Python dlt runners that move operational data into BigQuery.

The first source is Cloudflare Workers KV. The intended shape is broader: up to a dozen KV namespaces, internal APIs, SaaS APIs, webhook feeds, and other batch/incremental sources managed through one Cloudflare control plane.

What This Is

This is not just a script. It is the beginning of a small data platform:

  • Cloudflare runs the scheduler, job queue, operational ledger, raw staging, and containerized execution.
  • dlt runs extraction, schema inference/evolution, normalization, and loading.
  • BigQuery remains the analytical warehouse.
  • R2 stores optional raw snapshots for replay and audit.
  • D1 stores source/job/run metadata so humans and agents can inspect and manage jobs.

flowchart LR
  U["Operators / agents / webhooks"] --> W["dlt-orchestrator Worker"]
  CRON["Cron Triggers"] --> W
  W --> D1["D1: dlt_control"]
  W --> Q["Queue: dlt-jobs-v2"]
  Q --> C["Container: dlt-runner"]
  C --> SRC["KV / APIs / SaaS sources"]
  C --> R2["R2: dlt-raw-staging"]
  C --> BQ["BigQuery"]
  C --> D1
  Q --> DLQ["Queue: dlt-jobs-v2-dlq"]

Current Deployed Instance

Production is deployed on Cloudflare:

  • Worker/API/admin: https://dlt-orchestrator.hgdc.workers.dev
  • Container app: dlt-orchestrator-dltrunner
  • Queue: dlt-jobs-v2
  • Dead letter queue: dlt-jobs-v2-dlq
  • D1 database: dlt_control
  • BigQuery dataset: hello-gravel-data.source_cloudflare
  • Schedule: hourly at minute 0 UTC
  • Runner size: standard-1 Cloudflare Container

Implemented:

  • Python package scaffold in src/dlt_cloudflare_kv/
  • local CLI command: dlt-kv
  • Cloudflare KV namespace listing
  • KV key/value extraction
  • JSON parsing and warehouse row shaping
  • BigQuery destination wiring through dlt
  • D1-backed source definitions and run ledger
  • Worker operator API and minimal /admin page
  • Queue-backed manual and scheduled runs
  • Containerized Python dlt runner
  • Batched Cloudflare KV backfills using explicit key lists
  • Incremental scheduled leads_kv loads using recent key-prefix windows
  • Disabled vendor_kv source definition ready for first vendor snapshot

Local Setup

python -m venv .venv
source .venv/bin/activate
pip install -e .
cp .env.example .env

Fill in .env:

  • CLOUDFLARE_ACCOUNT_ID
  • CLOUDFLARE_API_TOKEN
  • BigQuery credentials through GOOGLE_APPLICATION_CREDENTIALS or dlt's DESTINATION__BIGQUERY__... variables

Source definitions live outside .env:

cp config/sources.example.json config/sources.json

Add one entry per KV namespace or source job. .env is for secrets and account-level settings; config/sources.json is for non-secret pipeline configuration like namespace IDs, prefixes, datasets, tables, and write dispositions.

First KV Pipeline

List available KV namespaces:

dlt-kv namespaces

List configured sources, then inspect keys and sample records before loading:

dlt-kv sources
dlt-kv keys --source-id cloudflare_kv_orders --limit 20
dlt-kv profile --source-id cloudflare_kv_orders --limit 100
dlt-kv profile --source-id cloudflare_kv_leads --limit 100
dlt-kv profile --source-id cloudflare_kv_vendors --limit 100
dlt-kv sample --source-id cloudflare_kv_orders --limit 20

Load records to BigQuery:

dlt-kv load --source-id cloudflare_kv_orders
dlt-kv load --source-id cloudflare_kv_leads --limit 500

The current loader preserves:

  • cf_kv_key
  • cf_kv_namespace_id
  • cf_kv_namespace_title
  • cf_kv_metadata
  • cf_kv_expiration
  • cf_kv_fetched_at
  • cf_kv_value_is_json
  • cf_kv_value_raw

If the KV value is a JSON object, its fields are expanded into BigQuery columns. The cf_kv_* audit fields are protected, so a payload field with the same name cannot overwrite them.
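
The shaping step can be sketched roughly as follows. This is a minimal illustration, not the loader's actual code: the function name shape_row and its parameters are hypothetical, and it assumes that payload keys colliding with the cf_kv_ prefix are simply dropped (the real loader may rename or handle them differently). It also assumes cf_kv_value_is_json is true for any value that parses as JSON.

```python
import json
from datetime import datetime, timezone

AUDIT_PREFIX = "cf_kv_"


def shape_row(key, raw_value, namespace_id, namespace_title,
              metadata=None, expiration=None):
    """Build one warehouse row from a KV entry (illustrative sketch only)."""
    try:
        parsed = json.loads(raw_value)
        is_json = True
    except (TypeError, ValueError):
        parsed, is_json = None, False

    row = {}
    if isinstance(parsed, dict):
        # Expand payload fields, skipping any that would shadow an audit column.
        # Assumption: collisions are dropped rather than renamed.
        row.update({k: v for k, v in parsed.items()
                    if not k.startswith(AUDIT_PREFIX)})

    # Audit fields are written last so the payload can never override them.
    row.update({
        "cf_kv_key": key,
        "cf_kv_namespace_id": namespace_id,
        "cf_kv_namespace_title": namespace_title,
        "cf_kv_metadata": metadata,
        "cf_kv_expiration": expiration,
        "cf_kv_fetched_at": datetime.now(timezone.utc).isoformat(),
        "cf_kv_value_is_json": is_json,
        "cf_kv_value_raw": raw_value,
    })
    return row
```

Writing the audit fields after the payload expansion means the protection holds even if the prefix filter is later relaxed.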

Connecting To The DLT Instance

Humans and agents should interact with the platform through the dlt-orchestrator Worker rather than shelling into the runner.

Set these in .env:

DLT_ORCHESTRATOR_URL=https://dlt-orchestrator.hgdc.workers.dev
DLT_OPERATOR_TOKEN=...

1. Admin Page

Open:

https://dlt-orchestrator.hgdc.workers.dev/admin?token=<DLT_OPERATOR_TOKEN>

The token is converted into an HttpOnly cookie, so refreshes and form-based runs work without keeping the token in the URL.

2. Operator API

Implemented endpoints:

GET  /health
GET  /sources
POST /sources
GET  /sources/:source_id
PUT  /sources/:source_id
POST /sources/:source_id/runs
GET  /runs
GET  /runs/:run_id

Create and edit source definitions through POST /sources and PUT /sources/:source_id. Triggering a run returns a queued run_id; poll GET /runs/:run_id for completion.
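
A trigger-and-poll client might look like the sketch below. Several details are assumptions rather than documented behavior: that the operator token is accepted as a Bearer header, that the trigger response carries a run_id JSON field, that run records expose a status field, and that "succeeded" and "failed" are the terminal status values. The helper names http_json and trigger_and_wait are hypothetical.

```python
import json
import time
import urllib.request


def http_json(method, url, token, body=None):
    """Send one JSON request. Assumption: the token is sent as a Bearer header."""
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())


def trigger_and_wait(base_url, token, source_id, send=http_json,
                     poll_seconds=10, max_polls=60,
                     terminal=("succeeded", "failed")):
    """Queue a run, then poll until its status is terminal or polls run out."""
    run = send("POST", f"{base_url}/sources/{source_id}/runs", token, {})
    run_id = run["run_id"]  # assumption: field name in the trigger response
    for _ in range(max_polls):
        state = send("GET", f"{base_url}/runs/{run_id}", token)
        # Assumption: the run record exposes a "status" field.
        if state.get("status") in terminal:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"run {run_id} did not finish after {max_polls} polls")
```

In practice base_url and token would come from DLT_ORCHESTRATOR_URL and DLT_OPERATOR_TOKEN. The send parameter exists so the polling logic can be exercised without network access.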

Status-only local check:

.venv/bin/python scripts/check_deployment_status.py

Live limited-run verification:

.venv/bin/python scripts/verify_deployment.py

3. D1 Control Tables

The Worker stores job definitions and run history in D1. Operators can inspect D1 directly for debugging, but writes should go through the Worker API so validation and audit logging stay consistent.

Current tables:

  • sources
  • source_versions
  • runs
  • run_events

4. Queue Inspection

dlt-jobs-v2 holds pending work. dlt-jobs-v2-dlq holds jobs that exhausted retries. Operators should use the Worker API for normal retries; direct queue inspection is for incident response. The original dlt-jobs queue is retained only as incident history from the first large KV backfill attempt.

5. BigQuery

BigQuery is where analysts consume loaded tables. It is not the job-management surface. Pipeline metadata may be copied into BigQuery later for analytics, but D1 is the operational source of truth.

6. Local CLI

The local CLI remains useful for development, source discovery, and one-off backfills:

dlt-kv namespaces
dlt-kv sources
dlt-kv keys --source-id cloudflare_kv_orders --limit 20
dlt-kv profile --source-id cloudflare_kv_orders --limit 100
dlt-kv sample --source-id cloudflare_kv_orders --limit 20
dlt-kv load --source-id cloudflare_kv_orders --limit 500

For production, prefer creating/running jobs through the Worker API.

7. KV Batch Backfills

Large KV namespaces are loaded as explicit key batches. The current tested leads batch size is 500 keys:

.venv/bin/python scripts/enqueue_kv_batches.py \
  --source-id cloudflare_kv_leads \
  --batch-size 500 \
  --skip-batches 0 \
  --max-batches 1

Use --skip-batches to continue a backfill without replaying earlier batches. Loads use merge on cf_kv_key, so accidental overlap is safe but wastes time.
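
To illustrate how --batch-size, --skip-batches, and --max-batches interact, here is a small sketch of the batch planning. It is not the code in scripts/enqueue_kv_batches.py; the function plan_batches is hypothetical, and sorting the keys to get a stable batch order is an assumption about how batch indexes stay reproducible between invocations.

```python
def plan_batches(keys, batch_size=500, skip_batches=0, max_batches=None):
    """Yield (batch_index, keys) pairs for the selected slice of a backfill."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # Assumption: a stable order, so batch N always refers to the same keys.
    ordered = sorted(keys)
    total = (len(ordered) + batch_size - 1) // batch_size
    stop = total if max_batches is None else min(total, skip_batches + max_batches)
    for index in range(skip_batches, stop):
        start = index * batch_size
        yield index, ordered[start:start + batch_size]
```

With 1,200 keys and a batch size of 500, skip_batches=1 and max_batches=1 selects only the second batch (keys 500 through 999), which is how a stalled backfill could resume without replaying the first batch.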

Scheduled cloudflare_kv_leads runs are not full namespace backfills. They use configured timestamp/date key prefixes over a recent lookback window, then merge by cf_kv_key. Full leads backfills should be launched manually with scripts/enqueue_kv_batches.py.

Normal manual runs for sources with extract.incremental.enabled use the same filtered incremental window as scheduled runs. Use explicit key batches when intentionally doing a full leads backfill.
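
The incremental window can be pictured as a list of key prefixes, one per day in the lookback period. The sketch below assumes keys begin with a UTC date in YYYY-MM-DD form and that the window is measured in whole days; the real prefix format and lookback settings live in the source configuration and may differ. The function recent_prefixes is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def recent_prefixes(now, lookback_days=2, fmt="%Y-%m-%d"):
    """Return one key prefix per day in the lookback window, oldest first.

    `now` must be a timezone-aware datetime; it is normalized to UTC.
    """
    if lookback_days < 0:
        raise ValueError("lookback_days must not be negative")
    today = now.astimezone(timezone.utc).date()
    return [(today - timedelta(days=offset)).strftime(fmt)
            for offset in range(lookback_days, -1, -1)]
```

Each prefix would then be used to list keys for that day only. Because loads merge on cf_kv_key, overlapping windows between consecutive runs do not create duplicates.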

cloudflare_kv_vendors is configured as a disabled snapshot source for source_cloudflare.vendor_kv. Enable and run it after VENDOR_KV contains records.

Job Definition Shape

The control plane should treat every source as a configured job:

{
  "source_id": "cloudflare_kv_orders",
  "source_type": "cloudflare_kv",
  "enabled": true,
  "schedule": "0 * * * *",
  "destination": {
    "type": "bigquery",
    "dataset": "source_cloudflare",
    "table": "orders_kv",
    "write_disposition": "replace"
  },
  "extract": {
    "namespace_id": "KV_NAMESPACE_ID",
    "namespace_title": "ORDERS_KV",
    "prefix": null
  },
  "staging": {
    "raw_snapshot": true,
    "r2_prefix": "cloudflare_kv/orders"
  }
}

Locally, these jobs live in config/sources.json. In production, the same shape is stored in the dlt_control D1 database and managed through the dlt-orchestrator API.
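
A lightweight check of this shape could look like the sketch below. The rules are illustrative assumptions, not the Worker's actual validation: which fields are required, the accepted write dispositions, and the requirement that raw_snapshot comes with an r2_prefix are all hypothetical, as is the function name validate_job.

```python
REQUIRED_TOP_LEVEL = ("source_id", "source_type", "enabled", "destination", "extract")
REQUIRED_DESTINATION = ("type", "dataset", "table", "write_disposition")
# Dispositions this sketch accepts; dlt documents append, replace, and merge.
WRITE_DISPOSITIONS = {"append", "replace", "merge"}


def validate_job(job):
    """Return a list of problems found in a job definition (empty means OK)."""
    problems = [f"missing field: {name}"
                for name in REQUIRED_TOP_LEVEL if name not in job]

    destination = job.get("destination") or {}
    problems += [f"missing destination.{name}"
                 for name in REQUIRED_DESTINATION if name not in destination]

    disposition = destination.get("write_disposition")
    if disposition is not None and disposition not in WRITE_DISPOSITIONS:
        problems.append(f"unknown write_disposition: {disposition}")

    # Assumption: a raw snapshot needs an R2 prefix to write under.
    staging = job.get("staging") or {}
    if staging.get("raw_snapshot") and not staging.get("r2_prefix"):
        problems.append("staging.raw_snapshot is true but staging.r2_prefix is empty")
    return problems
```

Returning a list of problems instead of raising on the first one lets an operator fix a definition in a single pass.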

See Architecture and Operations for the fuller design.

Naming

Cloudflare resources should use the dlt- namespace:

  • dlt-orchestrator
  • dlt-runner
  • dlt-jobs-v2
  • dlt-jobs-v2-dlq
  • dlt-raw-staging
  • dlt_control

Warehouse datasets should be explicit by source domain, for example cloudflare_kv, posthog_raw, or klaviyo_raw, rather than hiding all data under a single generic dataset.
