| Field | Value |
|---|---|
| Module | dev.helix.pipeline |
| Revision | 1 |
| Created | 2026-06-10 |
| Last modified | 2026-06-10 |
| Status | active |
| Status summary | Initial scaffold: push-based operators + FBP network with bounded backpressure + 5 passing race tests + anti-bluff Challenge. |
| Org | HelixDevelopment |
| Visibility | public |
| Go | 1.26 |
A generic, reusable staged streaming dataflow runtime for Go: composable
push-based operators over typed streams, plus a flow-based-programming (FBP)
Component/Port/Network layer with bounded backpressure.
- Why
- Install
- Quick start — operators
- Quick start — FBP network
- Core API
- Backpressure
- Testing & anti-bluff Challenge
- Governance
Go channels + a worker pool give the low-level substrate but not the ergonomic
operator/FBP API for staged pipelines: composable Map/Filter/FlatMap/
Window/MapReduce stages, declaratively-wired typed-port components, and
backpressure that just works. This module is that small, reusable layer (an Rx +
FBP subset — not full Apache Beam parity).
```go
package main

import (
	"context"
	"fmt"

	pipeline "dev.helix.pipeline"
)

func main() {
	// Errors are discarded here for brevity; check them in real code.
	src := pipeline.FromSlice([]int{1, 2, 3, 4, 5, 6})
	doubled := pipeline.Map(func(x int) (int, error) { return x * 2, nil })(src)
	// Every doubled value is even, so this keeps only the multiples of four.
	multiplesOfFour := pipeline.Filter(func(x int) bool { return x%4 == 0 })(doubled)
	windows, _ := pipeline.Collect(context.Background(), pipeline.Window[int](2)(multiplesOfFour))
	fmt.Println(windows) // [[4 8] [12]]

	sums, _ := pipeline.MapReduce[int, string, int](
		context.Background(),
		pipeline.FromSlice([]int{1, 2, 3, 4}),
		// mapper: assign each value to a key
		func(x int) (string, int) {
			if x%2 == 0 {
				return "even", x
			}
			return "odd", x
		},
		// reducer: sum the values that share a key
		func(acc, next int) int { return acc + next },
	)
	fmt.Println(sums) // map[even:6 odd:4]
}
```

```go
// producerComponent and consumerComponent stand for your own Component constructors.
out := pipeline.NewPort("out", 4) // bounded buffer => backpressure
in := pipeline.NewPort("in", 4)
net := pipeline.NewNetwork().
	Add(producerComponent(out)).
	Add(consumerComponent(in))
net.Connect(out, in)
net.Run(context.Background()) // blocks until both complete; slow consumer blocks producer
```

A Component implements Name() string and Run(ctx) error, reading from and
writing to bounded Ports.
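As a rough illustration of the Component contract, the sketch below wires a producer to a consumer without importing the module. It makes several assumptions: a plain buffered channel stands in for Port, and `producer`, `consumer`, `runAll`, and `sumThrough` are hypothetical names invented for this example, not part of the module's API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Component mirrors the interface described in this README.
type Component interface {
	Name() string
	Run(ctx context.Context) error
}

// producer is a hypothetical component that writes 1..n to out.
// A bounded channel stands in for the module's Port type (assumption).
type producer struct {
	out chan<- int
	n   int
}

func (p producer) Name() string { return "producer" }

func (p producer) Run(ctx context.Context) error {
	defer close(p.out) // tell the downstream side we are done
	for i := 1; i <= p.n; i++ {
		select {
		case p.out <- i: // blocks while the buffer is full
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// consumer is a hypothetical component that sums everything it reads.
type consumer struct {
	in  <-chan int
	sum *int
}

func (c consumer) Name() string { return "consumer" }

func (c consumer) Run(ctx context.Context) error {
	for {
		select {
		case v, ok := <-c.in:
			if !ok {
				return nil // upstream closed: normal completion
			}
			*c.sum += v
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// runAll runs each component in its own goroutine, waits for all of them,
// and returns one of the errors (nil if none failed).
func runAll(ctx context.Context, comps ...Component) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(comps))
	for _, c := range comps {
		wg.Add(1)
		go func(c Component) {
			defer wg.Done()
			if err := c.Run(ctx); err != nil {
				errs <- fmt.Errorf("%s: %w", c.Name(), err)
			}
		}(c)
	}
	wg.Wait()
	close(errs)
	return <-errs // receiving from a closed, empty channel yields nil
}

// sumThrough wires producer -> consumer over a channel with the given bound.
func sumThrough(n, bound int) (int, error) {
	ch := make(chan int, bound)
	sum := 0
	err := runAll(context.Background(), producer{out: ch, n: n}, consumer{in: ch, sum: &sum})
	return sum, err
}

func main() {
	sum, err := sumThrough(100, 4)
	fmt.Println(sum, err) // 5050 <nil>
}
```

Both Run methods select on ctx.Done() so that cancelling the context unblocks a component that is waiting on a full or empty buffer.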
```go
type Stream[T any] interface {
	Subscribe(ctx context.Context, sink func(T) error) error
}

type Operator[T any, U any] func(in Stream[T]) Stream[U]

func Map[T, U any](fn func(T) (U, error)) Operator[T, U]
func Filter[T any](pred func(T) bool) Operator[T, T]
func FlatMap[T, U any](fn func(T) []U) Operator[T, U]
func Window[T any](size int) Operator[T, []T]
func MapReduce[T any, K comparable, R any](ctx, in, mapper, reducer) (map[K]R, error)

type Component interface {
	Name() string
	Run(ctx context.Context) error
}

type Port struct { /* bounded channel */ }
type Network struct { /* components + connections */ }
```

Ports are bounded channels. Port.Send blocks once the bound is reached and
resumes when the downstream drains, so a slow consumer naturally throttles a
fast producer: no unbounded buffering, no OOM. The anti-bluff Challenge records
the maximum in-flight count to show that the bound holds.
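The throttling effect can be reproduced with nothing but a buffered channel. The sketch below is an illustration under assumptions, not the Challenge itself: `maxInFlight` is a hypothetical helper, and its accounting counts an item from just before it is sent until just after it is received. Under that accounting the peak can reach the bound plus two (the item waiting to be sent and the item just received), so the check is against `bound+2` rather than `bound`.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// maxInFlight pushes n items through a channel of capacity bound into a
// deliberately slow consumer. It returns the highest number of items that
// had been produced but not yet received at any moment.
func maxInFlight(n, bound int, delay time.Duration) int64 {
	ch := make(chan int, bound)
	var inFlight atomic.Int64
	var peak int64 // written only by the producer loop below

	done := make(chan struct{})
	go func() { // slow consumer
		defer close(done)
		for range ch {
			inFlight.Add(-1)
			time.Sleep(delay) // simulate slow downstream work
		}
	}()

	for i := 0; i < n; i++ { // fast producer
		if cur := inFlight.Add(1); cur > peak {
			peak = cur
		}
		ch <- i // blocks once the buffer already holds `bound` items
	}
	close(ch)
	<-done
	return peak
}

func main() {
	const bound = 4
	peak := maxInFlight(50, bound, time.Millisecond)
	fmt.Println(peak <= bound+2) // true
}
```

Without the buffer limit (for example, appending to a slice instead of sending on the channel), the peak would grow with n rather than stay near the bound.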
```sh
go test -race -count=1 -v ./...
./challenges/pipeline_runtime_challenge.sh
```

The Challenge feeds 10,000 records through Map → Filter → MapReduce, asserts
that the aggregate matches an independently computed golden value, checks
bounded backpressure under a slow sink (captured max-in-flight ≤ bound), and
checks that no goroutine leaks after completion.
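A golden-value check and a goroutine-leak check of this kind might look roughly like the sketch below. It is not the Challenge script: it uses plain goroutine stages over bounded channels as a stand-in for the module's operators, and `source`, `stage`, and `runCheck` are hypothetical names. The specific map and filter functions are arbitrary choices for the example.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// source emits 1..n on a bounded channel and closes it when done.
func source(n, bound int) <-chan int {
	out := make(chan int, bound)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// stage runs fn over every input in its own goroutine. fn returns the value
// to emit and whether to emit it, so one helper covers both Map and Filter.
func stage(in <-chan int, bound int, fn func(int) (int, bool)) <-chan int {
	out := make(chan int, bound)
	go func() {
		defer close(out)
		for v := range in {
			if r, ok := fn(v); ok {
				out <- r
			}
		}
	}()
	return out
}

// runCheck returns the pipeline aggregate, the golden aggregate, and whether
// any goroutine outlived the run.
func runCheck(n int) (got, golden map[string]int, leaked bool) {
	before := runtime.NumGoroutine()
	key := func(x int) string {
		if x%2 == 0 {
			return "even"
		}
		return "odd"
	}

	mapped := stage(source(n, 8), 8, func(x int) (int, bool) { return x + 1, true })
	kept := stage(mapped, 8, func(x int) (int, bool) { return x, x%3 != 0 })
	got = map[string]int{}
	for v := range kept { // reduce: sum per key
		got[key(v)] += v
	}

	golden = map[string]int{} // independent computation with a plain loop
	for i := 1; i <= n; i++ {
		if v := i + 1; v%3 != 0 {
			golden[key(v)] += v
		}
	}

	// Stage goroutines exit right after closing their channels; poll briefly
	// so a goroutine that is still unwinding is not reported as a leak.
	deadline := time.Now().Add(time.Second)
	for runtime.NumGoroutine() > before && time.Now().Before(deadline) {
		time.Sleep(time.Millisecond)
	}
	return got, golden, runtime.NumGoroutine() > before
}

func main() {
	got, golden, leaked := runCheck(10000)
	fmt.Println(got["even"] == golden["even"], got["odd"] == golden["odd"], leaked)
}
```

Comparing goroutine counts before and after is a coarse check; it only works when nothing else in the process starts goroutines during the run.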
Inherits the consuming project's constitution/ canonical root (CONST-059).
Anti-bluff §11.4 family, CONST-047/050/051/052/053/054, §11.4.113 (no
force-push), §11.4.135 (regression guards) all bind. See CLAUDE.md,
CONSTITUTION.md, AGENTS.md.