From d9efc5cced7135cdab8a2d8b1f6fc407234b52da Mon Sep 17 00:00:00 2001 From: Prometheus1400 Date: Mon, 8 Dec 2025 17:06:20 -0800 Subject: [PATCH 1/4] better trace logs --- cli/src/app.rs | 6 +++--- daemon/src/actors/client_connection.rs | 4 ++-- daemon/src/actors/pane.rs | 2 +- daemon/src/actors/window.rs | 16 ++++++---------- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/cli/src/app.rs b/cli/src/app.rs index 3be0ec9..573b121 100644 --- a/cli/src/app.rs +++ b/cli/src/app.rs @@ -175,8 +175,8 @@ impl App { let span = error_span!("Recieved Daemon Event"); let _guard = span.enter(); match &event { - DaemonEvent::Raw(..) => { - trace!(event=?event); + DaemonEvent::Raw(bytes) => { + trace!(event=?event, num_bytes=bytes.len()); } _ => { info!(event=?event); @@ -202,7 +202,7 @@ impl App { } Err(e) => { error!(error=%e, "Error receiving daemon event"); - break; + // break; } } } diff --git a/daemon/src/actors/client_connection.rs b/daemon/src/actors/client_connection.rs index 39e2eb6..e02d92e 100644 --- a/daemon/src/actors/client_connection.rs +++ b/daemon/src/actors/client_connection.rs @@ -82,8 +82,8 @@ impl ClientConnection { let span = error_span!("Recieved Client Connection Event"); let _guard = span.enter(); match &event { - SessionOutput(..) => { - trace!(event=?event); + SessionOutput(bytes) => { + trace!(event=?event, num_bytes=bytes.len()); } _ => { info!(event=?event); diff --git a/daemon/src/actors/pane.rs b/daemon/src/actors/pane.rs index 0d908ce..1a8c298 100644 --- a/daemon/src/actors/pane.rs +++ b/daemon/src/actors/pane.rs @@ -129,7 +129,7 @@ impl Pane { async fn handle_pty_output(&mut self, bytes: Bytes) -> Result<()> { self.vte.process(&bytes); - self.handle.rerender().await + self.handle_rerender().await } // TODO: below code is bad and unused, need better diffing solution diff --git a/daemon/src/actors/window.rs b/daemon/src/actors/window.rs index 70e28a5..ae318d3 100644 --- a/daemon/src/actors/window.rs +++ b/daemon/src/actors/window.rs @@ -76,11 +76,7 @@ impl Window { let init_pane_id = 0; let init_layout_node = LayoutNode::Pane { id: init_pane_id }; - let (cols, rows) = match terminal::size() { - Ok((c, r)) => (c, r), - Err(_) => (80, 24), - }; - + let (cols, rows) = (80, 24); let root_rect = Rect { x: 0, y: 0, @@ -257,11 +253,11 @@ impl Window { } } - self.session_handle - .window_output(Bytes::from( - crossterm::terminal::Clear(crossterm::terminal::ClearType::All).to_string(), - )) - .await?; + // self.session_handle + // .window_output(Bytes::from( + // crossterm::terminal::Clear(crossterm::terminal::ClearType::All).to_string(), + // )) + // .await?; self.handle_redraw().await?; Ok(()) } From 499581bef7eac40849ad17774554386a823daca1 Mon Sep 17 00:00:00 2001 From: Prometheus1400 Date: Sat, 13 Dec 2025 07:52:55 -0800 Subject: [PATCH 2/4] wip --- cli/src/app.rs | 12 +- cli/src/args.rs | 2 +- cli/src/main.rs | 4 +- cli/src/states/status_line_state.rs | 11 +- core/src/events.rs | 2 +- core/src/messages/request.rs | 2 +- core/src/states.rs | 24 +- daemon/src/actors/client_connection.rs | 15 +- daemon/src/actors/session.rs | 17 +- daemon/src/actors/session_manager.rs | 357 ++++++++++++++++--------- daemon/src/daemon.rs | 6 +- 11 files changed, 298 insertions(+), 154 deletions(-) diff --git a/cli/src/app.rs b/cli/src/app.rs index 573b121..a9d300b 100644 --- a/cli/src/app.rs +++ b/cli/src/app.rs @@ -192,8 +192,8 @@ impl App { DaemonEvent::ActiveSession(session_id) => { self.state.daemon.set_active_session(session_id); } - DaemonEvent::NewSession(session_id) => { - self.state.daemon.add_session(session_id); + DaemonEvent::NewSession(session_id, session_name) => { + self.state.daemon.add_session(session_id, session_name); } _ => { todo!(); @@ -241,8 +241,8 @@ impl App { match selection { ui::traits::Selection::Index(i) => match self.state.mode { AppMode::SelectingSession => { - let session = self.state.daemon.session_ids[i]; - comm::send_event(&mut self.stream, CliEvent::SwitchSession(session)).await?; + let session = &self.state.daemon.sessions[i]; + comm::send_event(&mut self.stream, CliEvent::SwitchSession(session.id)).await?; } AppMode::Normal => {} }, @@ -275,12 +275,12 @@ impl App { self.state.mode = AppMode::SelectingSession; self.state.ui.selector.query.clear(); self.state.ui.selector.list_state.select(Some(0)); - self.state.ui.selector.selector_type = SelectorType::Fuzzy; + self.state.ui.selector.selector_type = SelectorType::Basic; self.state .ui .selector .list - .extend(self.state.daemon.session_ids.iter().map(|x| x.to_string())); + .extend(self.state.daemon.sessions.iter().map(|x| x.name.clone())); self.state.ui.selector.displaying_list = self .state .ui diff --git a/cli/src/args.rs b/cli/src/args.rs index fcdeb03..7453ca3 100644 --- a/cli/src/args.rs +++ b/cli/src/args.rs @@ -10,7 +10,7 @@ pub struct Args { pub enum Commands { Attach { #[arg(short = 's', long = "session")] - session_id: u32, + session_name: String, }, Session { #[command(subcommand)] diff --git a/cli/src/main.rs b/cli/src/main.rs index bb5b958..14c9262 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -91,13 +91,13 @@ async fn run(command: Commands) -> Result<()> { let stream = connect().await?; debug!("Running command"); match command { - Commands::Attach { session_id } => { + Commands::Attach { session_name } => { attach( stream, RequestBuilder::default() .body(request::Attach { id: Uuid::new_v4(), - session_id, + session_name, create: true, }) .build(), diff --git a/cli/src/states/status_line_state.rs b/cli/src/states/status_line_state.rs index 8c31dd4..3b262d1 100644 --- a/cli/src/states/status_line_state.rs +++ b/cli/src/states/status_line_state.rs @@ -29,8 +29,15 @@ impl StatusLineState { pub fn apply_built_ins(&mut self, state: &AppState) { for item in self.a.iter_mut().chain(self.b.iter_mut()).chain(self.c.iter_mut()) { if item.as_str() == "active-session" { - if let Some(s) = state.daemon.active_session.map(|s| s.to_string()) { - *item = s; + if let Some(name) = state.daemon.active_session.and_then(|id| { + state + .daemon + .sessions + .iter() + .find(|session_info| session_info.id == id) + .map(|session_info| &session_info.name) + }) { + *item = name.clone(); } else { *item = "".to_owned(); } diff --git a/core/src/events.rs b/core/src/events.rs index e21791d..89ee692 100644 --- a/core/src/events.rs +++ b/core/src/events.rs @@ -26,7 +26,7 @@ pub enum DaemonEvent { // session events CurrentSessions(Vec), ActiveSession(u32), - NewSession(u32), + NewSession(u32, String), DeletedSession(u32), // TODO: for window id Disconnected, diff --git a/core/src/messages/request.rs b/core/src/messages/request.rs index d0a7696..1839231 100644 --- a/core/src/messages/request.rs +++ b/core/src/messages/request.rs @@ -37,7 +37,7 @@ impl Message for DaemonRequestMessage {} #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub struct Attach { pub id: Uuid, - pub session_id: u32, + pub session_name: String, pub create: bool, } impl RequestBody for Attach { diff --git a/core/src/states.rs b/core/src/states.rs index 0a4699d..eddfa31 100644 --- a/core/src/states.rs +++ b/core/src/states.rs @@ -1,21 +1,33 @@ /// comprehensive summary of the state of the daemon use serde::{Deserialize, Serialize}; +#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct SessionInfo { + pub id: u32, + pub name: String, +} + #[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct DaemonState { - pub session_ids: Vec, + pub sessions: Vec, pub active_session: Option, // window_ids: Vec, // pub active_window: Option, } impl DaemonState { - pub fn set_sessions(&mut self, session_ids: Vec) { - self.session_ids = session_ids; + pub fn set_sessions(&mut self, sessions: Vec<(u32, String)>) { + self.sessions = sessions + .into_iter() + .map(|(id, name)| SessionInfo { id, name }) + .collect(); } - pub fn add_session(&mut self, session_id: u32) { - let i = self.session_ids.binary_search(&session_id).unwrap_or_else(|i| i); - self.session_ids.insert(i, session_id); + pub fn add_session(&mut self, id: u32, name: String) { + let i = self + .sessions + .binary_search_by_key(&id, |info| info.id) + .unwrap_or_else(|i| i); + self.sessions.insert(i, SessionInfo { id, name }); } // pub fn remove_session(&mut self, session_id: u32) { // self.session_ids.retain(|s| s != &session_id); diff --git a/daemon/src/actors/client_connection.rs b/daemon/src/actors/client_connection.rs index e02d92e..0d0785a 100644 --- a/daemon/src/actors/client_connection.rs +++ b/daemon/src/actors/client_connection.rs @@ -22,7 +22,7 @@ pub enum ClientConnectionEvent { Disconnect, // client side state update events - NewSession(u32), + NewSession(u32, String), // variants related to initialization phase InitialAttach(u32), // invoked directly by the daemon @@ -51,10 +51,10 @@ impl ClientConnection { id: Uuid, stream: UnixStream, session_manager_handle: SessionManagerHandle, - connecting_session_id: u32, + initial_session_name: &str, ) -> Result { let client = Self::new(id, stream, session_manager_handle); - client.run(connecting_session_id) + client.run(initial_session_name) } fn new(id: Uuid, stream: UnixStream, session_manager_handle: SessionManagerHandle) -> Self { let (tx, rx) = mpsc::channel(10); @@ -69,12 +69,13 @@ impl ClientConnection { state: ClientConnectionState::Unattached, } } - fn run(mut self, initial_session_id: u32) -> Result { + fn run(mut self, initial_session_name: &str) -> Result { let handle_clone = self.handle.clone(); + let session_name = initial_session_name.to_owned(); let _task = tokio::spawn( async move { let handle = self.handle.clone(); - self.session_manager_handle.client_connect(self.id, handle.clone(), initial_session_id, true).await?; + self.session_manager_handle.client_connect(self.id, handle.clone(), Some(session_name), true).await?; loop { use remux_core::events::CliEvent; tokio::select! { @@ -119,8 +120,8 @@ impl ClientConnection { SessionOutput(bytes) => { comm::send_event(&mut self.stream, DaemonEvent::Raw(bytes)).await.unwrap(); } - NewSession(session_id) => { - comm::send_event(&mut self.stream, DaemonEvent::NewSession(session_id)).await.unwrap(); + NewSession(session_id, session_name) => { + comm::send_event(&mut self.stream, DaemonEvent::NewSession(session_id, session_name)).await.unwrap(); } _ => { error!(event=?event, state=?self.state, "Unhandled or invalid event for current state"); diff --git a/daemon/src/actors/session.rs b/daemon/src/actors/session.rs index 9fa1a2f..6c0cf5c 100644 --- a/daemon/src/actors/session.rs +++ b/daemon/src/actors/session.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use handle_macro::Handle; use tokio::sync::mpsc; -use tracing::Instrument; +use tracing::{Instrument, Span}; use crate::{ actors::{ @@ -26,6 +26,8 @@ pub enum SessionEvent { UserKillPane, Redraw, + RenameSession(String), + // output WindowOutput(Bytes), TerminalResize { rows: u16, cols: u16 }, @@ -35,6 +37,7 @@ use SessionEvent::*; pub struct Session { id: u32, + name: String, handle: SessionHandle, session_manager_handle: SessionManagerHandle, rx: mpsc::Receiver, @@ -42,16 +45,17 @@ pub struct Session { } impl Session { #[instrument(parent=None, skip(session_manager_handle), name="Session")] - pub fn spawn(id: u32, session_manager_handle: SessionManagerHandle) -> Result { - let session = Session::new(id, session_manager_handle); + pub fn spawn(id: u32, name: String, session_manager_handle: SessionManagerHandle) -> Result { + let session = Session::new(id, name, session_manager_handle); session.run() } - fn new(id: u32, session_manager_handle: SessionManagerHandle) -> Self { + fn new(id: u32, name: String, session_manager_handle: SessionManagerHandle) -> Self { let (tx, rx) = mpsc::channel(10); let handle = SessionHandle { tx }; let window_handle = Window::spawn(handle.clone()).unwrap(); Self { id, + name, session_manager_handle, handle, rx, @@ -101,6 +105,11 @@ impl Session { TerminalResize { rows, cols } => { self.window_handle.terminal_resize(rows, cols).await.unwrap(); } + RenameSession(name) => { + let span = Span::current(); + self.name = name.clone(); + span.record("name", name); + } } } } diff --git a/daemon/src/actors/session_manager.rs b/daemon/src/actors/session_manager.rs index 2f6f103..15ea25f 100644 --- a/daemon/src/actors/session_manager.rs +++ b/daemon/src/actors/session_manager.rs @@ -1,9 +1,10 @@ -use std::{collections::HashMap, vec}; +use std::collections::HashMap; use bytes::Bytes; -use color_eyre::eyre; +use color_eyre::eyre::{self, OptionExt, eyre}; use handle_macro::Handle; -use remux_core::states::DaemonState; +use itertools::Itertools; +use remux_core::states::{self, DaemonState}; use tokio::sync::mpsc; use tracing::Instrument; use uuid::Uuid; @@ -24,7 +25,7 @@ pub enum SessionManagerEvent { ClientConnect { client_id: Uuid, client_handle: ClientConnectionHandle, - session_id: u32, + session_name: Option, create_session: bool, }, ClientDisconnect { @@ -64,13 +65,143 @@ pub enum SessionManagerEvent { } use SessionManagerEvent::*; +#[derive(Debug)] +struct SessionInfo { + pub handle: SessionHandle, + pub name: String, + pub id: u32, +} + +#[derive(Debug)] +struct SessionManagerState { + session_name_to_id: HashMap, + sessions: HashMap, + session_to_client_mapping: HashMap>, // support multiple clients attached to same session + clients: HashMap, + client_to_session_mapping: HashMap, // one client can only attach to one session + session_id_count: u32, + manager_handle: SessionManagerHandle, +} + +impl SessionManagerState { + pub fn new(manager_handle: &SessionManagerHandle) -> Self { + Self { + session_name_to_id: Default::default(), + sessions: Default::default(), + session_to_client_mapping: Default::default(), + clients: Default::default(), + client_to_session_mapping: Default::default(), + session_id_count: Default::default(), + manager_handle: manager_handle.clone(), + } + } + pub fn snapshot(&self) -> DaemonState { + let mut daemon_state = DaemonState::default(); + daemon_state.sessions = self + .sessions + .values() + .map(|s| states::SessionInfo { + id: s.id, + name: s.name.clone(), + }) + .collect_vec(); + daemon_state + } + pub fn get_by_id(&self, id: u32) -> Option<&SessionInfo> { + self.sessions.get(&id) + } + pub fn get_by_name(&self, name: &str) -> Option<&SessionInfo> { + self.session_name_to_id.get(name).and_then(|id| self.sessions.get(&id)) + } + fn new_session_id(&mut self) -> u32 { + let x = self.session_id_count; + self.session_id_count += 1; + x + } + + pub fn get_session_for_client(&self, client_id: &Uuid) -> Result<&SessionInfo> { + let session_id = self + .client_to_session_mapping + .get(client_id) + .ok_or_eyre("client has no session")?; + self.sessions.get(session_id).ok_or_eyre("no session") + } + + pub fn get_clients_for_session(&self, session_id: &u32) -> Result> { + let client_ids = self + .session_to_client_mapping + .get(session_id) + .ok_or_eyre("error getting clients for session")?; + Ok(self + .clients + .iter() + .filter(|c| client_ids.contains(c.0)) + .map(|c| c.1) + .collect_vec()) + } + + pub fn create_new(&mut self, name: Option<&str>) -> Result<&SessionInfo> { + if name.and_then(|n| self.get_by_name(n)).is_some() { + Err(eyre!("duplicate session")) + } else { + let id = self.new_session_id(); + let name = name.map(|n| n.to_owned()).unwrap_or(id.to_string()); + let handle = Session::spawn(id, name.clone(), self.manager_handle.clone())?; + self.session_name_to_id.insert(name.clone(), id); + self.sessions.insert(id, SessionInfo { handle, name, id }); + self.sessions + .get(&id) + .ok_or(eyre!("couldn't get session info from sessions")) + } + } + + pub fn attach_client( + &mut self, + client_id: Uuid, + client_handle: &ClientConnectionHandle, + session_name: &str, + create: bool, + ) -> Result<()> { + let mut id_opt = self.get_by_name(session_name).map(|info| info.id); + if id_opt.is_none() && create { + id_opt = Some(self.create_new(Some(session_name))?.id); + } + + if let Some(id) = id_opt { + self.session_to_client_mapping + .entry(id) + .or_insert(Default::default()) + .push(client_id.clone()); + self.client_to_session_mapping.insert(client_id.clone(), id); + self.clients.insert(client_id, client_handle.clone()); + Ok(()) + } else { + Err(eyre!("no session to attach client")) + } + } + pub fn client_switch_session(&mut self, client_id: Uuid, session_name: &str) -> Result<()> { + let id_opt = self.get_by_name(session_name).map(|info| info.id); + if let Some(id) = id_opt { + if let Some(clients) = self.session_to_client_mapping.get_mut(&id) { + clients.retain(|id| id != &client_id); + } + self.session_to_client_mapping + .get_mut(&id) + .ok_or(eyre!("session should exist"))? + .push(client_id.clone()); + self.client_to_session_mapping.insert(client_id, id); + Ok(()) + } else { + Err(eyre!("no such session to switch to")) + } + } +} + +#[derive(Debug)] pub struct SessionManager { handle: SessionManagerHandle, rx: mpsc::Receiver, - sessions: HashMap, - clients: HashMap, - session_to_client_mapping: HashMap>, // support multiple clients attached to same session - client_to_session_mapping: HashMap, // one client can only attach to one session + state: SessionManagerState, daemon_state: DaemonState, } impl SessionManager { @@ -83,12 +214,9 @@ impl SessionManager { let (tx, rx) = mpsc::channel(10); let handle = SessionManagerHandle { tx }; Self { - handle, + handle: handle.clone(), rx, - sessions: HashMap::new(), - clients: HashMap::new(), - session_to_client_mapping: HashMap::new(), - client_to_session_mapping: HashMap::new(), + state: SessionManagerState::new(&handle), daemon_state: DaemonState::default(), } } @@ -112,18 +240,24 @@ impl SessionManager { ClientConnect { client_id, client_handle, - session_id, + session_name, create_session, } => { - self.handle_client_connect(client_id, client_handle, session_id, create_session) - .await - .unwrap(); + self.handle_client_connect( + client_id, + client_handle, + session_name.as_deref(), + create_session, + ) + .await + .unwrap(); } ClientDisconnect { client_id } => { self.handle_client_disconnect(client_id).await.unwrap(); } ClientSwitchSession { client_id, session_id } => { - self.handle_client_switch_session(client_id, session_id).await.unwrap(); + todo!(); + // self.handle_client_switch_session(client_id, session_id).await.unwrap(); } UserInput { client_id, bytes } => { self.handle_client_send_user_input(client_id, bytes).await.unwrap(); @@ -141,8 +275,8 @@ impl SessionManager { self.handle_session_send_output(session_id, bytes).await.unwrap(); } TerminalResize { rows, cols } => { - for session in self.sessions.values_mut() { - session.terminal_resize(rows, cols).await.unwrap(); + for SessionInfo { handle, .. } in self.state.sessions.values_mut() { + handle.terminal_resize(rows, cols).await.unwrap(); } } } @@ -156,133 +290,114 @@ impl SessionManager { } /// creates a new session and handles updating the state and notifying clients about the update - async fn create_session(&mut self, session_id: u32) -> Result<&SessionHandle> { - let new_session = Session::spawn(session_id, self.handle.clone())?; - self.sessions.insert(session_id, new_session); - let session_handle_ref = self.sessions.get(&session_id).unwrap(); - self.daemon_state.add_session(session_id); - for c in self.clients.values() { - c.new_session(session_id).await?; - } - Ok(session_handle_ref) + async fn create_session(&mut self, session_name: Option<&str>) -> Result<&SessionInfo> { + self.state.create_new(session_name) } async fn handle_client_connect( &mut self, client_id: Uuid, client_handle: ClientConnectionHandle, - session_id: u32, + session_name: Option<&str>, create_session: bool, ) -> Result<()> { - // session doesn't exist and client not trying to create it - if !self.sessions.contains_key(&session_id) && !create_session { - client_handle - .initial_attach_result(Err(eyre::eyre!("no such session"))) - .await?; - return Ok(()); - } - - // session didn't exist - if !self.sessions.contains_key(&session_id) { - self.create_session(session_id).await?; - } - - // session exists - let clients = self.session_to_client_mapping.entry(session_id).or_insert(vec![]); - for c in self.clients.values() { - c.new_session(session_id).await?; - } - self.clients.insert(client_id, client_handle.clone()); - clients.push(client_id); - self.client_to_session_mapping.insert(client_id, session_id); - - let session_handle = self.sessions.get_mut(&session_id).expect("session should exist here"); - client_handle - .initial_attach_result(Ok(self.daemon_state.clone())) - .await?; - client_handle.success_attach_to_session(session_id).await?; - session_handle.redraw().await?; - Ok(()) - } - async fn handle_client_disconnect(&mut self, client_id: Uuid) -> Result<()> { - let client_handle = self.clients.remove(&client_id); - if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) - && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) + let session_name = session_name.ok_or(eyre!("no session name"))?; + match self + .state + .attach_client(client_id, &client_handle, session_name, create_session) { - clients.retain(|c| c != &client_id); - } - if let Some(client_handle) = client_handle { - client_handle.disconnect().await.unwrap(); + Ok(_) => { + let session_info = self.state.get_by_name(session_name).expect("session should exist here"); + client_handle.initial_attach_result(Ok(self.state.snapshot())).await?; + client_handle.success_attach_to_session(session_info.id).await?; + session_info.handle.redraw().await?; + } + Err(e) => { + client_handle.initial_attach_result(Err(eyre::eyre!(e))).await?; + } } Ok(()) } - async fn handle_client_switch_session(&mut self, client_id: Uuid, session_id: u32) -> Result<()> { - let client_handle = self.unmap_client(client_id).unwrap(); - self.map_client(client_id, client_handle, session_id)?; - if let Some(session_handle) = self.sessions.get(&session_id) { - session_handle.redraw().await?; - } - let client_handle = self.clients.get(&client_id).unwrap(); - client_handle.success_attach_to_session(session_id).await?; + + async fn handle_client_disconnect(&mut self, client_id: Uuid) -> Result<()> { + // let client_handle = self.clients.remove(&client_id); + // if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) + // && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) + // { + // clients.retain(|c| c != &client_id); + // } + // if let Some(client_handle) = client_handle { + // client_handle.disconnect().await.unwrap(); + // } + todo!(); Ok(()) } + + // async fn handle_client_switch_session(&mut self, client_id: Uuid, session_id: u32) -> Result<()> { + // let client_handle = self.unmap_client(client_id).unwrap(); + // self.map_client(client_id, client_handle, session_id)?; + // if let Some(session_handle) = self.sessions.get(&session_id) { + // session_handle.redraw().await?; + // } + // let client_handle = self.clients.get(&client_id).unwrap(); + // client_handle.success_attach_to_session(session_id).await?; + // Ok(()) + // } + async fn handle_client_send_user_input(&mut self, client_id: Uuid, bytes: Bytes) -> Result<()> { - if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { - let session_handle = self.sessions.get_mut(session_id).unwrap(); - session_handle.user_input(bytes).await - } else { - Ok(()) - } + self.state + .get_session_for_client(&client_id)? + .handle + .user_input(bytes) + .await } + async fn handle_client_kill_pane(&mut self, client_id: Uuid) -> Result<()> { - if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { - let session_handle = self.sessions.get_mut(session_id).unwrap(); - session_handle.user_kill_pane().await - } else { - Ok(()) - } + self.state + .get_session_for_client(&client_id)? + .handle + .user_kill_pane() + .await } + async fn handle_client_split_pane(&mut self, client_id: Uuid, direction: SplitDirection) -> Result<()> { - if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { - let session_handle = self.sessions.get_mut(session_id).unwrap(); - session_handle.user_split_pane(direction).await - } else { - // TODO: should error - Ok(()) - } + self.state + .get_session_for_client(&client_id)? + .handle + .user_split_pane(direction) + .await } + async fn handle_client_iterate_pane(&mut self, client_id: Uuid, is_next: bool) -> Result<()> { - if let Some(session_id) = self.client_to_session_mapping.get(&client_id) { - let session_handle = self.sessions.get_mut(session_id).unwrap(); - session_handle.user_iterate_pane(is_next).await - } else { - // TODO: should error - Ok(()) - } + self.state + .get_session_for_client(&client_id)? + .handle + .user_iterate_pane(is_next) + .await } + async fn handle_session_send_output(&mut self, session_id: u32, bytes: Bytes) -> Result<()> { - for client_id in self.session_to_client_mapping.get(&session_id).unwrap() { - let client_handle = self.clients.get_mut(client_id).unwrap(); - client_handle.session_output(bytes.clone()).await?; + for client in self.state.get_clients_for_session(&session_id)? { + client.session_output(bytes.clone()).await?; } Ok(()) } - fn map_client(&mut self, client_id: Uuid, client_handle: ClientConnectionHandle, session_id: u32) -> Result<()> { - self.clients.insert(client_id, client_handle); - let clients = self.session_to_client_mapping.entry(session_id).or_insert(vec![]); - clients.push(client_id); - self.client_to_session_mapping.insert(client_id, session_id); - Ok(()) - } + // fn map_client(&mut self, client_id: Uuid, client_handle: ClientConnectionHandle, session_id: u32) -> Result<()> { + // self.clients.insert(client_id, client_handle); + // let clients = self.session_to_client_mapping.entry(session_id).or_insert(vec![]); + // clients.push(client_id); + // self.client_to_session_mapping.insert(client_id, session_id); + // Ok(()) + // } - fn unmap_client(&mut self, client_id: Uuid) -> Option { - let client_handle = self.clients.remove(&client_id)?; - if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) - && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) - { - clients.retain(|c| c != &client_id); - } - Some(client_handle) - } + // fn unmap_client(&mut self, client_id: Uuid) -> Option { + // let client_handle = self.clients.remove(&client_id)?; + // if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) + // && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) + // { + // clients.retain(|c| c != &client_id); + // } + // Some(client_handle) + // } } diff --git a/daemon/src/daemon.rs b/daemon/src/daemon.rs index 5a4bf42..78c2df3 100644 --- a/daemon/src/daemon.rs +++ b/daemon/src/daemon.rs @@ -57,13 +57,13 @@ async fn handle_message(session_manager_handle: SessionManagerHandle, mut stream let req: DaemonRequestMessage = comm::read_message(&mut stream).await?; info!(request=?req, "Handling request"); match req.body { - DaemonRequestMessageBody::Attach(request::Attach { id, session_id, create }) => { + DaemonRequestMessageBody::Attach(request::Attach { id, session_name, create }) => { info!( - connecting_session = session_id, + connecting_session = session_name, create = create, "Creating new client actor" ); - let _client = ClientConnection::spawn(id, stream, session_manager_handle, session_id)?; + let _client = ClientConnection::spawn(id, stream, session_manager_handle, &session_name)?; } }; Ok(()) From 712dbe80328876e23c9bed2df401bff7d5f63383 Mon Sep 17 00:00:00 2001 From: Prometheus1400 Date: Sat, 13 Dec 2025 11:29:43 -0800 Subject: [PATCH 3/4] session has name --- cli/src/app.rs | 2 +- core/src/events.rs | 2 +- daemon/src/actors/client_connection.rs | 4 +- daemon/src/actors/session_manager.rs | 168 +++++++++++-------------- daemon/src/daemon.rs | 6 +- 5 files changed, 82 insertions(+), 100 deletions(-) diff --git a/cli/src/app.rs b/cli/src/app.rs index a9d300b..932817a 100644 --- a/cli/src/app.rs +++ b/cli/src/app.rs @@ -242,7 +242,7 @@ impl App { ui::traits::Selection::Index(i) => match self.state.mode { AppMode::SelectingSession => { let session = &self.state.daemon.sessions[i]; - comm::send_event(&mut self.stream, CliEvent::SwitchSession(session.id)).await?; + comm::send_event(&mut self.stream, CliEvent::SwitchSession(session.name.clone())).await?; } AppMode::Normal => {} }, diff --git a/core/src/events.rs b/core/src/events.rs index 89ee692..d55bfad 100644 --- a/core/src/events.rs +++ b/core/src/events.rs @@ -12,7 +12,7 @@ pub enum CliEvent { SplitPaneHorizontal, PrevPane, - SwitchSession(u32), // switch session - does nothing if session does not exist + SwitchSession(String), // switch session - does nothing if session does not exist TerminalResize { rows: u16, cols: u16 }, diff --git a/daemon/src/actors/client_connection.rs b/daemon/src/actors/client_connection.rs index 0d0785a..d4d8a7a 100644 --- a/daemon/src/actors/client_connection.rs +++ b/daemon/src/actors/client_connection.rs @@ -166,8 +166,8 @@ impl ClientConnection { CliEvent::PrevPane => { self.session_manager_handle.user_iterate_pane(self.id, false).await.unwrap(); }, - CliEvent::SwitchSession(session_id) => { - self.session_manager_handle.client_switch_session(self.id, session_id).await.unwrap(); + CliEvent::SwitchSession(session_name) => { + self.session_manager_handle.client_switch_session(self.id, session_name).await.unwrap(); } } } diff --git a/daemon/src/actors/session_manager.rs b/daemon/src/actors/session_manager.rs index 15ea25f..6704001 100644 --- a/daemon/src/actors/session_manager.rs +++ b/daemon/src/actors/session_manager.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use color_eyre::eyre::{self, OptionExt, eyre}; use handle_macro::Handle; use itertools::Itertools; -use remux_core::states::{self, DaemonState}; +use remux_core::states::DaemonState; use tokio::sync::mpsc; use tracing::Instrument; use uuid::Uuid; @@ -33,7 +33,7 @@ pub enum SessionManagerEvent { }, ClientSwitchSession { client_id: Uuid, - session_id: u32, + session_name: String, }, // client -> session events @@ -95,30 +95,22 @@ impl SessionManagerState { manager_handle: manager_handle.clone(), } } - pub fn snapshot(&self) -> DaemonState { - let mut daemon_state = DaemonState::default(); - daemon_state.sessions = self - .sessions - .values() - .map(|s| states::SessionInfo { - id: s.id, - name: s.name.clone(), - }) - .collect_vec(); - daemon_state - } - pub fn get_by_id(&self, id: u32) -> Option<&SessionInfo> { - self.sessions.get(&id) - } - pub fn get_by_name(&self, name: &str) -> Option<&SessionInfo> { - self.session_name_to_id.get(name).and_then(|id| self.sessions.get(&id)) - } fn new_session_id(&mut self) -> u32 { let x = self.session_id_count; self.session_id_count += 1; x } - + pub fn snapshot(&self) -> DaemonState { + let mut daemon_state = DaemonState::default(); + daemon_state.set_sessions(self.sessions.values().map(|s| (s.id, s.name.clone())).collect_vec()); + daemon_state + } + // pub fn get_by_id(&self, id: u32) -> Option<&SessionInfo> { + // self.sessions.get(&id) + // } + pub fn get_session_by_name(&self, name: &str) -> Option<&SessionInfo> { + self.session_name_to_id.get(name).and_then(|id| self.sessions.get(id)) + } pub fn get_session_for_client(&self, client_id: &Uuid) -> Result<&SessionInfo> { let session_id = self .client_to_session_mapping @@ -126,7 +118,6 @@ impl SessionManagerState { .ok_or_eyre("client has no session")?; self.sessions.get(session_id).ok_or_eyre("no session") } - pub fn get_clients_for_session(&self, session_id: &u32) -> Result> { let client_ids = self .session_to_client_mapping @@ -140,8 +131,8 @@ impl SessionManagerState { .collect_vec()) } - pub fn create_new(&mut self, name: Option<&str>) -> Result<&SessionInfo> { - if name.and_then(|n| self.get_by_name(n)).is_some() { + pub fn create_new_session(&mut self, name: Option<&str>) -> Result<&SessionInfo> { + if name.and_then(|n| self.get_session_by_name(n)).is_some() { Err(eyre!("duplicate session")) } else { let id = self.new_session_id(); @@ -158,43 +149,51 @@ impl SessionManagerState { pub fn attach_client( &mut self, client_id: Uuid, - client_handle: &ClientConnectionHandle, + client_handle: ClientConnectionHandle, session_name: &str, create: bool, ) -> Result<()> { - let mut id_opt = self.get_by_name(session_name).map(|info| info.id); + let mut id_opt = self.get_session_by_name(session_name).map(|info| info.id); if id_opt.is_none() && create { - id_opt = Some(self.create_new(Some(session_name))?.id); + id_opt = Some(self.create_new_session(Some(session_name))?.id); } if let Some(id) = id_opt { - self.session_to_client_mapping - .entry(id) - .or_insert(Default::default()) - .push(client_id.clone()); - self.client_to_session_mapping.insert(client_id.clone(), id); - self.clients.insert(client_id, client_handle.clone()); + self.session_to_client_mapping.entry(id).or_default().push(client_id); + self.client_to_session_mapping.insert(client_id, id); + self.clients.insert(client_id, client_handle); Ok(()) } else { Err(eyre!("no session to attach client")) } } - pub fn client_switch_session(&mut self, client_id: Uuid, session_name: &str) -> Result<()> { - let id_opt = self.get_by_name(session_name).map(|info| info.id); - if let Some(id) = id_opt { - if let Some(clients) = self.session_to_client_mapping.get_mut(&id) { - clients.retain(|id| id != &client_id); - } + pub fn detach_client(&mut self, client_id: Uuid) -> Option { + if self.clients.contains_key(&client_id) { + let session_id = self.client_to_session_mapping.remove(&client_id)?; self.session_to_client_mapping - .get_mut(&id) - .ok_or(eyre!("session should exist"))? - .push(client_id.clone()); - self.client_to_session_mapping.insert(client_id, id); - Ok(()) + .get_mut(&session_id)? + .retain(|x| x != &client_id); + self.clients.remove(&client_id) } else { - Err(eyre!("no such session to switch to")) + None } } + // pub fn client_switch_session(&mut self, client_id: Uuid, session_name: &str) -> Result<()> { + // let id_opt = self.get_by_name(session_name).map(|info| info.id); + // if let Some(id) = id_opt { + // if let Some(clients) = self.session_to_client_mapping.get_mut(&id) { + // clients.retain(|id| id != &client_id); + // } + // self.session_to_client_mapping + // .get_mut(&id) + // .ok_or(eyre!("session should exist"))? + // .push(client_id); + // self.client_to_session_mapping.insert(client_id, id); + // Ok(()) + // } else { + // Err(eyre!("no such session to switch to")) + // } + // } } #[derive(Debug)] @@ -202,7 +201,6 @@ pub struct SessionManager { handle: SessionManagerHandle, rx: mpsc::Receiver, state: SessionManagerState, - daemon_state: DaemonState, } impl SessionManager { pub fn spawn() -> Result { @@ -217,7 +215,6 @@ impl SessionManager { handle: handle.clone(), rx, state: SessionManagerState::new(&handle), - daemon_state: DaemonState::default(), } } @@ -255,9 +252,13 @@ impl SessionManager { ClientDisconnect { client_id } => { self.handle_client_disconnect(client_id).await.unwrap(); } - ClientSwitchSession { client_id, session_id } => { - todo!(); - // self.handle_client_switch_session(client_id, session_id).await.unwrap(); + ClientSwitchSession { + client_id, + session_name, + } => { + self.handle_client_switch_session(client_id, &session_name) + .await + .unwrap(); } UserInput { client_id, bytes } => { self.handle_client_send_user_input(client_id, bytes).await.unwrap(); @@ -289,10 +290,10 @@ impl SessionManager { Ok(handle_clone) } - /// creates a new session and handles updating the state and notifying clients about the update - async fn create_session(&mut self, session_name: Option<&str>) -> Result<&SessionInfo> { - self.state.create_new(session_name) - } + // /// creates a new session and handles updating the state and notifying clients about the update + // async fn create_session(&mut self, session_name: Option<&str>) -> Result<&SessionInfo> { + // self.state.create_new_session(session_name) + // } async fn handle_client_connect( &mut self, @@ -304,10 +305,13 @@ impl SessionManager { let session_name = session_name.ok_or(eyre!("no session name"))?; match self .state - .attach_client(client_id, &client_handle, session_name, create_session) + .attach_client(client_id, client_handle.clone(), session_name, create_session) { Ok(_) => { - let session_info = self.state.get_by_name(session_name).expect("session should exist here"); + let session_info = self + .state + .get_session_by_name(session_name) + .expect("session should exist here"); client_handle.initial_attach_result(Ok(self.state.snapshot())).await?; client_handle.success_attach_to_session(session_info.id).await?; session_info.handle.redraw().await?; @@ -320,29 +324,21 @@ impl SessionManager { } async fn handle_client_disconnect(&mut self, client_id: Uuid) -> Result<()> { - // let client_handle = self.clients.remove(&client_id); - // if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) - // && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) - // { - // clients.retain(|c| c != &client_id); - // } - // if let Some(client_handle) = client_handle { - // client_handle.disconnect().await.unwrap(); - // } - todo!(); - Ok(()) + if let Some(client) = self.state.detach_client(client_id) { + client.disconnect().await + } else { + Ok(()) + } } - // async fn handle_client_switch_session(&mut self, client_id: Uuid, session_id: u32) -> Result<()> { - // let client_handle = self.unmap_client(client_id).unwrap(); - // self.map_client(client_id, client_handle, session_id)?; - // if let Some(session_handle) = self.sessions.get(&session_id) { - // session_handle.redraw().await?; - // } - // let client_handle = self.clients.get(&client_id).unwrap(); - // client_handle.success_attach_to_session(session_id).await?; - // Ok(()) - // } + async fn handle_client_switch_session(&mut self, client_id: Uuid, session_name: &str) -> Result<()> { + let client = self.state.detach_client(client_id).ok_or_eyre("no such client")?; + self.state + .attach_client(client_id, client.clone(), session_name, false)?; + let session = self.state.get_session_for_client(&client_id)?; + session.handle.redraw().await?; + client.success_attach_to_session(session.id).await + } async fn handle_client_send_user_input(&mut self, client_id: Uuid, bytes: Bytes) -> Result<()> { self.state @@ -382,22 +378,4 @@ impl SessionManager { } Ok(()) } - - // fn map_client(&mut self, client_id: Uuid, client_handle: ClientConnectionHandle, session_id: u32) -> Result<()> { - // self.clients.insert(client_id, client_handle); - // let clients = self.session_to_client_mapping.entry(session_id).or_insert(vec![]); - // clients.push(client_id); - // self.client_to_session_mapping.insert(client_id, session_id); - // Ok(()) - // } - - // fn unmap_client(&mut self, client_id: Uuid) -> Option { - // let client_handle = self.clients.remove(&client_id)?; - // if let Some(session_id) = self.client_to_session_mapping.remove(&client_id) - // && let Some(clients) = self.session_to_client_mapping.get_mut(&session_id) - // { - // clients.retain(|c| c != &client_id); - // } - // Some(client_handle) - // } } diff --git a/daemon/src/daemon.rs b/daemon/src/daemon.rs index 78c2df3..1cdef48 100644 --- a/daemon/src/daemon.rs +++ b/daemon/src/daemon.rs @@ -57,7 +57,11 @@ async fn handle_message(session_manager_handle: SessionManagerHandle, mut stream let req: DaemonRequestMessage = comm::read_message(&mut stream).await?; info!(request=?req, "Handling request"); match req.body { - DaemonRequestMessageBody::Attach(request::Attach { id, session_name, create }) => { + DaemonRequestMessageBody::Attach(request::Attach { + id, + session_name, + create, + }) => { info!( connecting_session = session_name, create = create, From 0c2d823178d319ecfe354fb327cf34dcfc5c65fd Mon Sep 17 00:00:00 2001 From: Prometheus1400 Date: Sat, 13 Dec 2025 11:31:00 -0800 Subject: [PATCH 4/4] fix test --- core/src/comm.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/comm.rs b/core/src/comm.rs index 7772399..9d11290 100644 --- a/core/src/comm.rs +++ b/core/src/comm.rs @@ -106,7 +106,7 @@ mod test { let attach = request::Attach { id: Uuid::new_v4(), - session_id: 1, + session_name: "session".to_owned(), create: true, }; let cli_req = RequestBuilder::default().body(attach.clone()).build();