Skip to content

Johnny/runtime v2 soak#2986

Draft
jgraettinger wants to merge 22 commits into
masterfrom
johnny/runtime-v2-soak
Draft

Johnny/runtime v2 soak#2986
jgraettinger wants to merge 22 commits into
masterfrom
johnny/runtime-v2-soak

Conversation

@jgraettinger
Copy link
Copy Markdown
Member

Description:

(Describe the high level scope of new or changed features)

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)

The derive and materialize `build_stats_doc()` routines silently skipped
out-of-range binding indices with `continue;`, papering over what is
actually a shard protocol violation. Surface these as proper errors via
a new `Action::Error` variant, which `dispatch()` returns up the stack to
terminate the actor.
On recovery of a prepared-but-uncommitted transaction, `handler::serve`
builds the first HeadIdle with `idempotent_replay = true` and empty
extents. The recovered hints arrive in the pending `ready_frontier`, but
the close policy reads `unresolved_hints` from `extents.frontier` — empty
until a frontier is applied. So the first tick yields `may_extend = false`
(replay suppresses policy-driven extend, and no extents hints force it):
the recovered frontier is never applied, the txn can neither open nor
close, and the Actor re-steps Idle forever.

Treat an unstarted replay as itself an unresolved hint, forcing the first
extend through the existing close-policy path. Once the recovered frontier
lands in extents, its real hints drive the policy until they resolve and
the replay txn closes.

Fold a regression case into the happy-path FSM test by making its first txn
an idempotent replay rather than a standalone test.
…s errors

Publisher::enqueue and enqueue_owned now accept closures returning
tonic::Result rather than bare tuples, so callers can propagate errors.
patch_document_uuid now returns tonic::Result instead of silently returning
on missing or malformed UUID placeholders.
Built-in TypeScript/Python derivations were validated against the frozen
V1 `:dev` connector image but run against the V2 image, so a V2 module
failed to type-check during Validate (TS2305 on FlushResponse). Resolve
the image at build time from the task's shard flags and write a concrete
Image connector into the built spec, so Validate and the runtime compile
against the same connector interface:

`derive-image-tag` flag overrides the tag (e.g. `local`); otherwise
`:stable` for `enable-runtime-v2` tasks, else `:dev`. The model `using`
stays abstract; only the built connector_type/config_json become Image.

runtime-next now errors on an unresolved built-in (it must be mapped at
build time); the V1 runtime keeps its `:dev` mapping as a legacy
fallback for already-activated built specs.
The State-Update-Wire-Format framed connector merge patches with `\n`,
but image connectors (derive-typescript/python) read requests as
newline-delimited JSON via `readLines`. A Flush request's embedded
`statePatches` array therefore split across lines, and the connector's
`JSON.parse` aborted with "Expected ',' or ']' after array element",
crashing every built-in TypeScript/Python V2 derivation once non-empty
state flowed (in both the reactor and `preview-next`).

Switch the patch delimiter from `\n` to `\t`: a tab never appears raw in
compact JSON (control chars are escaped inside strings and there is no
insignificant whitespace) yet is still valid JSON whitespace between
array elements, so the payload survives the connector's line framing and
parses directly.
The previous floor was stored in `self.ready.flushed_lsn` and applied only
when emitting a fully-resolved ready checkpoint. A peek sourced from
`unresolved` bypassed that floor, so it could carry a lower `flushed_lsn`
than a ready already emitted to the client—causing the Reader to see a
regressing `set_flushed_lsn`.

Introduces `emitted_flushed_lsn` as a dedicated monotonic floor that
`floor_flushed_lsn()` advances on every emission (both ready and peek),
guaranteeing element-wise non-regression regardless of which path fires.
Front the runtime-v2 Leader and Shuffle gRPC services with inbound
authentication and authorization. A per-service interceptor enforces a
capability floor (LEAD for the Leader, SHUFFLE for Shuffle), and each
handler then authorizes the token's selector against the shard it
operates on.

- proto-grpc: new `auth` module. Authenticator verifies inbound bearers
  against a trusted issuer and a rotating key set; Authorizer/Authorized
  carry out the per-request scope check and gate the must-use proof that
  a check actually ran; Signer self-signs short-lived data-plane bearers
  scoped to a shard-id task prefix.
- labels: port gazette's LabelSelector.Matches merge-join as
  `labels::matches`, rejecting unsorted (untrusted) selector input rather
  than silently re-sorting. `shard::id_prefix` now retains its trailing
  '/' so an `id:prefix` scope can't bleed into a sibling task.
- runtime-next, shuffle: sign LEAD/SHUFFLE bearers when dialing peers and
  thread an Authorizer through each handler. validate_join and
  validate_shard_ranges now require a topology to share one shard-id
  prefix, so authorizing shard zero covers every shard.
- proto-flow, go: define the LEAD capability bit.

In-process and `flowctl preview` paths use a trusted-local Authorizer /
no Signer and are unauthenticated by construction.
…_VALID

