Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions lib/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,20 @@ def new_subscriber_thread
publish("/__mb_keepalive__/", Process.pid, user_ids: [-1])
if (Time.now - (@last_message || Time.now)) > keepalive_interval * 3
logger.warn "Global messages on #{Process.pid} timed out, message bus is no longer functioning correctly"
# The subscriber thread is stuck (half-open TCP connection or similar network issue)
# and is no longer receiving messages. Kill it and let ensure_subscriber_thread
# create a fresh one with a new Redis connection.
@mutex.synchronize do
if @subscriber_thread == thread
thread.kill
@subscriber_thread = nil
end
end
ensure_subscriber_thread
# The new thread sets up its own keepalive blk stop re-queuing this one.
else
timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE
end

timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE
end
end

Expand Down
81 changes: 81 additions & 0 deletions spec/lib/message_bus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,85 @@
channel.must_equal('/test')
end
end

describe "keepalive recovery" do
# MIN_KEEPALIVE is 20s in production, making real-timer tests take 60s+.
# We temporarily lower it so keepalive_interval=1 is accepted, which
# triggers recovery at ~4s fast enough for CI, slow enough to be real.
FAST_KEEPALIVE = 1

before do
@original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE
MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE)
MessageBus::Implementation.const_set(:MIN_KEEPALIVE, 0)

@log_output = StringIO.new
@bus.configure(keepalive_interval: FAST_KEEPALIVE, logger: Logger.new(@log_output))

# Capture the real backend method before stubbing.
@real_global_subscribe = @bus.backend_instance.method(:global_subscribe)
@call_count = 0

# First call to global_subscribe: simulate a half-open TCP connection.
# The thread stays alive but never yields messages, so @last_message
# goes stale and the keepalive eventually fires recovery.
#
# Subsequent calls: delegate to the real backend so the recovered
# subscriber thread actually processes messages end-to-end.
real_gs = @real_global_subscribe
call_count_ref = -> { @call_count += 1 }
backend = @bus.backend_instance

backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk|
if call_count_ref.call == 1
@subscribed = true
loop { sleep 0.05; break unless @subscribed }
else
# Remove both stubs so the real backend handles the full lifecycle
# of the recovered thread especially global_unsubscribe, which
# destroy needs to signal client.subscribe to exit.
backend.singleton_class.remove_method(:global_subscribe)
backend.singleton_class.remove_method(:global_unsubscribe)
real_gs.call(last_id, &blk)
end
end

backend.define_singleton_method(:global_unsubscribe) do
@subscribed = false
end

@bus.after_fork
wait_for(2000) { @bus.listening? }
end

after do
MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE)
MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive)
end

it "recovers message delivery after a stuck subscriber thread" do
@bus.listening?.must_equal true

received = []
@bus.subscribe("/recovery-test") { |msg| received << msg.data }

# Wait for the keepalive to detect the stuck thread and recover.
# With keepalive_interval=1, timeout = 1*3 = 3s, first eligible check at ~4s.
wait_for(8000) do
@bus.publish("/recovery-test", "post-recovery")
sleep 0.1
received.include?("post-recovery")
end

received.must_include("post-recovery")
@bus.listening?.must_equal true
end

it "logs a warning when the keepalive detects a stuck subscriber" do
wait_for(8000) { @log_output.string.include?("no longer functioning correctly") }

@log_output.string.must_include "timed out"
@log_output.string.must_include "no longer functioning correctly"
end
end
end
Loading