Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions scripts/watch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ if [ -n "$ACTIVE_NAME" ]; then
fi

while true; do
# Liveness guard (#67): exit promptly once the originating agent session is
# gone. A plain pipe gives no portable way to notice a *downstream* consumer
# that closed silently — printf '' raises no EPIPE, and macOS buffers a final
# write into an already-dead reader — so a quiet watcher whose session died
# would otherwise spin forever (the macOS-runner 33-min stall; #210's job
# timeout only caps the symptom). `kill -0` on the agent pid embedded in the
# composite instance id is portable (Git Bash falls back to tasklist; see
# _agmsg_pid_alive). Gated on a composite id only: a bare id (degraded, no
# resolved agent pid) keeps the prior behavior and is not liveness-gated.
if agmsg_instance_is_composite "$SESSION_ID" && ! agmsg_instance_alive "$SESSION_ID"; then
exit 0
fi
if [ -f "$DB" ]; then
ROWS="$(agmsg_sqlite -separator $'\x1f' "$DB" "
SELECT id, created_at, team, from_agent, to_agent,
Expand Down
99 changes: 66 additions & 33 deletions tests/test_watch.bats
Original file line number Diff line number Diff line change
Expand Up @@ -124,40 +124,55 @@ _wait_for_file_contains() {
[ -f "$TEST_SKILL_DIR/run/watch.$(_iid sess-wm).watermark" ]
}

@test "watch: exits within one interval when its session dies, without advancing the watermark past an undelivered row (#67)" {
  # Contract under test: session liveness (#67). When the agent process that
  # owns the watcher dies, the liveness guard at the top of the poll loop makes
  # the watcher exit within ~1 interval, BEFORE polling/delivering any newer
  # row -- so it neither hangs nor advances the watermark past an unconsumed
  # message.
  #
  # This replaces the former "closed consumer does not advance watermark" test,
  # which asserted that a closed *downstream* consumer (`watch.sh | head -n 1`)
  # stopped the watcher. That contract is unachievable on a plain pipe: a
  # closed reader raises no portable signal until the next write (printf '' is
  # silent), and macOS buffers a final write into a dead reader, so the watcher
  # kept delivering + watermarking and then spun silently.
  #
  # A controllable stand-in session pid, embedded in the composite instance id,
  # makes the liveness path deterministic. Cross-restart redelivery is covered
  # by "watch: restart delivers messages that arrived while the watcher was
  # down".
  #
  # Both background processes close FD 3: Bats waits on every process that
  # still holds it, so an assertion failing before the explicit kill below
  # would otherwise stall the run for the full 600 s of the stand-in sleep.
  local sesspid
  sleep 600 3>&- &
  sesspid=$!
  local iid="sess-liveness.$sesspid"
  local wm="$TEST_SKILL_DIR/run/watch.$iid.watermark"
  local pf="$TEST_SKILL_DIR/run/watch.$iid.pid"
  local out="$TEST_SKILL_DIR/liveness-delivery.log"

  AGMSG_WATCH_INTERVAL=1 bash "$SCRIPTS/watch.sh" "$iid" "$PROJ" claude-code >"$out" 2>/dev/null 3>&- &
  local w=$!
  # Wait for the watermark file, not just the pidfile: the pidfile is written
  # early (before the subscription is resolved and LAST is seeded), so sending
  # a message right after it appears would race the seed and the row would land
  # at or below the initial watermark and never be "new". The watermark file is
  # written once the watcher is ready to receive.
  _wait_for_file "$wm"
  [ -f "$pf" ]

  bash "$SCRIPTS/send.sh" team bob alice "M1-delivered" >/dev/null
  _wait_for_file_contains "$out" "M1-delivered"
  # Declared separately from the assignment so a failing _max_message_id is not
  # masked by the exit status of `local`.
  local first_id
  first_id="$(_max_message_id)"

  # Owning session dies (reap it so kill -0 reports gone, not a zombie), then a
  # newer row arrives. The liveness guard runs before the DB poll, so the
  # watcher exits before it could deliver or watermark M2.
  kill "$sesspid" 2>/dev/null || true
  wait "$sesspid" 2>/dev/null || true
  bash "$SCRIPTS/send.sh" team bob alice "M2-undelivered" >/dev/null
  local second_id
  second_id="$(_max_message_id)"

  # The watcher must go away on its own; if it does not, kill it so it is not
  # left behind, then fail the test.
  _wait_for_missing "$pf" || { kill "$w" 2>/dev/null || true; false; }
  run kill -0 "$w"; [ "$status" -ne 0 ]
  [ "$first_id" != "$second_id" ]
  [ "$(cat "$wm")" = "$first_id" ]
  ! grep -q "M2-undelivered" "$out"
}

