diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index f00ccf3..32d2fa6 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -280,6 +280,7 @@ async fn async_init() -> DynResult { token: remote_token, selected_rig_id: frontend_runtime.remote_active_rig_id.clone(), known_rigs: frontend_runtime.remote_rigs.clone(), + rig_states: frontend_runtime.rig_states.clone(), poll_interval: Duration::from_millis(poll_interval_ms), spectrum: frontend_runtime.spectrum.clone(), server_connected: frontend_runtime.server_connected.clone(), diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 9d2d43b..4efbf80 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -2,7 +2,9 @@ // // SPDX-License-Identifier: BSD-2-Clause +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::RwLock; use std::time::Duration; use std::{sync::Arc, sync::Mutex}; @@ -58,6 +60,7 @@ pub struct RemoteClientConfig { pub spectrum: Arc>, /// Shared flag: `true` while a TCP connection to trx-server is active. pub server_connected: Arc, + pub rig_states: Arc>>>, } pub async fn run_remote_client( @@ -420,6 +423,31 @@ async fn refresh_remote_snapshot( true } }); + + // Update per-rig watch channels so each SSE session can subscribe + // to a specific rig's state independently. + if let Ok(mut rig_map) = config.rig_states.write() { + for entry in &rigs { + let new_state = RigState::from_snapshot(entry.state.clone()); + if let Some(tx) = rig_map.get(&entry.rig_id) { + tx.send_if_modified(|old| { + if *old == new_state { + false + } else { + *old = new_state; + true + } + }); + } else { + let (tx, _rx) = watch::channel(new_state); + rig_map.insert(entry.rig_id.clone(), tx); + } + } + // Remove channels for rigs no longer reported by the server. + let active_ids: std::collections::HashSet<&str> = + rigs.iter().map(|e| e.rig_id.as_str()).collect(); + rig_map.retain(|id, _| active_ids.contains(id.as_str())); + } Ok(()) } @@ -653,8 +681,9 @@ fn parse_port(port_str: &str) -> Result { #[cfg(test)] mod tests { use super::{parse_remote_url, RemoteClientConfig, RemoteEndpoint, SharedSpectrum}; + use std::collections::HashMap; use std::sync::atomic::AtomicBool; - use std::sync::{Arc, Mutex}; + use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; @@ -839,6 +868,7 @@ mod tests { poll_interval: Duration::from_millis(100), spectrum: Arc::new(spectrum_tx), server_connected: Arc::new(AtomicBool::new(false)), + rig_states: Arc::new(RwLock::new(HashMap::new())), }, req_rx, state_tx, @@ -877,6 +907,7 @@ mod tests { poll_interval: Duration::from_millis(500), spectrum: Arc::new(spectrum_tx), server_connected: Arc::new(AtomicBool::new(false)), + rig_states: Arc::new(RwLock::new(HashMap::new())), }; let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); assert_eq!(envelope.token.as_deref(), Some("secret")); diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index 343d1fa..dcf98ec 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -252,6 +252,10 @@ pub struct FrontendRuntimeContext { pub remote_active_rig_id: Arc>>, /// Cached remote rig list from GetRigs polling. pub remote_rigs: Arc>>, + /// Per-rig state watch channels, keyed by rig_id. + /// Populated by the remote client poll loop so each SSE session can + /// subscribe to a specific rig's state independently. + pub rig_states: Arc>>>, /// Owner callsign from trx-client config/CLI for frontend display. pub owner_callsign: Option, /// Optional website URL for the web UI header title link. @@ -281,6 +285,14 @@ pub struct FrontendRuntimeContext { } impl FrontendRuntimeContext { + /// Get a watch receiver for a specific rig's state. + pub fn rig_state_rx(&self, rig_id: &str) -> Option> { + self.rig_states + .read() + .ok() + .and_then(|map| map.get(rig_id).map(|tx| tx.subscribe())) + } + /// Create a new empty runtime context. pub fn new() -> Self { Self { @@ -317,6 +329,7 @@ impl FrontendRuntimeContext { http_decode_history_retention_min_by_rig: HashMap::new(), remote_active_rig_id: Arc::new(Mutex::new(None)), remote_rigs: Arc::new(Mutex::new(Vec::new())), + rig_states: Arc::new(RwLock::new(HashMap::new())), owner_callsign: None, owner_website_url: None, owner_website_name: None, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 0a49da8..9a33c7f 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -3363,12 +3363,12 @@ async function switchRigFromSelect(selectEl) { updateRigSubtitle(lastActiveRigId); if (typeof setSchedulerRig === "function") setSchedulerRig(lastActiveRigId); if (typeof setBackgroundDecodeRig === "function") setBackgroundDecodeRig(lastActiveRigId); - // Also switch the server's active rig so the SSE stream and audio - // follow. Commands already carry rig_id per-tab, but SSE is still - // global until per-session streams are implemented. + // Switch this session's rig and reconnect SSE to the new rig's + // state channel. try { const sidParam = sseSessionId ? `&session_id=${encodeURIComponent(sseSessionId)}` : ""; await postPath(`/select_rig?rig_id=${encodeURIComponent(selectEl.value)}${sidParam}`); + connect(); } catch (err) { console.error("select_rig failed:", err); } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 281eee6..78f21c6 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -335,9 +335,6 @@ pub async fn events( scheduler_control: web::Data, session_rig_mgr: web::Data>, ) -> Result { - let rx = state.get_ref().clone(); - let initial = wait_for_view(rx.clone()).await?; - let counter = clients.get_ref().clone(); let count = counter.fetch_add(1, Ordering::Relaxed) + 1; @@ -352,6 +349,14 @@ pub async fn events( .lock() .ok() .and_then(|g| g.clone()); + + // Subscribe to the per-rig watch channel for this session's rig, + // falling back to the global state watch when unavailable. + let rx = active_rig_id + .as_deref() + .and_then(|rid| context.rig_state_rx(rid)) + .unwrap_or_else(|| state.get_ref().clone()); + let initial = wait_for_view(rx.clone()).await?; if let Some(ref rid) = active_rig_id { session_rig_mgr.register(session_id, rid.clone()); vchan_mgr.init_rig(