This flag is used by the `shuffle` pipeline to mark ok-vs-failed validations.
Re-validate within the shard runtime to surface a meaningful user error.
Build runtime-sidecar in the GNU/glibc release build, link it into the
per-arch package directory, and install it into the reactor image
alongside flowctl-go and flow-connector-init.
Introduce `stalled_reads` (a HashSet of read IDs that are pending AND
non-tailing) to make head-of-line-blocking visible: a non-tailing read
in `pending_reads` blocks heap drain until all pending reads tail.

`park_or_process` consolidates the entry path: it first tries to resolve
a read immediately via `now_or_never` (skipping a park if data is already
buffered), then classifies the read as tailing or stalled before parking it.
`on_pending_read_resolved` performs the exactly-inverted de-classification
on resolution, eliminating the scattered `tailing_reads` adjustments
that were previously duplicated across every error and terminal path.

Stall transitions fire a structured `stall` trace event and update the
new `shuffle_slice_stalled_reads` gauge so operators can observe which
journals are blocking and for how long.

Also introduces `ReadState::resuming`.

Now that stalled reads are tracked explicitly, we no longer require
tailing() to stay constant between batches. Allow it to update after
metadata.
proto_gazette::uuid::Clock::tick() is a port of Go's
go.gazette.dev/core/message.Clock.Tick(), which advances by one
microsecond (160 == 10 units of 100ns << 4 counter bits). The Rust
port had drifted and still incremented by 1, a sub-100ns counter step.
Replace per-shard BufWriter<Stdout> + scratch buffer with a single
line_buf that accumulates complete ["<name>",<body>]\n lines and
flushes them via a single atomic write_all under the stdout lock.
Multiple shards sharing process-global stdout could previously splice
partial lines together; flushing whole lines in one call prevents that.
When loading local: connector endpoints from file:// sources, rewrite
command[0] to an absolute path — bare names are found on the $PATH,
relative paths are resolved against the declaring source file's
directory, and absolute paths / non-file:// sources are left unchanged.
Previously, Ctrl-C returned Ok(()) immediately, leaving the session
loop running while its resources (RocksDB, tonic server) were dropped —
causing pthread lock errors. Unify timeout and Ctrl-C arms to both
cancel the stop_token and await the loop, and use Duration::MAX to
eliminate the if/else branching.
Previously include/exclude matching used sorted binary search on exact
values, silently ignoring the `prefix` flag and empty-value wildcard
that `labels::matches` supports.  Replace `Vec<Box<str>>` with
`Vec<ConstraintValue>` and a `matches` method that mirrors the labels
semantics exactly.
Refactor the key/value extraction path so that a single `doc::Encoding`
selects between the existing FoundationDB packed-tuple output and a new
JSON array output. The two extraction strategies (the flat extractor
slice and the compiled block merge-join plan) and the two encodings now
share one core (`write_encoded`), driven by a `Resolve` trait that
yields ordered (extractor, resolved-node) pairs. The truncation-indicator
backpatch is implemented once per encoding.

Also adds `Extractor::packed_key_prefix_len`, which detects whether a
key prefix fully contains a packed key, which can be used to skip
re-extraction given a covering prefix.

`extract_all` / `extract_all_owned` gain an `encoding` and an
`Option<&AtomicBool>` indicator argument, collapsing the former
`*_indicate_truncation` variants. All callers are updated to request
`Encoding::Packed`, preserving current behavior.
Resolve the wire codec (protobuf vs JSON) from the connector image's
FLOW_RUNTIME_CODEC label/env during container startup, thread it through
the derive and materialize shards, and use it to populate the
conditional JSON tuple fields of the connector protocol:

  - materialize Load/Store: key_json / values_json for JSON connectors,
    key_packed / values_packed for protobuf connectors.
  - derive Read.shuffle: key_json for JSON connectors, packed otherwise;
    hash is always set. A lambda-computed shuffle key can't be extracted
    as JSON, so JSON connectors then rely on hash alone.

Materialize and derive scans now reuse the shuffle log's 16-byte
packed-key prefix when it provably contains the whole key (via
Extractor::packed_key_prefix_len), re-extracting from the source
document only when the prefix may be truncated. Derive transforms gain
shuffle_key_extractors for JSON key extraction; materialize keys are
extracted with a no-op SerPolicy so they stay byte-identical to the
shuffle writer.

connector-init exposes inspect::Image::runtime_codec() and
parse_from_json_slice() so the runtime negotiates the same codec that
flow-connector-init will. Regenerates derive/materialize protobuf docs.
When a cohort reads one journal under multiple bindings (distinct
journalReadSuffixes), each (journal, binding) shuffle read advances
independently. Checkpoint visibility could then advance one binding
past a producer's commit before a peer binding read through it,
yielding divergent id-sets per transaction during backfill.

extract_causal_hints now projects a self-hint from an ACK's own
(journal, producer) to the *other* cohort bindings reading that
journal, holding checkpoint visibility until all have read through
the commit. HintIndex gains a `cohort_shares_journal` flag so the
common 1:1 journal-to-binding case pays only a boolean test per ACK.
@jgraettinger jgraettinger force-pushed the johnny/runtime-v2-soak branch from ce82d74 to 1d67d55 Compare May 31, 2026 05:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant