
lib: add streaming decode, stream producers, and emit macro #130

Merged
efd6 merged 1 commit into dev from streaming on Jun 17, 2026
efd6 (Collaborator) commented Jun 16, 2026

Add three composable features for processing large payloads in CEL programs without materialising all records in memory:

  • Opaque stream type (streamVal) wrapping io.Reader, with stream_gzip and stream_zip producers that wrap decompression readers around in-memory bytes.

  • Lazy JSON stream decoder (decode_json_stream_lazy) that accepts stream, bytes, or string and returns a traits.Iterable decoding values on demand via json.Decoder.

  • Emit macro that iterates a list or iterable, calling an Emitter callback for each element. Supports optional per-element cursor tracking. In the mito CLI, emitted events are printed to stderr.

andrewkroh (Member) left a comment

Is a decode_csv_stream_lazy feasible (not needed now)? I think that would make it possible to apply emit to the ti_recordedfuture integration.

Comment thread lib/emit.go Outdated
Comment on lines +36 to +37
// during CEL evaluation. The beats-side adapter wraps inputcursor.Publisher
// behind this interface.


Suggested change
- // during CEL evaluation. The beats-side adapter wraps inputcursor.Publisher
- // behind this interface.
+ // during CEL evaluation.

I think we should not mention details about beats on the library side.

Comment thread lib/emit.go Outdated
Comment on lines +76 to +77
// If Emit returns an error, iteration stops. The result reflects the count
// and cursor at the point of failure.


Could we return an error field in the result map (e.g. {"published": N, "incomplete": true, "error": "..."}) so the program/cursor logic can potentially react? IIUC a decode error is indistinguishable from a clean EOF without it.

Comment thread lib/stream.go
efd6 (Collaborator, Author) commented Jun 17, 2026

Added CSV and line streams (for TSV and other failures to read prior art).

andrewkroh (Member) left a comment

LGTM 🚤

@efd6 efd6 merged commit b54ab9f into dev Jun 17, 2026
4 checks passed