From 35ed95ec5e1609b13393138e08d807c173411912 Mon Sep 17 00:00:00 2001 From: Russ Smith Date: Mon, 22 Jun 2026 16:34:01 -0700 Subject: [PATCH] Close fiber_channel on Server#shutdown to avoid fiber leak. Fixes #110 During Server#shutdown the fiber_channel was never closed, so the process_subscribed_messages fiber was left blocked on receive against an orphaned channel. Since Cable.restart builds a fresh Server (with a fresh channel) on every restart, each restart leaked a fiber and channel. Close the channel at the end of shutdown, and switch the receive loop to receive? so the fiber exits cleanly instead of raising Channel::ClosedError on a closed channel. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01BMW1EsdCMExBhm3tg7TQhs --- spec/cable/server_spec.cr | 19 +++++++++++++++++++ src/cable/server.cr | 5 ++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/spec/cable/server_spec.cr b/spec/cable/server_spec.cr index a505ec0..4572be8 100644 --- a/spec/cable/server_spec.cr +++ b/spec/cable/server_spec.cr @@ -18,6 +18,25 @@ describe Cable::Server do end end + describe "#shutdown" do + it "closes the fiber_channel so the subscribed messages fiber doesn't leak" do + Cable.reset_server + Cable.temp_config(backend_class: Cable::DevBackend) do + server = Cable.server + server.fiber_channel.closed?.should be_false + + server.shutdown + + server.fiber_channel.closed?.should be_true + # Sending to the closed channel must raise rather than silently leak. + expect_raises(::Channel::ClosedError) do + server.fiber_channel.send({"foo", "bar"}) + end + end + Cable.reset_server + end + end + describe "#active_connections_for" do it "accurately returns active connections for a specificic token" do Cable.reset_server diff --git a/src/cable/server.cr b/src/cable/server.cr index 2e35744..4710286 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -195,6 +195,9 @@ module Cable connections_to_close.each do |connection| connection.close end + # Close the channel so the `process_subscribed_messages` fiber stops + # blocking on `receive` and exits cleanly instead of leaking across restarts. + fiber_channel.close end def restart? @@ -210,7 +213,7 @@ module Cable private def process_subscribed_messages server = self spawn(name: "Cable::Server - process_subscribed_messages") do - while received = fiber_channel.receive + while received = fiber_channel.receive? channel, message = received if channel.starts_with?("cable_internal") identifier = channel.split('/').last