@test "watch: closed stdout exits without advancing the watermark" {
Expand Down Expand Up @@ -285,12 +300,17 @@ _wait_pidfile() {
@test "watch: two sessions sharing a session_id keep independent watchers (#93)" {
# Pre-composite instance ids (same sid prefix, different agent pid) — what
# session-start bakes into the directive for two parallel resume processes.
local pf1="$TEST_SKILL_DIR/run/watch.shared.1001.pid"
local pf2="$TEST_SKILL_DIR/run/watch.shared.1002.pid"

AGMSG_WATCH_INTERVAL=5 bash "$SCRIPTS/watch.sh" "shared.1001" "$PROJ" claude-code >/dev/null 2>&1 3>&- &
# The embedded pids must be live: the liveness guard (#67) exits a watcher
# whose session pid is dead, so use real stand-in session processes rather
# than fabricated pids (which would pass or fail by accident of what pid
# happens to exist on the host).
local sp1 sp2; sleep 600 & sp1=$!; sleep 600 & sp2=$!
local pf1="$TEST_SKILL_DIR/run/watch.shared.$sp1.pid"
local pf2="$TEST_SKILL_DIR/run/watch.shared.$sp2.pid"

AGMSG_WATCH_INTERVAL=5 bash "$SCRIPTS/watch.sh" "shared.$sp1" "$PROJ" claude-code >/dev/null 2>&1 3>&- &
local w1=$!
AGMSG_WATCH_INTERVAL=5 bash "$SCRIPTS/watch.sh" "shared.1002" "$PROJ" claude-code >/dev/null 2>&1 3>&- &
AGMSG_WATCH_INTERVAL=5 bash "$SCRIPTS/watch.sh" "shared.$sp2" "$PROJ" claude-code >/dev/null 2>&1 3>&- &
local w2=$!

_wait_pidfile "$pf1" "$w1"
Expand All @@ -302,25 +322,38 @@ _wait_pidfile() {
[ "$(cat "$pf1")" = "$w1" ]
[ "$(cat "$pf2")" = "$w2" ]

kill "$w1" "$w2" 2>/dev/null || true
kill "$w1" "$w2" "$sp1" "$sp2" 2>/dev/null || true
wait "$w1" 2>/dev/null || true
wait "$w2" 2>/dev/null || true
wait "$sp1" 2>/dev/null || true
wait "$sp2" 2>/dev/null || true
}

@test "watch: relaunch with the SAME instance id replaces the previous watcher (#66 preserved)" {
  # The composite instance id's pid must belong to a LIVE process: the
  # watcher's liveness guard (#67) exits any watcher whose embedded session pid
  # is dead, so a fabricated dead pid would self-exit before the relaunch could
  # be observed. Use a real stand-in session process instead.
  #
  # The stand-in closes FD 3 like the watchers do: Bats waits on every process
  # that still holds it, so a failure before cleanup would otherwise stall the
  # run for the full 600 s of the sleep.
  local sesspid
  sleep 600 3>&- &
  sesspid=$!
  local iid="solo.$sesspid"
  local pf="$TEST_SKILL_DIR/run/watch.$iid.pid"

  AGMSG_WATCH_INTERVAL=5 bash "$SCRIPTS/watch.sh" "$iid" "$PROJ" claude-code >/dev/null 2>&1 3>&- &
  local w1=$!
  _wait_pidfile "$pf" "$w1"

  AGMSG_WATCH_INTERVAL=5 bash "$SCRIPTS/watch.sh" "$iid" "$PROJ" claude-code >/dev/null 2>&1 3>&- &
  local w2=$!
  # Successor claims the pidfile slot...
  _wait_pidfile "$pf" "$w2"
  # ...and the previous holder was killed. The successor SIGTERMs the old
  # holder and then writes its own pid, so the pidfile can flip to w2 a beat
  # before w1's TERM trap has run -- poll (up to ~3 s) for w1's exit rather
  # than checking the instant the pidfile changes.
  local i
  for ((i = 0; i < 30; i++)); do
    kill -0 "$w1" 2>/dev/null || break
    sleep 0.1
  done
  run kill -0 "$w1"
  local w1_status=$status

  # Clean up BEFORE asserting so a failed assertion cannot leak the successor,
  # a surviving predecessor, or the stand-in session.
  kill "$w1" "$w2" "$sesspid" 2>/dev/null || true
  wait "$w2" 2>/dev/null || true
  wait "$sesspid" 2>/dev/null || true

  [ "$w1_status" -ne 0 ]
}
Loading