Feat: Implement WebSocket Scaling with Horizontal Load Balancing and Regional Discovery (#568)#680
Feat: Implement WebSocket Scaling with Horizontal Load Balancing and Regional Discovery (#568)#680kryputh wants to merge 4 commits into
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughImplements WebSocket scaling with horizontal load balancing across the platform. The backend adds Redis pub/sub dependencies and a new Changes
Sequence DiagramsequenceDiagram
participant Client as useChessSocket
participant Provider as WebSocketScalingProvider
participant WSServer as WebSocket Server
participant Redis as Redis Pub/Sub
Client->>Provider: getOptimalNode(gameId)
Note over Provider: Check cached node for gameId
alt Node not cached
Provider->>Provider: Discover availableNodes
Provider->>Provider: Select lowest-load node
Provider->>Provider: Cache node for gameId
end
Provider-->>Client: return WebSocketNode
Client->>Client: construct ws://node.url
Client->>WSServer: Connect WebSocket
WSServer-->>Client: Connection established
Note over WSServer,Redis: On game state change
WSServer->>Redis: publish_move(room_id, message)
Redis->>Redis: Broadcast to game:room_id channel
WSServer->>WSServer: receive subscribed updates
WSServer-->>Client: Emit game state via WebSocket
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
|
@kryputh Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits. You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀 |
…ve invalid feature flag in backend CI
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
frontend/hook/useChessSocket.ts (1)
84-200:⚠️ Potential issue | 🟡 MinorMissing
getOptimalNodeincreateWebSocket's dependency array.
createWebSocketcloses overgetOptimalNode(line 89) but theuseCallbackdeps at line 200 are[gameId, calculateReconnectDelay, status]. WhilegetOptimalNodeis currently stable viauseCallback([], …)in the provider, this violates the React Hooks ESLint rule and creates a stale-closure risk. If the provider ever recomputesgetOptimalNode(e.g., to depend on discovery state), reconnects will silently use a stale resolver.♻️ Add the missing dependency
- }, [gameId, calculateReconnectDelay, status]); + }, [gameId, calculateReconnectDelay, status, getOptimalNode]);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/hook/useChessSocket.ts` around lines 84 - 200, createWebSocket currently closes over getOptimalNode but its useCallback dependency array only includes [gameId, calculateReconnectDelay, status]; add getOptimalNode to that dependency array so createWebSocket is recreated when getOptimalNode changes (update the useCallback dependencies for the createWebSocket function to include getOptimalNode to satisfy hooks linting and avoid stale closures).backend/src/socket/game.rs (1)
137-232:⚠️ Potential issue | 🟠 Major
send_movebroadcasts only locally — needs to publish to Redis oncepubsubis wired.This is the natural attach point for cross-node synchronization: after the local
sender.send(response.clone())succeeds (around Line 228), the sameresponseshould be forwarded throughstate.pubsubso other nodes can re-broadcast it. Note thatpublish_moveisasyncwhilesend_moveis sync and holds theGAME_STATEstd::sync::Mutex— calling.awaithere will require either making this function async (and dropping the lock guard before awaiting) or spawning atokio::taskwith a clonedRedisPubSubhandle. Awaiting while holdingstatewould also be a deadlock hazard.(Flagging at the root cause; same applies to
join_room,leave_room, and the takeback paths.)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/src/socket/game.rs` around lines 137 - 232, send_move currently only broadcasts locally and must also publish the same ServerMessage to Redis via state.pubsub.publish_move without awaiting while holding GAME_STATE's Mutex; fix by cloning the response (response.clone()) and a handle to state.pubsub (e.g., let pubsub = state.pubsub.clone()) before releasing the lock, drop the lock guard, then spawn a tokio::task::spawn(async move { let _ = publish_move(pubsub, response_clone).await; }) so you don't await inside send_move or hold the mutex during the async publish; ensure any necessary Clone impls are used and apply the same pattern to join_room, leave_room, and takeback paths.
🧹 Nitpick comments (3)
frontend/hook/useChessSocket.ts (1)
178-183: Dead reconnect watchdog — only logs, captures stalestatus.This
setTimeoutreadsstatusfrom the closure (captured whenonclosefired) and onlyconsole.logs when the timeout elapses. It doesn't cancel, escalate, or alter reconnection. Either remove it, or make it actionable (e.g., abort the in-flight attempt and try the next node viagetOptimalNode) — the comment on line 181 ("Continue trying but log the timeout") signals it was already known to be non-functional.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/hook/useChessSocket.ts` around lines 178 - 183, The reconnect watchdog set in reconnectTimerRef using RECONNECT_TIMEOUT currently closes over a stale status and only logs; replace it with actionable logic: when the timer fires check the latest connection state via a mutable ref (e.g., add or use statusRef.current) rather than the closed-over status, then either abort the current in-flight attempt (close the socket) and call the existing reconnect routine (e.g., attemptReconnect or the onopen/onclose reconnection flow) or explicitly call getOptimalNode to switch nodes and create a new socket; alternatively remove the timer entirely if you don't need a watchdog. Ensure you update/clear reconnectTimerRef and use the same reconnection helper functions already in this file so behavior is consistent.backend/src/socket/pubsub.rs (1)
28-31: Use///for rustdoc instead of/** … */.While
/** */is technically accepted, the project's other modules (and rustfmt/clippy conventions) use///for outer doc comments. Switching keeps the docs consistent and avoids surprises withcargo docand IDE rendering.- /** - * Publishes a message to a regional Redis channel to sync horizontally scaled nodes. - * This ensures that players on different server instances stay in sync. - */ + /// Publishes a message to a regional Redis channel to sync horizontally scaled nodes. + /// + /// This ensures that players on different server instances stay in sync.(Same for
subscribe_to_room.)Also applies to: 41-44
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/src/socket/pubsub.rs` around lines 28 - 31, Replace the block-style doc comments (/** ... */) with Rust doc line comments (///) for the affected functions so they follow project conventions and render properly with cargo doc; specifically change the outer comments for publish_to_room and subscribe_to_room from block comments to triple-slash comments, keeping the same text and formatting/paragraph breaks but using /// lines so rustdoc and IDEs display them consistently.backend/src/socket/Cargo.toml (1)
23-23: Remove unusedjsonfeature fromredisdependency.The
pubsub.rsfile only usesAsyncCommands::publishwith a pre-serializedStringpayload (viaserde_json::to_string). No RedisJSON commands (JSON.SET,JSON.GET, etc.) are used, making thejsonfeature unnecessary and adding unneeded dependencies.-redis = { version = "0.25", features = ["tokio-comp", "json"] } +redis = { version = "0.25", features = ["tokio-comp"] }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/src/socket/Cargo.toml` at line 23, The redis crate dependency in Cargo.toml currently enables the unused "json" feature; since pubsub.rs only calls AsyncCommands::publish with a pre-serialized String (serde_json::to_string) and does not use RedisJSON commands, remove "json" from the features list for the redis dependency (leave the tokio-comp feature as-is) to avoid pulling unnecessary dependencies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/src/socket/Cargo.toml`:
- Line 24: The dotenvy crate is added but never initialized, so env vars like
REDIS_URL (used in pubsub.rs) and BIND_ADDRESS (used in main.rs) in a .env file
are ignored; fix by calling dotenvy::dotenv().ok(); at the start of your
application entry (inside main() in main.rs) before any std::env::var reads so
those variables are loaded, or if you prefer to not use .env files, remove the
dotenvy dependency from Cargo.toml and rely only on environment
variables/hardcoded defaults.
In `@backend/src/socket/game.rs`:
- Around line 13-26: GAME_STATE.ServerState currently holds pubsub:
Option<RedisPubSub> but it’s never initialized or used; to fix, initialize a
RedisPubSub at startup (e.g. in main.rs behind env/feature flag), store it into
GAME_STATE.lock().pubsub, and in each broadcast path (functions send_move,
join_room, leave_room, accept_takeback, reject_takeback, offer_takeback) add a
call like state.pubsub.as_ref().map(|p| p.publish_move(room_id, &response))
after sending on the local broadcast::Sender; additionally spawn a background
subscriber task (per-room or a pattern subscriber like "game:*") that listens
for Redis messages and re-emits them onto the matching local broadcast::Sender
so remote moves are delivered to websocket clients.
In `@backend/src/socket/pubsub.rs`:
- Around line 45-51: subscribe_to_room currently creates and returns a
redis::aio::PubSub via get_async_pubsub()/subscribe but no caller consumes
messages, causing kernel buffer growth, and creating one connection per room
which won't scale; instead implement a single long‑lived subscriber task (e.g.
new start_global_subscription or start_pubsub_worker) that uses
psubscribe("game:*") once, spawns an async loop that calls
.on_message()/get_payload(), parses the room_id from the channel name (channel
format "game:{room_id}"), and republishes each message onto the corresponding
local broadcast::Sender for that room (look up or create the sender in your room
registry), and remove per-room get_async_pubsub usage from subscribe_to_room
(make it register/get a local Sender only); this ensures messages are drained
continuously and avoids one TCP connection per room.
- Around line 13-15: The REDIS_URL lazy_static captures the environment once and
won't pick up changes or dotenv-loaded vars; remove the lazy_static! REDIS_URL
and replace it with a small accessor function (e.g., fn redis_url() -> String)
that calls std::env::var("REDIS_URL").unwrap_or_else(||
"redis://127.0.0.1:6379".to_string()) each time it's needed; also ensure
dotenvy::dotenv().ok() is invoked early in program/test setup (e.g., in main or
test harness) so dotenv files are loaded before calling redis_url().
- Line 10: Replace the shared Arc<Mutex<redis::aio::Connection>> field with a
redis::aio::MultiplexedConnection to avoid serializing publishes: change the
struct field named connection to type MultiplexedConnection (remove Arc and
Mutex), obtain it via client.get_multiplexed_async_connection() where the
connection is created, update imports, and update all uses (e.g. any publish
calls that did self.connection.lock().await.publish(...)) to call
publish/execute directly on the cloned MultiplexedConnection (clone it where
concurrent use is needed) since it is Clone + Send + Sync and pipelines
internally. Ensure code compiles by removing async lock awaits and adjusting
types in function signatures that referenced Arc<Mutex<...>>.
- Around line 18-26: The error type returned by new uses Box<dyn
std::error::Error> which is not Send and will make futures !Send (breaking
tokio::spawn); update the error trait object to be sendable (e.g. Box<dyn
std::error::Error + Send + Sync + 'static>) in the new function signature and in
the other two related methods that return boxed errors so the returned Result
types are Send across async task boundaries (locate the new() function and any
publish/subscribe or similar methods that return Box<dyn std::error::Error> and
add the + Send + Sync + 'static bounds).
In `@frontend/context/webSocketScalingContext.tsx`:
- Around line 30-65: The sticky cache in gameNodeMap used by getOptimalNode
never evicts entries, causing stale/dead node reuse and unbounded growth; add an
eviction API (e.g., export invalidateNode(gameId) or releaseGame(gameId)) that
deletes gameNodeMap.current.delete(gameId) and optionally clears related state
(setCurrentNode if it matched) and call it from the WebSocket teardown path
(close-with-error/health-check failure) in the useChessSocket hook; also augment
getOptimalNode to honor a TTL per entry (store timestamp alongside
WebSocketNode) and to re-evaluate using reportLoad/setAvailableNodes when the
entry is expired or the node is flagged unhealthy so live telemetry can
influence sticky decisions.
- Around line 33-57: The docstring for getOptimalNode promises selection by
"regional proximity and server load" but the implementation only sorts mockNodes
by load; update getOptimalNode to factor user-region affinity into the scoring
(e.g., compute a score = weightLoad * load + weightRegion * distancePenalty or
regionAffinity and pick min score) or change the comment to remove "regional
proximity"; specifically modify getOptimalNode to compute region affinity from a
detected user region (or a passed-in preference) and combine it with load when
sorting the mockNodes, then keep the sticky session behavior using gameNodeMap
and still call setAvailableNodes with the node list.
- Around line 22-65: The DISCOVERY_ENDPOINT constant is unused and
getOptimalNode always returns hardcoded mockNodes (ws://localhost...), causing
production clients to get invalid localhost/insecure URLs; update getOptimalNode
to only use the hardcoded mockNodes when process.env.NODE_ENV !== 'production'
and in production perform a fetch to DISCOVERY_ENDPOINT to retrieve real node
data, convert/validate returned node URLs to wss:// if needed, then call
setAvailableNodes, update gameNodeMap.current and setCurrentNode with the
fetched/validated nodes, and keep the existing fallback to mockNodes only for
non-production or on fetch failure.
In `@frontend/hook/useChessSocket.ts`:
- Around line 84-94: The async createWebSocket has a race where multiple
in-flight getOptimalNode calls can overwrite wsRef.current and leave orphaned
sockets; fix by introducing and using a per-call token (use connectionTokenRef)
inside createWebSocket: capture a new token before awaiting getOptimalNode,
store it in a local variable, then after the await check that
connectionTokenRef.current still equals the local token before assigning
wsRef.current, adding event handlers, or scheduling reconnects; also ensure
disconnect (and the gameId effect cleanup) increments/changes
connectionTokenRef.current to invalidate any outstanding tokens so resolved
discoveries bail out and never wire listeners to stale sockets.
---
Outside diff comments:
In `@backend/src/socket/game.rs`:
- Around line 137-232: send_move currently only broadcasts locally and must also
publish the same ServerMessage to Redis via state.pubsub.publish_move without
awaiting while holding GAME_STATE's Mutex; fix by cloning the response
(response.clone()) and a handle to state.pubsub (e.g., let pubsub =
state.pubsub.clone()) before releasing the lock, drop the lock guard, then spawn
a tokio::task::spawn(async move { let _ = publish_move(pubsub,
response_clone).await; }) so you don't await inside send_move or hold the mutex
during the async publish; ensure any necessary Clone impls are used and apply
the same pattern to join_room, leave_room, and takeback paths.
In `@frontend/hook/useChessSocket.ts`:
- Around line 84-200: createWebSocket currently closes over getOptimalNode but
its useCallback dependency array only includes [gameId, calculateReconnectDelay,
status]; add getOptimalNode to that dependency array so createWebSocket is
recreated when getOptimalNode changes (update the useCallback dependencies for
the createWebSocket function to include getOptimalNode to satisfy hooks linting
and avoid stale closures).
---
Nitpick comments:
In `@backend/src/socket/Cargo.toml`:
- Line 23: The redis crate dependency in Cargo.toml currently enables the unused
"json" feature; since pubsub.rs only calls AsyncCommands::publish with a
pre-serialized String (serde_json::to_string) and does not use RedisJSON
commands, remove "json" from the features list for the redis dependency (leave
the tokio-comp feature as-is) to avoid pulling unnecessary dependencies.
In `@backend/src/socket/pubsub.rs`:
- Around line 28-31: Replace the block-style doc comments (/** ... */) with Rust
doc line comments (///) for the affected functions so they follow project
conventions and render properly with cargo doc; specifically change the outer
comments for publish_to_room and subscribe_to_room from block comments to
triple-slash comments, keeping the same text and formatting/paragraph breaks but
using /// lines so rustdoc and IDEs display them consistently.
In `@frontend/hook/useChessSocket.ts`:
- Around line 178-183: The reconnect watchdog set in reconnectTimerRef using
RECONNECT_TIMEOUT currently closes over a stale status and only logs; replace it
with actionable logic: when the timer fires check the latest connection state
via a mutable ref (e.g., add or use statusRef.current) rather than the
closed-over status, then either abort the current in-flight attempt (close the
socket) and call the existing reconnect routine (e.g., attemptReconnect or the
onopen/onclose reconnection flow) or explicitly call getOptimalNode to switch
nodes and create a new socket; alternatively remove the timer entirely if you
don't need a watchdog. Ensure you update/clear reconnectTimerRef and use the
same reconnection helper functions already in this file so behavior is
consistent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 427462d5-0217-4f79-93c5-8208dad0cbe0
📒 Files selected for processing (8)
.github/workflows/backend.ymlbackend/src/socket/Cargo.tomlbackend/src/socket/game.rsbackend/src/socket/lib.rsbackend/src/socket/pubsub.rsfrontend/app/layout.tsxfrontend/context/webSocketScalingContext.tsxfrontend/hook/useChessSocket.ts
| log = "0.4" | ||
| env_logger = "0.11" | ||
| redis = { version = "0.25", features = ["tokio-comp", "json"] } | ||
| dotenvy = "0.15" |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm dotenvy is never invoked in the crate.
rg -nP '\bdotenvy\b|\bdotenv\s*\(' --type=rustRepository: NOVUS-X/XLMate
Length of output: 382
🏁 Script executed:
# First, find the exact location and contents of the socket module's Cargo.toml
find . -path "*/socket/Cargo.toml" -type f | head -5Repository: NOVUS-X/XLMate
Length of output: 90
🏁 Script executed:
# Check if dotenvy appears in socket module's Cargo.toml
find . -path "*/socket/Cargo.toml" -type f -exec grep -H "dotenvy" {} \;Repository: NOVUS-X/XLMate
Length of output: 107
🏁 Script executed:
# Look for main.rs and entrypoints in socket module
find . -path "*/socket/*" -name "main.rs" -o -path "*/socket/*" -name "lib.rs" | head -10Repository: NOVUS-X/XLMate
Length of output: 115
🏁 Script executed:
# Search specifically in socket module for any dotenv/dotenvy calls
find . -path "*/socket/*.rs" -type f -exec rg -l "dotenvy|dotenv" {} \;Repository: NOVUS-X/XLMate
Length of output: 40
🏁 Script executed:
# Read main.rs to see the actual code
wc -l ./backend/src/socket/main.rsRepository: NOVUS-X/XLMate
Length of output: 90
🏁 Script executed:
# Read the main.rs file
cat -n ./backend/src/socket/main.rsRepository: NOVUS-X/XLMate
Length of output: 1904
🏁 Script executed:
# Check if pubsub.rs exists and read it
find ./backend/src/socket -name "pubsub.rs" -type fRepository: NOVUS-X/XLMate
Length of output: 89
🏁 Script executed:
# Read pubsub.rs to check for env::var usage
wc -l ./backend/src/socket/pubsub.rsRepository: NOVUS-X/XLMate
Length of output: 92
🏁 Script executed:
# Read pubsub.rs
cat -n ./backend/src/socket/pubsub.rsRepository: NOVUS-X/XLMate
Length of output: 2278
🏁 Script executed:
# Check the full Cargo.toml for context
cat -n ./backend/src/socket/Cargo.tomlRepository: NOVUS-X/XLMate
Length of output: 775
dotenvy is added but never initialized — .env will not be loaded.
std::env::var("REDIS_URL") in pubsub.rs (line 14) and std::env::var("BIND_ADDRESS") in main.rs (line 16) are read directly without ever calling dotenvy::dotenv(). As a result, this dependency is dead code and operators who place REDIS_URL or BIND_ADDRESS in a .env file will silently fall back to the hardcoded defaults (redis://127.0.0.1:6379 and 127.0.0.1:8080). Either invoke dotenvy::dotenv().ok(); once at the top of main(), or drop the dependency.
🛠️ Suggested fix in `main.rs`
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ // Load environment variables from .env if present (no-op in production).
+ dotenvy::dotenv().ok();
+
// Initialize the logger
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/src/socket/Cargo.toml` at line 24, The dotenvy crate is added but
never initialized, so env vars like REDIS_URL (used in pubsub.rs) and
BIND_ADDRESS (used in main.rs) in a .env file are ignored; fix by calling
dotenvy::dotenv().ok(); at the start of your application entry (inside main() in
main.rs) before any std::env::var reads so those variables are loaded, or if you
prefer to not use .env files, remove the dotenvy dependency from Cargo.toml and
rely only on environment variables/hardcoded defaults.
| use crate::pubsub::RedisPubSub; | ||
|
|
||
| pub struct ServerState { | ||
| pub rooms: HashMap<String, Room>, | ||
| pub message_senders: HashMap<String, MessageSender>, | ||
| pub pubsub: Option<RedisPubSub>, | ||
| } | ||
|
|
||
| lazy_static::lazy_static! { | ||
| pub static ref GAME_STATE: Arc<Mutex<ServerState>> = Arc::new(Mutex::new(ServerState { | ||
| rooms: HashMap::new(), | ||
| message_senders: HashMap::new(), | ||
| pubsub: None, | ||
| })); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm RedisPubSub is never instantiated and publish_move is never called.
rg -nP '\bRedisPubSub::new\b|\.publish_move\s*\(|\.subscribe_to_room\s*\(' --type=rust
# Confirm no path assigns to ServerState.pubsub other than the None init.
rg -nP '\bpubsub\s*[:=]' --type=rust -C2Repository: NOVUS-X/XLMate
Length of output: 1761
🏁 Script executed:
# Check if RedisPubSub is used anywhere (not just instantiated)
rg -n '\bRedisPubSub\b' --type=rust
# Check if subscribe_to_room is called anywhere
rg -n 'subscribe_to_room' --type=rust
# Check for any mutations or assignments to state.pubsub
rg -n 'state\.pubsub|GAME_STATE.*pubsub' --type=rust
# Check the send_move function implementation
fd 'game\.rs$' --type f | head -1 | xargs wc -lRepository: NOVUS-X/XLMate
Length of output: 480
🏁 Script executed:
# Read the game.rs file to see send_move implementation
cat -n backend/src/socket/game.rs
# Search for any actual calls to subscribe_to_room
rg -n 'subscribe_to_room\s*\(' --type=rust
# Search for any code that might initialize pubsub or call RedisPubSub methods
rg -n 'publish_move|publish_' --type=rust
# Check main.rs for any initialization of RedisPubSub
fd 'main\.rs$' --type f | xargs cat -n 2>/dev/null | head -50Repository: NOVUS-X/XLMate
Length of output: 23728
Critical: pubsub is wired into ServerState but never initialized or used — horizontal sync is not actually happening.
GAME_STATE initializes pubsub: None and there is no code path anywhere in the crate that constructs RedisPubSub::new() and assigns it to state.pubsub. Combined with the fact that send_move (lines 227–229), join_room (lines 127–131), leave_room (lines 253–255), accept_takeback (lines 370–372), reject_takeback (lines 403–405), and offer_takeback (lines 314–316) only broadcast on the in-process broadcast::Sender and never call pubsub.publish_move(...), the entire pubsub module is dead code in this PR.
This contradicts the PR objective ("moves on one server instance are published to a distributed channel, keeping room state synchronized across physical nodes") — two server instances running this build will not see each other's moves. Before merging, please:
- Initialize
RedisPubSubat startup (e.g. inmain.rs, behind a feature flag or env check), and store it onGAME_STATE. - Call
state.pubsub.as_ref().map(|p| p.publish_move(room_id, &response))from the broadcasting paths insend_move/leave_room/join_room/accept_takeback/reject_takeback/offer_takeback. - Spawn a subscriber task per room (or a single pattern subscriber on
game:*) that re-emits incoming messages onto the localbroadcast::Senderso remote moves reach connected websockets.
Without (2) and (3) the new module/dependencies have no runtime effect.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/src/socket/game.rs` around lines 13 - 26, GAME_STATE.ServerState
currently holds pubsub: Option<RedisPubSub> but it’s never initialized or used;
to fix, initialize a RedisPubSub at startup (e.g. in main.rs behind env/feature
flag), store it into GAME_STATE.lock().pubsub, and in each broadcast path
(functions send_move, join_room, leave_room, accept_takeback, reject_takeback,
offer_takeback) add a call like state.pubsub.as_ref().map(|p|
p.publish_move(room_id, &response)) after sending on the local
broadcast::Sender; additionally spawn a background subscriber task (per-room or
a pattern subscriber like "game:*") that listens for Redis messages and re-emits
them onto the matching local broadcast::Sender so remote moves are delivered to
websocket clients.
| #[derive(Clone)] | ||
| pub struct RedisPubSub { | ||
| client: Client, | ||
| connection: Arc<Mutex<redis::aio::Connection>>, |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
redis-rs 0.25 MultiplexedConnection vs Connection concurrency and pipelining
💡 Result:
In redis-rs 0.25, MultiplexedConnection (from redis::aio) is the recommended async connection type, replacing the deprecated aio::Connection. It supports concurrency and pipelining on a single underlying TCP socket. MultiplexedConnection is cheaply cloneable and allows sending requests concurrently from multiple clones/tasks without waiting for responses (responses return in order). This enables high concurrency: clone the connection and use it across async tasks/threads for non-blocking commands. It natively supports pipelining via send_packed_commands (used internally by Pipeline::query_async). A bug affecting single-command pipelines on MultiplexedConnection was fixed in 0.25.2 (PR #1079). Regular Connection (sync) or deprecated async Connection lack multiplexing: they process one command at a time sequentially, requiring pools for concurrency. Key limitations: blocking commands (e.g., BLPOP) block the entire multiplexed connection; use separate connections/pools. Default response timeout is short (0.5s); adjust with set_response_timeout for longer blocks. Example usage: let client = redis::Client::open("redis://127.0.0.1/")?; let mut con = client.get_multiplexed_async_connection.await?; let _: = con.set("key", "value").await?; let val: String = con.get("key").await?; Clone con for concurrent use; Pipeline works via AsyncCommands.
Citations:
- 1: https://docs.rs/redis/latest/redis/aio/struct.MultiplexedConnection.html
- 2: https://github.com/redis-rs/redis-rs
- 3: https://docs.rs/cal-redis/latest/cal_redis/struct.MultiplexedConnection.html
- 4: What is the benefit of MultiplexedConnection? redis-rs/redis-rs#453
- 5: https://docs.rs/redis/latest/redis/struct.Pipeline.html
- 6: https://github.com/redis-rs/redis-rs/releases/tag/redis-0.25.0
- 7: https://docs.rs/redis/latest/src/redis/aio/multiplexed_connection.rs.html
- 8: Bug with multiplexed async connections in combination with pipeline (redis::pipe)? redis-rs/redis-rs#1078
- 9: https://docs.rs/redis/latest/redis/?search=Connection
🏁 Script executed:
fd -e rs -p "pubsub" | head -20Repository: NOVUS-X/XLMate
Length of output: 87
🏁 Script executed:
cat -n backend/src/socket/pubsub.rsRepository: NOVUS-X/XLMate
Length of output: 2278
Use MultiplexedConnection instead of Arc<Mutex<Connection>> to eliminate lock contention on publishes.
redis::aio::Connection is single-request-at-a-time, and wrapping it in Arc<Mutex<…>> serializes every publish call across all rooms through the same lock, creating a contention point on the hot path. MultiplexedConnection (returned by client.get_multiplexed_async_connection()) is Clone + Send + Sync and pipelines requests internally, allowing you to clone the connection for concurrent use without the Arc<Mutex> overhead.
♻️ Proposed refactor
-use redis::{AsyncCommands, Client};
-use std::sync::Arc;
-use tokio::sync::Mutex;
+use redis::{aio::MultiplexedConnection, AsyncCommands, Client};
@@
#[derive(Clone)]
pub struct RedisPubSub {
client: Client,
- connection: Arc<Mutex<redis::aio::Connection>>,
+ connection: MultiplexedConnection,
}
@@
- pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
+ pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let client = Client::open(REDIS_URL.as_str())?;
- let connection = client.get_async_connection().await?;
+ let connection = client.get_multiplexed_async_connection().await?;
- Ok(Self {
- client,
- connection: Arc::new(Mutex::new(connection)),
- })
+ Ok(Self { client, connection })
}
@@
- pub async fn publish_move(&self, room_id: &str, message: &ServerMessage) -> Result<(), Box<dyn std::error::Error>> {
+ pub async fn publish_move(&self, room_id: &str, message: &ServerMessage) -> Result<(), Box<dyn std::error::Error>> {
let payload = serde_json::to_string(message)?;
- let mut conn = self.connection.lock().await;
+ let mut conn = self.connection.clone();
let _: () = conn.publish(format!("game:{}", room_id), payload).await?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| connection: Arc<Mutex<redis::aio::Connection>>, | |
| connection: MultiplexedConnection, |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/src/socket/pubsub.rs` at line 10, Replace the shared
Arc<Mutex<redis::aio::Connection>> field with a
redis::aio::MultiplexedConnection to avoid serializing publishes: change the
struct field named connection to type MultiplexedConnection (remove Arc and
Mutex), obtain it via client.get_multiplexed_async_connection() where the
connection is created, update imports, and update all uses (e.g. any publish
calls that did self.connection.lock().await.publish(...)) to call
publish/execute directly on the cloned MultiplexedConnection (clone it where
concurrent use is needed) since it is Clone + Send + Sync and pipelines
internally. Ensure code compiles by removing async lock awaits and adjusting
types in function signatures that referenced Arc<Mutex<...>>.
| lazy_static! { | ||
| static ref REDIS_URL: String = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); | ||
| } |
There was a problem hiding this comment.
REDIS_URL is captured once at first access and never re-resolves.
Combined with the missing dotenvy::dotenv() call (see Cargo.toml comment), tests or processes that set REDIS_URL after this lazy_static is first touched will silently use the default. Lower priority since env vars are typically set before main() runs, but worth being aware of when adding integration tests that set it programmatically.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/src/socket/pubsub.rs` around lines 13 - 15, The REDIS_URL lazy_static
captures the environment once and won't pick up changes or dotenv-loaded vars;
remove the lazy_static! REDIS_URL and replace it with a small accessor function
(e.g., fn redis_url() -> String) that calls
std::env::var("REDIS_URL").unwrap_or_else(||
"redis://127.0.0.1:6379".to_string()) each time it's needed; also ensure
dotenvy::dotenv().ok() is invoked early in program/test setup (e.g., in main or
test harness) so dotenv files are loaded before calling redis_url().
| pub async fn new() -> Result<Self, Box<dyn std::error::Error>> { | ||
| let client = Client::open(REDIS_URL.as_str())?; | ||
| let connection = client.get_async_connection().await?; | ||
|
|
||
| Ok(Self { | ||
| client, | ||
| connection: Arc::new(Mutex::new(connection)), | ||
| }) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In Rust, why is Boxproblematic fortokio::spawn, and what is the recommended bound?
💡 Result:
Box is problematic for tokio::spawn because dyn std::error::Error does not implement Send. The tokio::spawn function requires the future passed to it to implement Future + Send + 'static, with its Output also being Send + 'static. When an async function returns Result<T, Box>, the generated future captures this return type. Since dyn Error lacks Send, the future cannot be Send, causing a compiler error like "(dyn std::error::Error + 'static) cannot be sent between threads safely". The recommended bound is Box<dyn std::error::Error + Send>. This ensures the trait object implements Send, making the future Send and compatible with tokio::spawn. Some sources also suggest + Send + Sync for additional safety (e.g., if the error needs to be shared across threads), but + Send is the minimal requirement for spawning.
Citations:
- 1: https://stackoverflow.com/questions/76772782/how-do-i-fix-cannot-be-sent-between-threads-safely-when-using-tokiospawn-and
- 2: https://docs.rs/tokio/*/tokio/task/fn.spawn.html
- 3: https://docs.rs/tokio/1.47.1/src/tokio/task/spawn.rs.html
🏁 Script executed:
find . -name "pubsub.rs" -type fRepository: NOVUS-X/XLMate
Length of output: 89
🏁 Script executed:
find . -name "game.rs" -type fRepository: NOVUS-X/XLMate
Length of output: 236
🏁 Script executed:
wc -l ./backend/src/socket/pubsub.rsRepository: NOVUS-X/XLMate
Length of output: 92
🏁 Script executed:
cat -n ./backend/src/socket/pubsub.rs | head -60Repository: NOVUS-X/XLMate
Length of output: 2278
🏁 Script executed:
cat -n ./backend/src/socket/game.rs | head -100Repository: NOVUS-X/XLMate
Length of output: 3854
🏁 Script executed:
rg "publish_move|subscribe_to_room" ./backend/src/socket/game.rs -A 3 -B 3Repository: NOVUS-X/XLMate
Length of output: 40
🏁 Script executed:
rg "publish_move|subscribe_to_room" ./backend/ -r --type rust -A 2Repository: NOVUS-X/XLMate
Length of output: 765
🏁 Script executed:
rg "publish_move|subscribe_to_room" ./backend/ --type-list | head -20Repository: NOVUS-X/XLMate
Length of output: 448
🏁 Script executed:
rg "publish_move|subscribe_to_room" ./backend/Repository: NOVUS-X/XLMate
Length of output: 358
🏁 Script executed:
rg "tokio::spawn|tokio::task::spawn" ./backend/ -A 5Repository: NOVUS-X/XLMate
Length of output: 1561
Box<dyn std::error::Error> is not Send — will block use from tokio::spawn.
The trait object must be Send to cross async task boundaries (required by tokio::spawn). Without it, the future becomes !Send, causing compile errors when awaited in spawned tasks. Add the bound to all three method signatures.
🛠️ Suggested fix
- pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
+ pub async fn new() -> Result<Self, Box<dyn std::error::Error + Send>> {
@@
- pub async fn publish_move(&self, room_id: &str, message: &ServerMessage) -> Result<(), Box<dyn std::error::Error>> {
+ pub async fn publish_move(&self, room_id: &str, message: &ServerMessage) -> Result<(), Box<dyn std::error::Error + Send>> {
@@
- pub async fn subscribe_to_room(&self, room_id: &str) -> Result<redis::aio::PubSub, Box<dyn std::error::Error>> {
+ pub async fn subscribe_to_room(&self, room_id: &str) -> Result<redis::aio::PubSub, Box<dyn std::error::Error + Send>> {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/src/socket/pubsub.rs` around lines 18 - 26, The error type returned
by new uses Box<dyn std::error::Error> which is not Send and will make futures
!Send (breaking tokio::spawn); update the error trait object to be sendable
(e.g. Box<dyn std::error::Error + Send + Sync + 'static>) in the new function
signature and in the other two related methods that return boxed errors so the
returned Result types are Send across async task boundaries (locate the new()
function and any publish/subscribe or similar methods that return Box<dyn
std::error::Error> and add the + Send + Sync + 'static bounds).
| pub async fn subscribe_to_room(&self, room_id: &str) -> Result<redis::aio::PubSub, Box<dyn std::error::Error>> { | ||
| let mut pubsub = self.client.get_async_pubsub().await?; | ||
| pubsub.subscribe(format!("game:{}", room_id)).await?; | ||
|
|
||
| log::info!("[Redis] Subscribed to horizontal updates for room {}", room_id); | ||
| Ok(pubsub) | ||
| } |
There was a problem hiding this comment.
subscribe_to_room returns a PubSub but no caller consumes it — and per-room subscriptions don't scale.
Two concerns to consider before this is wired up:
- No consumer loop. Returning the
PubSubto a caller that never reads.on_message()means subscribed messages are queued in the kernel buffer and eventually dropped. The module needs a long-lived task that drains the stream and re-publishes onto the localbroadcast::Senderfor that room. - Per-room connections won't scale. Each call to
subscribe_to_roomopens a new TCP connection to Redis. With many concurrent rooms this exhausts file descriptors and Redis client slots. Prefer a singlepsubscribe("game:*")connection at startup, parse theroom_idout of the channel name, and dispatch to the right local sender.
Worth addressing as part of the integration step (which is currently missing — see comment on game.rs).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/src/socket/pubsub.rs` around lines 45 - 51, subscribe_to_room
currently creates and returns a redis::aio::PubSub via
get_async_pubsub()/subscribe but no caller consumes messages, causing kernel
buffer growth, and creating one connection per room which won't scale; instead
implement a single long‑lived subscriber task (e.g. new
start_global_subscription or start_pubsub_worker) that uses psubscribe("game:*")
once, spawns an async loop that calls .on_message()/get_payload(), parses the
room_id from the channel name (channel format "game:{room_id}"), and republishes
each message onto the corresponding local broadcast::Sender for that room (look
up or create the sender in your room registry), and remove per-room
get_async_pubsub usage from subscribe_to_room (make it register/get a local
Sender only); this ensures messages are drained continuously and avoids one TCP
connection per room.
| // Mock discovery service URL - in production this would be a real endpoint | ||
| const DISCOVERY_ENDPOINT = process.env.NEXT_PUBLIC_DISCOVERY_URL || "http://localhost:8000/v1/nodes/discovery"; | ||
|
|
||
| export function WebSocketScalingProvider({ children }: { children: React.ReactNode }) { | ||
| const [currentNode, setCurrentNode] = useState<WebSocketNode | null>(null); | ||
| const [availableNodes, setAvailableNodes] = useState<WebSocketNode[]>([]); | ||
| const [isScaling, setIsScaling] = useState(false); | ||
|
|
||
| // Cache for game-to-node mapping to ensure session stickiness within horizontal scaling | ||
| const gameNodeMap = useRef<Map<string, WebSocketNode>>(new Map()); | ||
|
|
||
| /** | ||
| * Fetches the most optimal WebSocket node based on regional proximity and server load. | ||
| * Implements horizontal load balancing logic on the client side. | ||
| */ | ||
| const getOptimalNode = useCallback(async (gameId: string): Promise<WebSocketNode> => { | ||
| // Check if we already have a sticky session for this game | ||
| if (gameNodeMap.current.has(gameId)) { | ||
| return gameNodeMap.current.get(gameId)!; | ||
| } | ||
|
|
||
| setIsScaling(true); | ||
| try { | ||
| // In a real implementation, we would fetch from DISCOVERY_ENDPOINT | ||
| // For now, we simulate a discovery process that returns available horizontal nodes | ||
| const mockNodes: WebSocketNode[] = [ | ||
| { id: "node-us-east-1", url: "ws://localhost:8080", region: "us-east", load: 0.2 }, | ||
| { id: "node-eu-west-1", url: "ws://localhost:8081", region: "eu-west", load: 0.4 }, | ||
| { id: "node-ap-south-1", url: "ws://localhost:8082", region: "ap-south", load: 0.1 }, | ||
| ]; | ||
|
|
||
| setAvailableNodes(mockNodes); | ||
|
|
||
| // Sorting logic: prioritize lowest load | ||
| const sortedNodes = [...mockNodes].sort((a, b) => a.load - b.load); | ||
| const optimal = sortedNodes[0]; | ||
|
|
||
| gameNodeMap.current.set(gameId, optimal); | ||
| setCurrentNode(optimal); | ||
| return optimal; | ||
| } finally { | ||
| setIsScaling(false); | ||
| } | ||
| }, []); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm DISCOVERY_ENDPOINT / NEXT_PUBLIC_DISCOVERY_URL really has no other consumer.
rg -nP -C2 'DISCOVERY_ENDPOINT|NEXT_PUBLIC_DISCOVERY_URL'Repository: NOVUS-X/XLMate
Length of output: 1060
Critical: DISCOVERY_ENDPOINT is unused dead code; production builds will connect to hardcoded localhost WebSocket URLs.
DISCOVERY_ENDPOINT is declared on line 23 but never used — it only appears in comments (lines 22, 45). The discovery flow at lines 47–51 always returns hardcoded ws://localhost:808x mock nodes regardless of environment. In production, this means every client receives localhost URLs that (a) cannot resolve outside the local machine, (b) will be blocked as mixed content from HTTPS origins, and (c) use insecure ws:// instead of wss://. This breaks the PR's stated regional discovery and horizontal scaling objective.
Wrap the mock behind process.env.NODE_ENV !== 'production', and fetch from DISCOVERY_ENDPOINT in production with wss:// URLs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/context/webSocketScalingContext.tsx` around lines 22 - 65, The
DISCOVERY_ENDPOINT constant is unused and getOptimalNode always returns
hardcoded mockNodes (ws://localhost...), causing production clients to get
invalid localhost/insecure URLs; update getOptimalNode to only use the hardcoded
mockNodes when process.env.NODE_ENV !== 'production' and in production perform a
fetch to DISCOVERY_ENDPOINT to retrieve real node data, convert/validate
returned node URLs to wss:// if needed, then call setAvailableNodes, update
gameNodeMap.current and setCurrentNode with the fetched/validated nodes, and
keep the existing fallback to mockNodes only for non-production or on fetch
failure.
| // Cache for game-to-node mapping to ensure session stickiness within horizontal scaling | ||
| const gameNodeMap = useRef<Map<string, WebSocketNode>>(new Map()); | ||
|
|
||
| /** | ||
| * Fetches the most optimal WebSocket node based on regional proximity and server load. | ||
| * Implements horizontal load balancing logic on the client side. | ||
| */ | ||
| const getOptimalNode = useCallback(async (gameId: string): Promise<WebSocketNode> => { | ||
| // Check if we already have a sticky session for this game | ||
| if (gameNodeMap.current.has(gameId)) { | ||
| return gameNodeMap.current.get(gameId)!; | ||
| } | ||
|
|
||
| setIsScaling(true); | ||
| try { | ||
| // In a real implementation, we would fetch from DISCOVERY_ENDPOINT | ||
| // For now, we simulate a discovery process that returns available horizontal nodes | ||
| const mockNodes: WebSocketNode[] = [ | ||
| { id: "node-us-east-1", url: "ws://localhost:8080", region: "us-east", load: 0.2 }, | ||
| { id: "node-eu-west-1", url: "ws://localhost:8081", region: "eu-west", load: 0.4 }, | ||
| { id: "node-ap-south-1", url: "ws://localhost:8082", region: "ap-south", load: 0.1 }, | ||
| ]; | ||
|
|
||
| setAvailableNodes(mockNodes); | ||
|
|
||
| // Sorting logic: prioritize lowest load | ||
| const sortedNodes = [...mockNodes].sort((a, b) => a.load - b.load); | ||
| const optimal = sortedNodes[0]; | ||
|
|
||
| gameNodeMap.current.set(gameId, optimal); | ||
| setCurrentNode(optimal); | ||
| return optimal; | ||
| } finally { | ||
| setIsScaling(false); | ||
| } | ||
| }, []); |
There was a problem hiding this comment.
Sticky session cache has no invalidation — failover is silently broken.
gameNodeMap.current.set(gameId, optimal) (line 59) is never cleared. Once a gameId is bound to a node:
- If that node becomes unhealthy or its load spikes after binding, every subsequent reconnect via
useChessSocket(which callsgetOptimalNode(gameId)) keeps returning the same dead node — defeating the PR's stated failover goal. - The map grows for the SPA's lifetime (one entry per game ever observed), leaking memory in long sessions / matchmaking lobbies.
reportLoadupdatesavailableNodesbut cannot influence sticky decisions, so live load telemetry has no effect.
Add an eviction path: on WebSocket close-with-error / health-check failure, expose an invalidateNode(gameId) (or releaseGame(gameId)) the hook can call to drop the binding before the next getOptimalNode call. Consider a TTL too.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/context/webSocketScalingContext.tsx` around lines 30 - 65, The
sticky cache in gameNodeMap used by getOptimalNode never evicts entries, causing
stale/dead node reuse and unbounded growth; add an eviction API (e.g., export
invalidateNode(gameId) or releaseGame(gameId)) that deletes
gameNodeMap.current.delete(gameId) and optionally clears related state
(setCurrentNode if it matched) and call it from the WebSocket teardown path
(close-with-error/health-check failure) in the useChessSocket hook; also augment
getOptimalNode to honor a TTL per entry (store timestamp alongside
WebSocketNode) and to re-evaluate using reportLoad/setAvailableNodes when the
entry is expired or the node is flagged unhealthy so live telemetry can
influence sticky decisions.
| /** | ||
| * Fetches the most optimal WebSocket node based on regional proximity and server load. | ||
| * Implements horizontal load balancing logic on the client side. | ||
| */ | ||
| const getOptimalNode = useCallback(async (gameId: string): Promise<WebSocketNode> => { | ||
| // Check if we already have a sticky session for this game | ||
| if (gameNodeMap.current.has(gameId)) { | ||
| return gameNodeMap.current.get(gameId)!; | ||
| } | ||
|
|
||
| setIsScaling(true); | ||
| try { | ||
| // In a real implementation, we would fetch from DISCOVERY_ENDPOINT | ||
| // For now, we simulate a discovery process that returns available horizontal nodes | ||
| const mockNodes: WebSocketNode[] = [ | ||
| { id: "node-us-east-1", url: "ws://localhost:8080", region: "us-east", load: 0.2 }, | ||
| { id: "node-eu-west-1", url: "ws://localhost:8081", region: "eu-west", load: 0.4 }, | ||
| { id: "node-ap-south-1", url: "ws://localhost:8082", region: "ap-south", load: 0.1 }, | ||
| ]; | ||
|
|
||
| setAvailableNodes(mockNodes); | ||
|
|
||
| // Sorting logic: prioritize lowest load | ||
| const sortedNodes = [...mockNodes].sort((a, b) => a.load - b.load); | ||
| const optimal = sortedNodes[0]; |
There was a problem hiding this comment.
Docstring claims "regional proximity" but implementation only sorts by load.
Lines 33–35 describe selection "based on regional proximity and server load", but the only criterion at line 56 is a.load - b.load. Either drop the proximity claim from the docstring or factor proximity into the score (e.g., a precomputed region-affinity weight from the user's locale / detected region).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/context/webSocketScalingContext.tsx` around lines 33 - 57, The
docstring for getOptimalNode promises selection by "regional proximity and
server load" but the implementation only sorts mockNodes by load; update
getOptimalNode to factor user-region affinity into the scoring (e.g., compute a
score = weightLoad * load + weightRegion * distancePenalty or regionAffinity and
pick min score) or change the comment to remove "regional proximity";
specifically modify getOptimalNode to compute region affinity from a detected
user region (or a passed-in preference) and combine it with load when sorting
the mockNodes, then keep the sticky session behavior using gameNodeMap and still
call setAvailableNodes with the node list.
| const createWebSocket = useCallback(async (attemptReconnect = false) => { | ||
| if (!gameId) return null; | ||
|
|
||
| try { | ||
| const ws = new WebSocket(`${WS_BASE}/v1/games/${gameId}/ws`); | ||
| // Step 1: Discover the optimal horizontal node (Regional Load Balancing) | ||
| const node = await getOptimalNode(gameId); | ||
| const wsUrl = `${node.url}/v1/games/${gameId}/ws`; | ||
|
|
||
| const ws = new WebSocket(wsUrl); | ||
| wsRef.current = ws; | ||
|
|
There was a problem hiding this comment.
Async race: orphaned WebSockets when gameId changes during discovery.
createWebSocket is now async and assigns wsRef.current = ws synchronously after await getOptimalNode(...) resolves (lines 89–93). If the gameId effect (lines 314–336) re-runs while a prior getOptimalNode is still in-flight, the second call can resolve before/after the first and overwrite wsRef.current. The active flag at line 318 only closes the currently resolving ws, but by then wsRef.current may already point at a different, in‑flight connection (or the cleanup at line 327 will close the wrong one). Net effect: leaked/stale WebSocket instances whose onmessage/onclose handlers still mutate this hook's state (e.g., setting lastOpponentMove, scheduling reconnects).
Recommend tracking the in-flight token explicitly and bailing out before assigning wsRef.current / wiring listeners.
♻️ Suggested guard pattern (illustrative)
- const createWebSocket = useCallback(async (attemptReconnect = false) => {
+ const connectionTokenRef = useRef(0);
+ const createWebSocket = useCallback(async (attemptReconnect = false) => {
if (!gameId) return null;
+ const token = ++connectionTokenRef.current;
try {
const node = await getOptimalNode(gameId);
+ if (token !== connectionTokenRef.current) return null;
const wsUrl = `${node.url}/v1/games/${gameId}/ws`;
const ws = new WebSocket(wsUrl);
+ if (token !== connectionTokenRef.current) {
+ ws.close();
+ return null;
+ }
wsRef.current = ws;…and bump connectionTokenRef.current in disconnect / the gameId-effect cleanup so any pending discovery resolves into a no‑op.
Also applies to: 314-331
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/hook/useChessSocket.ts` around lines 84 - 94, The async
createWebSocket has a race where multiple in-flight getOptimalNode calls can
overwrite wsRef.current and leave orphaned sockets; fix by introducing and using
a per-call token (use connectionTokenRef) inside createWebSocket: capture a new
token before awaiting getOptimalNode, store it in a local variable, then after
the await check that connectionTokenRef.current still equals the local token
before assigning wsRef.current, adding event handlers, or scheduling reconnects;
also ensure disconnect (and the gameId effect cleanup) increments/changes
connectionTokenRef.current to invalidate any outstanding tokens so resolved
discoveries bail out and never wire listeners to stale sockets.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
backend/modules/chess/src/rating.rs (1)
66-67: Optional: dropmap_errand rely onFrom<DbErr> for ApiError.Since
error.rsalready providesimpl From<DbErr> for ApiError(lines 28–31), each of these.map_err(ApiError::DatabaseError)?sites can be reduced to a bare?, removing redundant boilerplate.♻️ Example simplification
- let txn = db.begin().await - .map_err(ApiError::DatabaseError)?; + let txn = db.begin().await?;- let game_model = game::Entity::find_by_id(game_id) - .one(txn) - .await - .map_err(ApiError::DatabaseError)? - .ok_or_else(|| ApiError::NotFound("Game not found".to_string()))?; + let game_model = game::Entity::find_by_id(game_id) + .one(txn) + .await? + .ok_or_else(|| ApiError::NotFound("Game not found".to_string()))?;The same applies to the
commit, player fetches,update, andget_player_rating/set_player_ratingcall sites.Also applies to: 74-75, 93-96, 118-121, 124-127, 152-153, 155-156, 216-219, 240-241
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/modules/chess/src/rating.rs` around lines 66 - 67, Remove the redundant .map_err(ApiError::DatabaseError)? wrappers where DbErr is already convertible to ApiError via From; replace those expressions with the simpler bare ? so the compiler uses the From<DbErr> for ApiError conversion. Specifically, update calls like db.begin().await.map_err(ApiError::DatabaseError)? and txn.commit().await.map_err(ApiError::DatabaseError)? as well as database fetch/update sites and calls to get_player_rating / set_player_rating and any player fetches or update operations in rating.rs to use the trailing ? instead of .map_err(ApiError::DatabaseError)? so error conversion is handled by the existing From impl.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@backend/modules/chess/src/rating.rs`:
- Around line 66-67: Remove the redundant .map_err(ApiError::DatabaseError)?
wrappers where DbErr is already convertible to ApiError via From; replace those
expressions with the simpler bare ? so the compiler uses the From<DbErr> for
ApiError conversion. Specifically, update calls like
db.begin().await.map_err(ApiError::DatabaseError)? and
txn.commit().await.map_err(ApiError::DatabaseError)? as well as database
fetch/update sites and calls to get_player_rating / set_player_rating and any
player fetches or update operations in rating.rs to use the trailing ? instead
of .map_err(ApiError::DatabaseError)? so error conversion is handled by the
existing From impl.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dd11cd23-e4f5-4464-b2ed-3141b93bdfe2
📒 Files selected for processing (1)
backend/modules/chess/src/rating.rs
Resolves #568
Context
To support a growing user base and ensure premium Web3 interactions, the XLMate platform required a shift from static WebSocket connections to a horizontally scalable architecture. This PR introduces regional node balancing and distributed state synchronization.
Changes proposed in this pull request:
WebSocketScalingContextto manage dynamic node discovery. Clients now query for the optimal regional server node based on load and proximity before connecting.useChessSockethook to leverage the discovery layer, enabling seamless failover and session stickiness across horizontally scaled nodes.Acceptance Criteria Met:
Closes #568
Summary by CodeRabbit
Release Notes
New Features
Infrastructure