[Propagation] End SEEN_* rebroadcast starvation in the reaper #175
rafa-js wants to merge 3 commits
The propagation reaper rebroadcasts txs stuck in SEEN_ON_NETWORK / SEEN_MULTIPLE_NODES, but two defects let valid-but-stuck txs sit forever:

- The per-tick cap was the hardcoded constant 200; the configurable ReaperBatchSize was read into a field reapOnce never used, so the knob was dead and the effective cap was always 200.
- Candidates came from IterateStatusesSince, whose Postgres impl orders newest-first. Each saturated tick re-sent the newest 200 stale rows and never reached older ones, so a tx stuck a few hours was permanently starved (e.g. one broadcast with an unconfirmed ancestor that later confirmed and became minable, but was never rebroadcast).

Fix: add a per-tx rebroadcast throttle. A new last_rebroadcast_at marker plus reaper_rebroadcast_interval_ms (default 1h) gate re-sends, and a new GetReapCandidates store query returns due SEEN_* rows oldest-rebroadcast-first. reapOnce now honors reaperBatchSize and stamps every attempted txid (on attempt, not just success) so capacity spreads across the whole backlog over one interval instead of re-sending the same rows every tick. No row is starved; recoverable txs are never auto-rejected.

Adds the column + idempotent migration + partial index for Postgres and round-trips the field through Aerospike and Pebble. Covered by new reaper tests including a fairness-across-ticks assertion.
Pull request overview
This PR fixes reaper rebroadcast starvation for transactions stuck in SEEN_ON_NETWORK / SEEN_MULTIPLE_NODES by making candidate selection fair across backlogs and adding a per-tx rebroadcast throttle, backed by a new persisted last_rebroadcast_at marker across store implementations.
Changes:
- Wire `reaper_batch_size` into `reapOnce` and replace the old fixed-size scan/stop behavior with a store-level `GetReapCandidates` query ordered oldest-rebroadcast-first.
- Add per-tx throttle via `last_rebroadcast_at` + `reaper_rebroadcast_interval_ms`, and stamp attempted txids using `MarkRebroadcastByTxIDs`.
- Add storage support (schema/migration/index + Aerospike/Pebble round-trip) and reaper-focused unit tests.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| store/store.go | Extends the Store interface with GetReapCandidates and MarkRebroadcastByTxIDs contracts. |
| store/postgres/schema.sql | Adds last_rebroadcast_at column + partial index to support due-candidate ordering. |
| store/postgres/postgres.go | Implements Postgres candidate query and rebroadcast stamping. |
| store/pebble/pebble.go | Implements Pebble candidate scan/sort and rebroadcast stamping; persists the marker. |
| store/aerospike/aerospike.go | Implements Aerospike scan/sort candidate selection and rebroadcast stamping; persists the marker. |
| services/propagation/reaper.go | Switches reaper to store-driven candidate selection + per-tx throttle marking. |
| services/propagation/reaper_test.go | Adds focused tests for batch sizing, throttling, marking-on-attempt, and fairness. |
| services/propagation/propagator.go | Plumbs reaper_rebroadcast_interval_ms config into the propagator. |
| services/propagation/propagator_test.go | Extends the mock store to support the new reaper store APIs. |
| models/transaction.go | Adds LastRebroadcastAt to the transaction status model. |
| config/config.go | Adds reaper_rebroadcast_interval_ms config + default. |
| services/webhook/service_test.go | Updates test fake store to satisfy the extended Store interface. |
| services/api_server/handlers_test.go | Updates test mock store to satisfy the extended Store interface. |

    SELECT txid, raw_tx FROM transactions
    WHERE status IN ('SEEN_ON_NETWORK', 'SEEN_MULTIPLE_NODES')
      AND timestamp_at >= $1 AND timestamp_at < $2
      AND raw_tx IS NOT NULL

Good catch — tightened to length(raw_tx) > 0 so an empty (non-NULL) bytea is excluded too, matching the len(rawTx) == 0 check the Aerospike/Pebble/mock backends already use.
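The contract being aligned here is that a NULL payload and an empty payload are both ineligible. A minimal Go sketch of the in-memory side of that check follows; `hasRawTx` is a hypothetical helper name used for illustration, not a function from the PR.

```go
package main

import "fmt"

// hasRawTx reports whether a row carries a payload that can be rebroadcast.
// len() is 0 for both a nil slice (SQL NULL) and an empty non-nil slice
// (empty bytea), so one check covers both cases.
func hasRawTx(rawTx []byte) bool {
	return len(rawTx) > 0
}

func main() {
	var null []byte    // what a NULL column typically scans into
	empty := []byte{}  // non-NULL but zero-length
	fmt.Println(hasRawTx(null), hasRawTx(empty), hasRawTx([]byte{0x01})) // prints "false false true"
}
```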

    // GetReapCandidates returns the SEEN_* rows the reaper should rebroadcast,
    // oldest-unserved first. See store.Store for the full contract. Only txid and
    // raw_tx are selected — the reaper needs nothing else, and ordering by
    // last_rebroadcast_at NULLS FIRST is what spreads each tick's bounded batch
    // across the whole backlog instead of re-sending the same rows every tick.
    func (s *Store) GetReapCandidates(ctx context.Context, since, seenDeadline, rebroadcastBefore time.Time, limit int) ([]*models.TransactionStatus, error) {
    	if limit <= 0 {
    		return nil, nil
    	}

Done — scanStatus/scanStatusWithInserted now scan last_rebroadcast_at, and the GetStatus / GetStatusesSince / IterateStatusesSince SELECTs plus the batch INSERT … RETURNING include it. GET /tx and tooling now see the same value the Aerospike/Pebble backends already round-trip (was always zero on Postgres).

    prefix := idxTxUpdatedPrefix()
    iter, err := s.db.NewIter(&pebbledb.IterOptions{
    	LowerBound: prefix,
    	UpperBound: endOfPrefix(prefix),
    })
    if err != nil {
    	return nil, err
    }
    defer func() { _ = iter.Close() }()

    sinceNs := since.UnixNano()
    deadlineNs := seenDeadline.UnixNano()
    type candidate struct {

Fixed — the iterator now seeks to the first key at/after since (LowerBound: idxTxUpdatedKey(sinceNs, "")) and breaks as soon as a row’s timestamp reaches seenDeadline (the idx:tx:updated keyspace is timestamp-ordered), so per-tick work is bounded by the [since, seenDeadline) window instead of the full history.
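The seek-then-break pattern described in this reply can be sketched without Pebble at all. The example below assumes a plain timestamp-sorted slice in place of the `idx:tx:updated` keyspace; `entry` and `scanWindow` are hypothetical names, and `sort.Search` stands in for the iterator seek.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical index entry: update timestamp in nanoseconds plus txid.
type entry struct {
	tsNs int64
	txid string
}

// scanWindow returns the txids with sinceNs <= ts < deadlineNs from an index
// sorted ascending by timestamp. It seeks to the first in-window entry and
// stops at the first entry at or past the deadline, so the work done is
// proportional to the window rather than to the whole history.
func scanWindow(index []entry, sinceNs, deadlineNs int64) []string {
	start := sort.Search(len(index), func(i int) bool {
		return index[i].tsNs >= sinceNs
	})
	var out []string
	for i := start; i < len(index); i++ {
		if index[i].tsNs >= deadlineNs {
			break // sorted input: nothing later can be inside the window
		}
		out = append(out, index[i].txid)
	}
	return out
}

func main() {
	index := []entry{{10, "a"}, {20, "b"}, {30, "c"}, {40, "d"}}
	fmt.Println(scanWindow(index, 20, 40)) // prints "[b c]"
}
```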

    type candidate struct {
    	st            *models.TransactionStatus
    	lastRebcastNs int64 // 0 == never rebroadcast (sorts first)
    }
    var candidates []candidate
    for iter.First(); iter.Valid(); iter.Next() {
    	if err := ctx.Err(); err != nil {
    		return nil, err
    	}
    	txid := lastSegment(iter.Key())
    	st, err := s.readStatus(txid)
    	if err != nil || st == nil {
    		continue
    	}
    	if st.Status != models.StatusSeenOnNetwork && st.Status != models.StatusSeenMultipleNodes {
    		continue
    	}
    	tsNs := st.Timestamp.UnixNano()
    	if tsNs < sinceNs || tsNs >= deadlineNs {
    		continue
    	}
    	if len(st.RawTx) == 0 {
    		continue
    	}
    	if !st.LastRebroadcastAt.IsZero() && !st.LastRebroadcastAt.Before(rebroadcastBefore) {
    		continue
    	}
    	var lastNs int64
    	if !st.LastRebroadcastAt.IsZero() {
    		lastNs = st.LastRebroadcastAt.UnixNano()
    	}
    	candidates = append(candidates, candidate{
    		st:            &models.TransactionStatus{TxID: st.TxID, RawTx: st.RawTx},
    		lastRebcastNs: lastNs,
    	})
    }
    sort.Slice(candidates, func(i, j int) bool {
    	return candidates[i].lastRebcastNs < candidates[j].lastRebcastNs
    })
    if len(candidates) > limit {
    	candidates = candidates[:limit]
    }

Fixed — replaced the accumulate-all-then-sort with a bounded max-heap of size limit (largest last_rebroadcast_at evicted when full), so memory stays O(limit) regardless of backlog. Added a Pebble GetReapCandidates test covering the window filter, eligibility, and bounded oldest-rebroadcast-first ordering.
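A bounded max-heap of this kind might look like the sketch below, built on the standard `container/heap` package. The `candidate` shape mirrors the excerpt above, but `maxHeap`, `oldestN`, and `neverRebroadcast` are hypothetical names, and the `math.MinInt64` sentinel follows the follow-up commit message rather than the code shown here.

```go
package main

import (
	"container/heap"
	"fmt"
	"math"
	"sort"
)

// neverRebroadcast is the assumed sort sentinel for rows with no stamp yet.
const neverRebroadcast = math.MinInt64

type candidate struct {
	txid          string
	lastRebcastNs int64
}

// maxHeap keeps the candidate with the largest lastRebcastNs at the root,
// which is the one to evict when a better (older) candidate arrives.
type maxHeap []candidate

func (h maxHeap) Len() int           { return len(h) }
func (h maxHeap) Less(i, j int) bool { return h[i].lastRebcastNs > h[j].lastRebcastNs }
func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x any)        { *h = append(*h, x.(candidate)) }
func (h *maxHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// oldestN returns the limit candidates with the smallest lastRebcastNs,
// sorted ascending, while holding at most limit entries in memory.
func oldestN(stream []candidate, limit int) []candidate {
	if limit <= 0 {
		return nil
	}
	h := &maxHeap{}
	for _, c := range stream {
		if h.Len() < limit {
			heap.Push(h, c)
			continue
		}
		if c.lastRebcastNs < (*h)[0].lastRebcastNs {
			(*h)[0] = c // evict the most recently rebroadcast candidate
			heap.Fix(h, 0)
		}
	}
	out := append([]candidate(nil), (*h)...)
	sort.Slice(out, func(i, j int) bool { return out[i].lastRebcastNs < out[j].lastRebcastNs })
	return out
}

func main() {
	stream := []candidate{
		{"a", 50}, {"b", neverRebroadcast}, {"c", 10}, {"d", 40}, {"e", 20},
	}
	for _, c := range oldestN(stream, 3) {
		fmt.Println(c.txid) // prints b, c, e on separate lines
	}
}
```

The final sort touches only `limit` elements, so the overall cost is O(n log limit) time and O(limit) memory for a backlog of n rows.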
Follow-up to the reaper fairness fix, addressing review of bsv-blockchain#175:

- Registration-failure starvation (bsv-blockchain#1): the reaper now captures the `failed` set from registerBatch (previously discarded) and stamps it on partial failures, so a tx that keeps failing merkle /watch can't stay last_rebroadcast_at=NULL, sort first, and re-fill the front of every batch. A full outage (nothing registered) still stamps nothing so it isn't penalized.
- Outcome-class backoff (bsv-blockchain#2): accepted-but-unmined rows stay SEEN_* (the lattice forbids the SEEN->ACCEPTED downgrade) and only need the full-interval refresh; transient failures (Teranode requeue/unknown, plus registration failures) retry after a shorter reaper_requeue_backoff_ms (default 1m, clamped <= rebroadcast interval), applied by backdating the stamp. Restores fast recovery from transient blips instead of waiting the full 1h.
- raw_tx contract (bsv-blockchain#3): Postgres GetReapCandidates now filters length(raw_tx) > 0, matching the len()==0 check in the other backends.
- Index note (bsv-blockchain#4): document that idx_tx_reap_due's range/raw_tx predicates are inline rechecks; flag for future tuning. No code change.
- Sentinel alignment (bsv-blockchain#5): Pebble's in-Go never-rebroadcast sort sentinel now matches Aerospike (math.MinInt64).

Adds TestReapOnce_PartialRegistrationFailureMarksFailedSubset and TestReapOnce_RequeueUsesShortBackoff.
Follow-up to bsv-blockchain#175 review (Copilot):

- Postgres reads now surface last_rebroadcast_at: scanStatus / scanStatusWithInserted scan it and the GetStatus / GetStatusesSince / IterateStatusesSince SELECTs and the batch INSERT...RETURNING include it, so GET /tx and tooling see the same value the Aerospike/Pebble backends already round-trip (was always zero on Postgres).
- Pebble GetReapCandidates no longer scans the full updated-at history every tick: it seeks to the first key at/after `since` and stops once a row reaches `seenDeadline` (the index is timestamp-ordered).
- Pebble GetReapCandidates keeps only the `limit` oldest-rebroadcast candidates via a bounded max-heap, so memory is O(limit) instead of O(backlog).

The empty-vs-NULL raw_tx concern was already handled in the prior commit (length(raw_tx) > 0). Adds a Pebble GetReapCandidates test covering the window filter, eligibility, and bounded oldest-first ordering.
What Changed

- `reaper_batch_size` knob: `reapOnce` now uses the configured batch size instead of a hardcoded `200`.
- `last_rebroadcast_at` marker + `reaper_rebroadcast_interval_ms` config (default 1h, env `ARCADE_PROPAGATION_REAPER_REBROADCAST_INTERVAL_MS`).
- Adds `GetReapCandidates` / `MarkRebroadcastByTxIDs` to the store interface; the reaper now pulls due `SEEN_*` rows oldest-rebroadcast-first and stamps every attempted txid (on attempt, not just success).
- Postgres: `last_rebroadcast_at` column, idempotent migration, and partial index `idx_tx_reap_due`. Field round-tripped through Aerospike and Pebble.
- Removes the `reaperRebroadcastBatch` constant and `errReaperBatchFull` sentinel.

Why It Was Necessary

The reaper rebroadcasts txs stuck in `SEEN_ON_NETWORK` / `SEEN_MULTIPLE_NODES`, but two defects let valid-but-stuck txs sit forever. The per-tick cap was a hardcoded `200` (the config field was read but never used), and the Postgres candidate query orders newest-first, so each saturated tick re-sent the newest 200 stale rows and never reached older ones. A tx stuck a few hours (e.g. broadcast with an unconfirmed ancestor that later confirmed and became minable) was permanently starved and never recovered. The per-tx interval throttle spreads each tick's bounded batch across the whole backlog instead of re-sending the same rows every 30s.

Testing Performed

- `services/propagation/reaper_test.go`: `TestReapOnce_UsesConfiguredBatchSize`, `TestReapOnce_SkipsRecentlyRebroadcast`, `TestReapOnce_MarksRebroadcastOnAttempt`, `TestReapOnce_FairnessAcrossTicks` (every backlogged row rebroadcast exactly once over the interval), `TestReapOnce_RecoverableNotRejected`.
- `go build ./...`, `go vet ./...`, `staticcheck`, and `golangci-lint` all clean; full `go test ./...` green.
- `SEEN_MULTIPLE_NODES` row with `raw_tx`: confirm one tick rebroadcasts + stamps `last_rebroadcast_at`, and the next tick within the interval skips it.

Impact / Risk

- The migration is `ADD COLUMN IF NOT EXISTS` and defaults NULL (existing stuck rows become immediately eligible).
- `GetReapCandidates` does a full-set scan (no secondary index); this is documented, and production runs on Postgres.