From 85668271ed46a43b07ce54ab77dac44bdbbb3d79 Mon Sep 17 00:00:00 2001 From: Prometheus1400 Date: Sun, 7 Dec 2025 18:40:19 -0800 Subject: [PATCH] clients share id --- Cargo.lock | 76 ++++++++++++++++++++++++++ Cargo.toml | 1 + cli/Cargo.toml | 1 + cli/src/app.rs | 8 ++- cli/src/args.rs | 53 ------------------ cli/src/main.rs | 4 +- cli/src/ui/fuzzy_selector_widget.rs | 6 +- core/Cargo.toml | 1 + core/src/comm.rs | 6 +- core/src/messages/request.rs | 2 + core/src/messages/traits.rs | 2 - daemon/Cargo.toml | 1 + daemon/src/actors/client_connection.rs | 12 ++-- daemon/src/actors/session_manager.rs | 41 +++++++------- daemon/src/daemon.rs | 4 +- 15 files changed, 125 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a72ff1..c167887 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + [[package]] name = "bytes" version = "1.11.0" @@ -675,6 +681,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "js-sys" +version = "0.3.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -1094,6 +1110,7 @@ dependencies = [ "tracing-error", "tracing-subscriber", "tui-term", + "uuid", "vt100", ] @@ -1110,6 +1127,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "uuid", ] [[package]] @@ -1131,6 +1149,7 @@ dependencies = [ "tracing", "tracing-error", "tracing-subscriber", + "uuid", "vt100", ] @@ -1635,6 +1654,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +dependencies = [ + "getrandom", + "js-sys", + "serde_core", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -1695,6 +1726,51 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn 2.0.111", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" +dependencies = [ + "unicode-ident", +] + [[package]] name = "which" version = "8.0.0" diff --git a/Cargo.toml b/Cargo.toml index 5da7e93..4f7fe25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ serde_json = { version = "1.0.145" } tokio = { version = "1.48", features = ["full"] } tracing = { version = "0.1" } tracing-appender = { version = "0.2" } +uuid = { version = "1.0", features = ["v4", "serde"] } tracing-error = { version = "0.2" } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } vt100 = { version = "0" } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index ae15eca..3c3df21 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -16,6 +16,7 @@ serde.workspace = true tokio.workspace = true tracing-appender.workspace = true tracing-error.workspace = true +uuid.workspace = true tracing-subscriber.workspace = true tracing.workspace = true vt100.workspace = true diff --git a/cli/src/app.rs b/cli/src/app.rs index 06fb16d..3be0ec9 100644 --- a/cli/src/app.rs +++ b/cli/src/app.rs @@ -1,7 +1,6 @@ use std::{fmt::Debug, io::Stdout, time::Duration}; use bytes::Bytes; -use clap::FromArgMatches; use color_eyre::eyre; use derivative::Derivative; use ratatui::{Terminal, prelude::CrosstermBackend, restore, widgets::ListState}; @@ -16,6 +15,7 @@ use tokio::{ sync::{broadcast, mpsc}, time::interval, }; +use uuid::Uuid; use vt100::Parser; use crate::{ @@ -95,11 +95,13 @@ pub struct App { input_parser: InputParser, stream: UnixStream, bg_tasks: Vec, + id: Uuid, } impl App { - pub fn new(stream: UnixStream, daemon_state: DaemonState) -> Self { + pub fn new(id: Uuid, stream: UnixStream, daemon_state: DaemonState) -> Self { Self { + id, stream, input_parser: InputParser::default(), state: AppState { @@ -125,7 +127,7 @@ impl App { } } - #[instrument(parent=None, skip(self), name="App")] + #[instrument(parent=None, skip(self), fields(id=?self.id), name="App")] pub async fn run(&mut self) -> Result<()> { let mut term = ratatui::init(); debug!("Starting app"); diff --git a/cli/src/args.rs b/cli/src/args.rs index 3dc5383..fcdeb03 100644 --- a/cli/src/args.rs +++ b/cli/src/args.rs @@ -1,5 +1,4 @@ use clap::{Parser, Subcommand}; -use remux_core::messages::{CliRequestMessage, RequestBody, RequestBuilder, request}; #[derive(Parser, Debug)] pub struct Args { @@ -23,55 +22,3 @@ pub enum Commands { pub enum SessionCommands { List, } - -impl Commands { - pub fn into_request(self) -> CliRequestMessage { - match self { - Self::Attach { session_id } => RequestBuilder::default() - .body(request::Attach { - session_id, - create: true, - }) - .build(), - Self::Session { .. } => todo!(), - } - } -} - -// #[allow(clippy::from_over_into)] -// impl Into> for Commands { -// fn into(self) -> RequestMessage { -// match self { -// Self::Attach { session_id } => RequestBuilder::default() -// .body(request::Attach { -// session_id, -// create: true, -// }) -// .build(), -// Self::Session { action } => action.into(), -// } -// } -// } - -// #[allow(clippy::from_over_into)] -// impl Into for SessionCommands { -// fn into(self) -> RequestBody { -// match self { -// SessionCommands::List => RequestBody::SessionsList, -// } -// } -// } -// #[allow(clippy::from_over_into)] -// impl Into for Commands { -// fn into(self) -> RequestBody { -// let body: RequestBody = self.into(); -// RequestBuilder::default().body(body).build() -// } -// } -// #[allow(clippy::from_over_into)] -// impl Into for SessionCommands { -// fn into(self) -> RequestBody { -// let body: RequestBody = self.into(); -// RequestBuilder::default().body(body).build() -// } -// } diff --git a/cli/src/main.rs b/cli/src/main.rs index d5c5d9f..bb5b958 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -19,6 +19,7 @@ use remux_core::{ }, }; use tokio::net::UnixStream; +use uuid::Uuid; use crate::{ app::App, @@ -95,6 +96,7 @@ async fn run(command: Commands) -> Result<()> { stream, RequestBuilder::default() .body(request::Attach { + id: Uuid::new_v4(), session_id, create: true, }) @@ -114,7 +116,7 @@ async fn attach(mut stream: UnixStream, attach_request: CliRequestMessage(stream: &mut UnixStream, req: &CliRequestM where B: RequestBody + Serialize + for<'de> Deserialize<'de>, { - let req_id = req.id; + // let req_id = req.id; send_message(stream, req).await?; let res: ResponseMessage = read_message(stream).await?; - let res_id = res.id; + // let res_id = res.id; // if req_id != res_id { // return Err(Error::Response(ResponseError::UnexpectedId { expected: req_id, actual: res_id })); // } @@ -80,6 +80,7 @@ mod test { use std::{fs::remove_file, path::PathBuf}; use tokio::net::UnixListener; + use uuid::Uuid; use super::*; use crate::{ @@ -104,6 +105,7 @@ mod test { let addr = listener.local_addr()?; let attach = request::Attach { + id: Uuid::new_v4(), session_id: 1, create: true, }; diff --git a/core/src/messages/request.rs b/core/src/messages/request.rs index a4380b1..d0a7696 100644 --- a/core/src/messages/request.rs +++ b/core/src/messages/request.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use uuid::Uuid; use crate::{ messages::{ @@ -35,6 +36,7 @@ impl Message for DaemonRequestMessage {} #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub struct Attach { + pub id: Uuid, pub session_id: u32, pub create: bool, } diff --git a/core/src/messages/traits.rs b/core/src/messages/traits.rs index b0f8fd6..bf0e2bf 100644 --- a/core/src/messages/traits.rs +++ b/core/src/messages/traits.rs @@ -1,5 +1,3 @@ -use std::fmt::Debug; - use serde::{Deserialize, Serialize, de::DeserializeOwned}; pub trait Message: Serialize + DeserializeOwned + for<'de> Deserialize<'de> {} diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 9bc7e61..4b54d63 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -21,6 +21,7 @@ crossterm.workspace = true derive_more.workspace = true itertools.workspace = true serde.workspace = true +uuid.workspace = true tokio.workspace = true tracing-error.workspace = true tracing-subscriber.workspace = true diff --git a/daemon/src/actors/client_connection.rs b/daemon/src/actors/client_connection.rs index 258f011..39e2eb6 100644 --- a/daemon/src/actors/client_connection.rs +++ b/daemon/src/actors/client_connection.rs @@ -1,5 +1,4 @@ use bytes::Bytes; -use derive_more::Display; use handle_macro::Handle; use remux_core::{ comm, @@ -8,6 +7,7 @@ use remux_core::{ states::DaemonState, }; use tokio::{net::UnixStream, sync::mpsc}; +use uuid::Uuid; use crate::{actors::session_manager::SessionManagerHandle, layout::SplitDirection, prelude::*}; @@ -39,7 +39,7 @@ enum ClientConnectionState { } pub struct ClientConnection { - id: u32, + id: Uuid, stream: UnixStream, handle: ClientConnectionHandle, rx: mpsc::Receiver, @@ -48,17 +48,17 @@ pub struct ClientConnection { } impl ClientConnection { pub fn spawn( + id: Uuid, stream: UnixStream, session_manager_handle: SessionManagerHandle, connecting_session_id: u32, ) -> Result { - let client = Self::new(stream, session_manager_handle); + let client = Self::new(id, stream, session_manager_handle); client.run(connecting_session_id) } - fn new(stream: UnixStream, session_manager_handle: SessionManagerHandle) -> Self { + fn new(id: Uuid, stream: UnixStream, session_manager_handle: SessionManagerHandle) -> Self { let (tx, rx) = mpsc::channel(10); let handle = ClientConnectionHandle { tx }; - let id: u32 = rand::random(); Self { id, @@ -181,7 +181,7 @@ impl ClientConnection { } } Ok::<(), Error>(()) - }.instrument(error_span!(parent: None, "Client Actor", id=self.id)) + }.instrument(error_span!(parent: None, "Client Actor", id=?self.id)) ); Ok(handle_clone) diff --git a/daemon/src/actors/session_manager.rs b/daemon/src/actors/session_manager.rs index c5ac789..2f6f103 100644 --- a/daemon/src/actors/session_manager.rs +++ b/daemon/src/actors/session_manager.rs @@ -6,6 +6,7 @@ use handle_macro::Handle; use remux_core::states::DaemonState; use tokio::sync::mpsc; use tracing::Instrument; +use uuid::Uuid; use crate::{ actors::{ @@ -21,34 +22,34 @@ use crate::{ pub enum SessionManagerEvent { // client -> session manager events ClientConnect { - client_id: u32, + client_id: Uuid, client_handle: ClientConnectionHandle, session_id: u32, create_session: bool, }, ClientDisconnect { - client_id: u32, + client_id: Uuid, }, ClientSwitchSession { - client_id: u32, + client_id: Uuid, session_id: u32, }, // client -> session events UserInput { - client_id: u32, + client_id: Uuid, bytes: Bytes, }, UserSplitPane { - client_id: u32, + client_id: Uuid, direction: SplitDirection, }, UserIteratePane { - client_id: u32, + client_id: Uuid, is_next: bool, }, UserKillPane { - client_id: u32, + client_id: Uuid, }, // session -> client events @@ -67,9 +68,9 @@ pub struct SessionManager { handle: SessionManagerHandle, rx: mpsc::Receiver, sessions: HashMap, - clients: HashMap, - session_to_client_mapping: HashMap>, // support multiple clients attached to same session - client_to_session_mapping: HashMap, // one client can only attach to one session + clients: HashMap, + session_to_client_mapping: HashMap>, // support multiple clients attached to same session + client_to_session_mapping: HashMap, // one client can only attach to one session daemon_state: DaemonState, } impl SessionManager { @@ -100,7 +101,7 @@ impl SessionManager { loop { if let Some(event) = self.rx.recv().await { match &event { - SessionSendOutput { .. } => { + SessionSendOutput { .. } | UserInput { .. } => { trace!(event=?event); } _ => { @@ -168,7 +169,7 @@ impl SessionManager { async fn handle_client_connect( &mut self, - client_id: u32, + client_id: Uuid, client_handle: ClientConnectionHandle, session_id: u32, create_session: bool, @@ -203,7 +204,7 @@ impl SessionManager { session_handle.redraw().await?; Ok(()) } - async fn handle_client_disconnect(&mut self, client_id: u32) -> Result<()> { + async fn handle_client_disconnect(&mut self, client_id: Uuid) -> Result<()> { let client_handle = self.clients.remove(&client_id); if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) @@ -215,7 +216,7 @@ impl SessionManager { } Ok(()) } - async fn handle_client_switch_session(&mut self, client_id: u32, session_id: u32) -> Result<()> { + async fn handle_client_switch_session(&mut self, client_id: Uuid, session_id: u32) -> Result<()> { let client_handle = self.unmap_client(client_id).unwrap(); self.map_client(client_id, client_handle, session_id)?; if let Some(session_handle) = self.sessions.get(&session_id) { @@ -225,7 +226,7 @@ impl SessionManager { client_handle.success_attach_to_session(session_id).await?; Ok(()) } - async fn handle_client_send_user_input(&mut self, client_id: u32, bytes: Bytes) -> Result<()> { + async fn handle_client_send_user_input(&mut self, client_id: Uuid, bytes: Bytes) -> Result<()> { if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { let session_handle = self.sessions.get_mut(session_id).unwrap(); session_handle.user_input(bytes).await @@ -233,7 +234,7 @@ impl SessionManager { Ok(()) } } - async fn handle_client_kill_pane(&mut self, client_id: u32) -> Result<()> { + async fn handle_client_kill_pane(&mut self, client_id: Uuid) -> Result<()> { if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { let session_handle = self.sessions.get_mut(session_id).unwrap(); session_handle.user_kill_pane().await @@ -241,7 +242,7 @@ impl SessionManager { Ok(()) } } - async fn handle_client_split_pane(&mut self, client_id: u32, direction: SplitDirection) -> Result<()> { + async fn handle_client_split_pane(&mut self, client_id: Uuid, direction: SplitDirection) -> Result<()> { if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { let session_handle = self.sessions.get_mut(session_id).unwrap(); session_handle.user_split_pane(direction).await @@ -250,7 +251,7 @@ impl SessionManager { Ok(()) } } - async fn handle_client_iterate_pane(&mut self, client_id: u32, is_next: bool) -> Result<()> { + async fn handle_client_iterate_pane(&mut self, client_id: Uuid, is_next: bool) -> Result<()> { if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { let session_handle = self.sessions.get_mut(session_id).unwrap(); session_handle.user_iterate_pane(is_next).await @@ -267,7 +268,7 @@ impl SessionManager { Ok(()) } - fn map_client(&mut self, client_id: u32, client_handle: ClientConnectionHandle, session_id: u32) -> Result<()> { + fn map_client(&mut self, client_id: Uuid, client_handle: ClientConnectionHandle, session_id: u32) -> Result<()> { self.clients.insert(client_id, client_handle); let clients = self.session_to_client_mapping.entry(session_id).or_insert(vec![]); clients.push(client_id); @@ -275,7 +276,7 @@ impl SessionManager { Ok(()) } - fn unmap_client(&mut self, client_id: u32) -> Option { + fn unmap_client(&mut self, client_id: Uuid) -> Option { let client_handle = self.clients.remove(&client_id)?; if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) diff --git a/daemon/src/daemon.rs b/daemon/src/daemon.rs index 491ac56..5a4bf42 100644 --- a/daemon/src/daemon.rs +++ b/daemon/src/daemon.rs @@ -57,13 +57,13 @@ async fn handle_message(session_manager_handle: SessionManagerHandle, mut stream let req: DaemonRequestMessage = comm::read_message(&mut stream).await?; info!(request=?req, "Handling request"); match req.body { - DaemonRequestMessageBody::Attach(request::Attach { session_id, create }) => { + DaemonRequestMessageBody::Attach(request::Attach { id, session_id, create }) => { info!( connecting_session = session_id, create = create, "Creating new client actor" ); - let _client = ClientConnection::spawn(stream, session_manager_handle, session_id)?; + let _client = ClientConnection::spawn(id, stream, session_manager_handle, session_id)?; } }; Ok(())