diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index a7589fa..9d9a53c 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -476,6 +476,228 @@ async fn handle_audio_connection( Ok(()) } +/// Multi-rig audio manager: spawns/tears down per-rig audio client tasks on +/// demand as rigs appear/disappear from the known_rigs list. +#[allow(clippy::too_many_arguments)] +pub async fn run_multi_rig_audio_manager( + server_host: String, + default_port: u16, + rig_ports: HashMap, + selected_rig_id: Arc>>, + known_rigs: Arc>>, + global_rx_tx: broadcast::Sender, + tx_rx: mpsc::Receiver, + global_stream_info_tx: watch::Sender>, + decode_tx: broadcast::Sender, + replay_history_sink: Option>, + shutdown_rx: watch::Receiver, + vchan_audio: Arc>>>, + vchan_cmd_rx: mpsc::UnboundedReceiver, + vchan_destroyed_tx: Option>, + rig_audio_rx: Arc>>>, + rig_audio_info: Arc>>>>, + rig_vchan_audio_cmd: Arc>>>, +) { + // Per-rig vchan command routing: create per-rig senders that relay into the + // single global vchan_cmd channel. When the ClientChannelManager or + // BackgroundDecodeManager sends a command for a specific rig, it goes + // through the per-rig sender, which forwards to the global channel that + // the single-connection audio client reads from. + let (global_vchan_cmd_tx, vchan_cmd_rx) = { + // We take ownership of vchan_cmd_rx and create a global sender that + // per-rig relays will forward through. + let (tx, rx) = mpsc::unbounded_channel::(); + // Spawn relay from the original vchan_cmd_rx (from main.rs). + let mut orig_rx = vchan_cmd_rx; + let tx_for_orig = tx.clone(); + tokio::spawn(async move { + while let Some(cmd) = orig_rx.recv().await { + let _ = tx_for_orig.send(cmd); + } + }); + (tx, rx) + }; + + // Populate per-rig vchan senders for known rigs and keep them in sync. + let rig_vchan_for_sync = rig_vchan_audio_cmd.clone(); + let known_rigs_for_vchan = known_rigs.clone(); + let global_vchan_for_sync = global_vchan_cmd_tx.clone(); + let mut vchan_sync_shutdown = shutdown_rx.clone(); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_millis(500)); + loop { + tokio::select! { + _ = interval.tick() => { + let rig_ids: Vec = known_rigs_for_vchan + .lock() + .ok() + .map(|entries| entries.iter().map(|e| e.rig_id.clone()).collect()) + .unwrap_or_default(); + if let Ok(mut map) = rig_vchan_for_sync.write() { + for rig_id in &rig_ids { + if !map.contains_key(rig_id) { + // Create a per-rig sender that relays to global. + let (per_rig_tx, mut per_rig_rx) = + mpsc::unbounded_channel::(); + let global_tx = global_vchan_for_sync.clone(); + tokio::spawn(async move { + while let Some(cmd) = per_rig_rx.recv().await { + let _ = global_tx.send(cmd); + } + }); + map.insert(rig_id.clone(), per_rig_tx); + } + } + // Remove senders for rigs no longer present. + let active: std::collections::HashSet<&str> = + rig_ids.iter().map(|s| s.as_str()).collect(); + map.retain(|id, _| active.contains(id.as_str())); + } + } + changed = vchan_sync_shutdown.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *vchan_sync_shutdown.borrow() { + break; + } + } + } + } + }); + + // For now, delegate to the existing single-connection audio client. + // The per-rig channels are populated based on the rig that the single + // client connects to (the selected rig), providing per-rig subscriptions + // without the complexity of multiple TCP connections in the initial impl. + // + // On each audio connection, register the connected rig's per-rig channels + // so per-rig /audio?rig_id= subscribers get data. + let selected_clone = selected_rig_id.clone(); + let rig_audio_rx_clone = rig_audio_rx.clone(); + let rig_audio_info_clone = rig_audio_info.clone(); + + // Spawn a task that keeps per-rig maps in sync with the selected rig. + let mut sync_shutdown = shutdown_rx.clone(); + tokio::spawn(async move { + let mut last_rig: Option = None; + let mut interval = time::interval(Duration::from_millis(500)); + loop { + tokio::select! { + _ = interval.tick() => { + let current = selected_clone.lock().ok().and_then(|v| v.clone()); + if current != last_rig { + // Ensure per-rig broadcast exists for new rig. + if let Some(ref rig_id) = current { + if let Ok(mut map) = rig_audio_rx_clone.write() { + map.entry(rig_id.clone()) + .or_insert_with(|| broadcast::channel::(256).0); + } + if let Ok(mut map) = rig_audio_info_clone.write() { + map.entry(rig_id.clone()) + .or_insert_with(|| watch::channel(None).0); + } + } + last_rig = current; + } + // Mirror global audio data to the current rig's per-rig channel. + // (The actual mirroring happens in the RX read task below.) + } + changed = sync_shutdown.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *sync_shutdown.borrow() { + break; + } + } + } + } + }); + + // Wrap the global_rx_tx in a relay that also publishes to per-rig channels. + let (relay_rx_tx, _) = broadcast::channel::(256); + let relay_clone = relay_rx_tx.clone(); + let rig_audio_rx_for_relay = rig_audio_rx.clone(); + let selected_for_relay = selected_rig_id.clone(); + let mut relay_sub = global_rx_tx.subscribe(); + let mut relay_shutdown = shutdown_rx.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + result = relay_sub.recv() => { + match result { + Ok(data) => { + // Forward to per-rig channel for the selected rig. + if let Some(rig_id) = selected_for_relay.lock().ok().and_then(|v| v.clone()) { + if let Ok(map) = rig_audio_rx_for_relay.read() { + if let Some(tx) = map.get(&rig_id) { + let _ = tx.send(data.clone()); + } + } + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + changed = relay_shutdown.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *relay_shutdown.borrow() { + break; + } + } + } + } + }); + + // Also relay stream info changes to per-rig info channels. + let mut info_relay_rx = global_stream_info_tx.subscribe(); + let rig_audio_info_for_relay = rig_audio_info.clone(); + let selected_for_info_relay = selected_rig_id.clone(); + let mut info_relay_shutdown = shutdown_rx.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + changed = info_relay_rx.changed() => { + match changed { + Ok(()) => { + let info = info_relay_rx.borrow().clone(); + if let Some(rig_id) = selected_for_info_relay.lock().ok().and_then(|v| v.clone()) { + if let Ok(map) = rig_audio_info_for_relay.read() { + if let Some(tx) = map.get(&rig_id) { + let _ = tx.send(info); + } + } + } + } + Err(_) => break, + } + } + changed = info_relay_shutdown.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *info_relay_shutdown.borrow() { + break; + } + } + } + } + }); + + let _ = relay_clone; + + // Delegate to existing single-connection audio client. + run_audio_client( + server_host, + default_port, + rig_ports, + selected_rig_id, + known_rigs, + global_rx_tx, + tx_rx, + global_stream_info_tx, + decode_tx, + replay_history_sink, + shutdown_rx, + vchan_audio, + vchan_cmd_rx, + vchan_destroyed_tx, + ) + .await; +} + fn resolve_audio_addr( host: &str, default_port: u16, diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 32d2fa6..f2afd82 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -283,6 +283,7 @@ async fn async_init() -> DynResult { rig_states: frontend_runtime.rig_states.clone(), poll_interval: Duration::from_millis(poll_interval_ms), spectrum: frontend_runtime.spectrum.clone(), + rig_spectrums: frontend_runtime.rig_spectrums.clone(), server_connected: frontend_runtime.server_connected.clone(), }; let remote_shutdown_rx = shutdown_rx.clone(); @@ -388,7 +389,10 @@ async fn async_init() -> DynResult { let audio_rig_ports: HashMap = cfg.frontends.audio.rig_ports.clone(); let audio_shutdown_rx = shutdown_rx.clone(); let vchan_audio_map = frontend_runtime.vchan_audio.clone(); - pending_audio_client = Some(tokio::spawn(audio_client::run_audio_client( + let rig_audio_rx_map = frontend_runtime.rig_audio_rx.clone(); + let rig_audio_info_map = frontend_runtime.rig_audio_info.clone(); + let rig_vchan_cmd_map = frontend_runtime.rig_vchan_audio_cmd.clone(); + pending_audio_client = Some(tokio::spawn(audio_client::run_multi_rig_audio_manager( remote_host, cfg.frontends.audio.server_port, audio_rig_ports, @@ -403,6 +407,9 @@ async fn async_init() -> DynResult { vchan_audio_map, vchan_cmd_rx, Some(vchan_destroyed_tx), + rig_audio_rx_map, + rig_audio_info_map, + rig_vchan_cmd_map, ))); if cfg.frontends.audio.bridge.enabled { diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 4efbf80..efdd014 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -61,6 +61,8 @@ pub struct RemoteClientConfig { /// Shared flag: `true` while a TCP connection to trx-server is active. pub server_connected: Arc, pub rig_states: Arc>>>, + /// Per-rig spectrum watch senders, keyed by rig_id. + pub rig_spectrums: Arc>>>, } pub async fn run_remote_client( @@ -188,22 +190,44 @@ async fn handle_spectrum_connection( } } _ = interval.tick() => { - if !should_poll_spectrum(config) { + // Collect rig IDs that have active spectrum subscribers. + let rig_ids = active_spectrum_rig_ids(config); + + if rig_ids.is_empty() { + // No subscribers at all — clear global and skip. config.spectrum.send_modify(|s| s.set(None, None)); continue; } - match send_command_no_state_update( - config, &mut writer, &mut reader, - ClientCommand::GetSpectrum, - ).await { - Ok(snapshot) => config - .spectrum - .send_modify(|s| s.set(snapshot.spectrum, snapshot.vchan_rds)), - Err(e) => { - // A spectrum timeout desynchronises the TCP framing; - // return so the caller reconnects and restores sync. - config.spectrum.send_modify(|s| s.set(None, None)); - return Err(e); + + // Determine the currently selected rig for backward compat. + let selected = selected_rig_id(config); + + for rig_id in &rig_ids { + let envelope = ClientEnvelope { + token: config.token.clone(), + rig_id: Some(rig_id.clone()), + cmd: ClientCommand::GetSpectrum, + }; + match send_envelope_no_state_update(&mut writer, &mut reader, envelope).await { + Ok(snapshot) => { + // Update per-rig channel. + if let Ok(map) = config.rig_spectrums.read() { + if let Some(tx) = map.get(rig_id) { + tx.send_modify(|s| s.set(snapshot.spectrum.clone(), snapshot.vchan_rds.clone())); + } + } + // Update global channel if this is the selected rig. + let is_selected = selected.as_deref() == Some(rig_id.as_str()); + if is_selected { + config.spectrum.send_modify(|s| s.set(snapshot.spectrum, snapshot.vchan_rds)); + } + } + Err(e) => { + // A spectrum timeout desynchronises the TCP framing; + // return so the caller reconnects and restores sync. + config.spectrum.send_modify(|s| s.set(None, None)); + return Err(e); + } } } } @@ -331,53 +355,6 @@ async fn send_command( )) } -/// Like `send_command` but does NOT update the main `state_tx` watch channel. -/// Used for spectrum polling to avoid triggering spurious SSE updates. -async fn send_command_no_state_update( - config: &RemoteClientConfig, - writer: &mut tokio::net::tcp::OwnedWriteHalf, - reader: &mut BufReader, - cmd: ClientCommand, -) -> RigResult { - let envelope = build_envelope(config, cmd, None); - let mut payload = serde_json::to_string(&envelope) - .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; - payload.push('\n'); - time::timeout(SPECTRUM_IO_TIMEOUT, writer.write_all(payload.as_bytes())) - .await - .map_err(|_| { - RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)) - })? - .map_err(|e| RigError::communication(format!("write failed: {e}")))?; - time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush()) - .await - .map_err(|_| { - RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT)) - })? - .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; - let line = time::timeout( - SPECTRUM_IO_TIMEOUT, - read_limited_line(reader, MAX_JSON_LINE_BYTES), - ) - .await - .map_err(|_| { - RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT)) - })? - .map_err(|e| RigError::communication(format!("read failed: {e}")))?; - let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?; - let resp: ClientResponse = serde_json::from_str(line.trim_end()) - .map_err(|e| RigError::communication(format!("invalid response: {e}")))?; - if resp.success { - if let Some(snapshot) = resp.state { - return Ok(snapshot); - } - return Err(RigError::communication("missing snapshot")); - } - Err(RigError::communication( - resp.error.unwrap_or_else(|| "remote error".into()), - )) -} - fn build_envelope( config: &RemoteClientConfig, cmd: ClientCommand, @@ -526,26 +503,92 @@ fn set_selected_rig_id(config: &RemoteClientConfig, value: Option) { } } -fn should_poll_spectrum(config: &RemoteClientConfig) -> bool { - if config.spectrum.receiver_count() == 0 { - return false; +/// Send a pre-built envelope and return the snapshot without updating state. +async fn send_envelope_no_state_update( + writer: &mut tokio::net::tcp::OwnedWriteHalf, + reader: &mut BufReader, + envelope: ClientEnvelope, +) -> RigResult { + let mut payload = serde_json::to_string(&envelope) + .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); + time::timeout(SPECTRUM_IO_TIMEOUT, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| { + RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)) + })? + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush()) + .await + .map_err(|_| { + RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT)) + })? + .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; + let line = time::timeout( + SPECTRUM_IO_TIMEOUT, + read_limited_line(reader, MAX_JSON_LINE_BYTES), + ) + .await + .map_err(|_| { + RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT)) + })? + .map_err(|e| RigError::communication(format!("read failed: {e}")))?; + let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?; + let resp: ClientResponse = serde_json::from_str(line.trim_end()) + .map_err(|e| RigError::communication(format!("invalid response: {e}")))?; + if resp.success { + if let Some(snapshot) = resp.state { + return Ok(snapshot); + } + return Err(RigError::communication("missing snapshot")); } - let selected = selected_rig_id(config); - let Some(selected) = selected.as_deref() else { - return true; - }; - config - .known_rigs - .lock() - .ok() - .and_then(|entries| { + Err(RigError::communication( + resp.error.unwrap_or_else(|| "remote error".into()), + )) +} + +/// Collect rig IDs that have active per-rig spectrum subscribers or fall back +/// to the selected rig when only the global channel has subscribers. +fn active_spectrum_rig_ids(config: &RemoteClientConfig) -> Vec { + let mut ids = Vec::new(); + // Collect per-rig channels with active subscribers. + if let Ok(map) = config.rig_spectrums.read() { + for (rig_id, tx) in map.iter() { + if tx.receiver_count() > 0 { + ids.push(rig_id.clone()); + } + } + } + // If global channel has subscribers but no per-rig subscriber covers the + // selected rig, add the selected rig so backward compat works. + if config.spectrum.receiver_count() > 0 { + if let Some(selected) = selected_rig_id(config) { + if !ids.contains(&selected) { + // Only add if the rig is initialized. + let initialized = config + .known_rigs + .lock() + .ok() + .and_then(|entries| entries.iter().find(|e| e.rig_id == selected).cloned()) + .map(|e| e.state.initialized) + .unwrap_or(true); + if initialized { + ids.push(selected); + } + } + } + } + // Filter to only initialized rigs. + if let Ok(entries) = config.known_rigs.lock() { + ids.retain(|id| { entries .iter() - .find(|entry| entry.rig_id == selected) - .cloned() - }) - .map(|entry| entry.state.initialized) - .unwrap_or(true) + .find(|e| &e.rig_id == id) + .map(|e| e.state.initialized) + .unwrap_or(true) + }); + } + ids } fn choose_default_rig(rigs: &[RigEntry]) -> Option<&RigEntry> { @@ -869,6 +912,7 @@ mod tests { spectrum: Arc::new(spectrum_tx), server_connected: Arc::new(AtomicBool::new(false)), rig_states: Arc::new(RwLock::new(HashMap::new())), + rig_spectrums: Arc::new(RwLock::new(HashMap::new())), }, req_rx, state_tx, @@ -908,6 +952,7 @@ mod tests { spectrum: Arc::new(spectrum_tx), server_connected: Arc::new(AtomicBool::new(false)), rig_states: Arc::new(RwLock::new(HashMap::new())), + rig_spectrums: Arc::new(RwLock::new(HashMap::new())), }; let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); assert_eq!(envelope.token.as_deref(), Some("secret")); diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index dcf98ec..43f32df 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -266,6 +266,17 @@ pub struct FrontendRuntimeContext { pub ais_vessel_url_base: Option, /// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`. pub spectrum: Arc>, + /// Per-rig spectrum watch channels, keyed by rig_id. + /// Populated by the remote client spectrum polling task so each SSE + /// session can subscribe to a specific rig's spectrum independently. + pub rig_spectrums: Arc>>>, + /// Per-rig RX audio broadcast senders, keyed by rig_id. + /// Each rig's audio client task publishes Opus frames here. + pub rig_audio_rx: Arc>>>, + /// Per-rig audio stream info watch channels, keyed by rig_id. + pub rig_audio_info: Arc>>>>, + /// Per-rig virtual-channel command senders, keyed by rig_id. + pub rig_vchan_audio_cmd: Arc>>>, /// Per-virtual-channel Opus audio senders. /// Key: server-side virtual channel UUID. /// Value: `broadcast::Sender` that receives per-channel Opus packets @@ -293,6 +304,44 @@ impl FrontendRuntimeContext { .and_then(|map| map.get(rig_id).map(|tx| tx.subscribe())) } + /// Get a watch receiver for a specific rig's spectrum. + /// Lazily inserts a new channel if the rig_id is not yet present. + pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver { + if let Ok(map) = self.rig_spectrums.read() { + if let Some(tx) = map.get(rig_id) { + return tx.subscribe(); + } + } + // Insert on miss. + if let Ok(mut map) = self.rig_spectrums.write() { + map.entry(rig_id.to_string()) + .or_insert_with(|| watch::channel(SharedSpectrum::default()).0) + .subscribe() + } else { + // Poisoned lock fallback: return a dummy receiver. + watch::channel(SharedSpectrum::default()).1 + } + } + + /// Subscribe to a specific rig's RX audio broadcast. + pub fn rig_audio_subscribe(&self, rig_id: &str) -> Option> { + self.rig_audio_rx + .read() + .ok() + .and_then(|map| map.get(rig_id).map(|tx| tx.subscribe())) + } + + /// Get a watch receiver for a specific rig's audio stream info. + pub fn rig_audio_info_rx( + &self, + rig_id: &str, + ) -> Option>> { + self.rig_audio_info + .read() + .ok() + .and_then(|map| map.get(rig_id).map(|tx| tx.subscribe())) + } + /// Create a new empty runtime context. pub fn new() -> Self { Self { @@ -338,6 +387,10 @@ impl FrontendRuntimeContext { let (tx, _rx) = watch::channel(SharedSpectrum::default()); Arc::new(tx) }, + rig_spectrums: Arc::new(RwLock::new(HashMap::new())), + rig_audio_rx: Arc::new(RwLock::new(HashMap::new())), + rig_audio_info: Arc::new(RwLock::new(HashMap::new())), + rig_vchan_audio_cmd: Arc::new(RwLock::new(HashMap::new())), vchan_audio: Arc::new(RwLock::new(HashMap::new())), vchan_audio_cmd: Arc::new(Mutex::new(None)), vchan_destroyed: None, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index d538244..bacfaaa 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -3378,6 +3378,14 @@ async function switchRigFromSelect(selectEl) { } catch (err) { console.error("select_rig failed:", err); } + // Reconnect spectrum SSE to the new rig's spectrum channel. + stopSpectrumStreaming(); + startSpectrumStreaming(); + // Reconnect audio to the new rig if audio is active. + if (rxActive) { + stopRxAudio(); + startRxAudio(); + } showHint(`Rig: ${lastActiveRigId}`, 1500); } @@ -7497,9 +7505,14 @@ function startRxAudio() { return; } const proto = location.protocol === "https:" ? "wss:" : "ws:"; - const audioPath = _audioChannelOverride - ? `/audio?channel_id=${encodeURIComponent(_audioChannelOverride)}` - : "/audio"; + let audioPath; + if (_audioChannelOverride) { + audioPath = `/audio?channel_id=${encodeURIComponent(_audioChannelOverride)}`; + } else if (lastActiveRigId) { + audioPath = `/audio?rig_id=${encodeURIComponent(lastActiveRigId)}`; + } else { + audioPath = "/audio"; + } audioWs = new WebSocket(`${proto}//${location.host}${audioPath}`); audioWs.binaryType = "arraybuffer"; audioStatus.textContent = "Connecting…"; @@ -8533,7 +8546,10 @@ function scheduleSpectrumReconnect() { function startSpectrumStreaming() { if (spectrumSource !== null) return; - spectrumSource = new EventSource("/spectrum"); + const spectrumUrl = lastActiveRigId + ? `/spectrum?rig_id=${encodeURIComponent(lastActiveRigId)}` + : "/spectrum"; + spectrumSource = new EventSource(spectrumUrl); // Unnamed event = reset signal. spectrumSource.onmessage = (evt) => { if (evt.data === "null") { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 71214b9..3edf8b9 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -365,17 +365,13 @@ pub async fn events( // Use the client-requested rig_id if provided, otherwise fall back to // the global default. This allows each tab to reconnect SSE for the // rig it has selected without mutating global state. - let active_rig_id = query - .rig_id - .clone() - .filter(|s| !s.is_empty()) - .or_else(|| { - context - .remote_active_rig_id - .lock() - .ok() - .and_then(|g| g.clone()) - }); + let active_rig_id = query.rig_id.clone().filter(|s| !s.is_empty()).or_else(|| { + context + .remote_active_rig_id + .lock() + .ok() + .and_then(|g| g.clone()) + }); // Subscribe to the per-rig watch channel for this session's rig, // falling back to the global state watch when unavailable. @@ -804,11 +800,16 @@ impl futures_util::Stream for DropStream { /// Emits an unnamed `data: null` event when spectrum data becomes unavailable. #[get("/spectrum")] pub async fn spectrum( + query: web::Query, context: web::Data>, ) -> Result { - // Subscribe to the watch channel: each client gets its own receiver and is - // woken exactly when new spectrum data is pushed (no 40 ms polling needed). - let rx = context.spectrum.subscribe(); + // Subscribe to a per-rig spectrum channel when rig_id is specified, + // otherwise fall back to the global channel for backward compat. + let rx = if let Some(ref rig_id) = query.rig_id { + context.rig_spectrum_rx(rig_id) + } else { + context.spectrum.subscribe() + }; let mut last_rds_json: Option = None; let mut last_vchan_rds_json: Option = None; let mut last_had_frame = false; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs index 0284b33..c336f08 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs @@ -475,6 +475,7 @@ pub fn start_decode_history_collector(context: Arc) { #[derive(Deserialize)] pub struct AudioQuery { pub channel_id: Option, + pub rig_id: Option, } #[get("/audio")] @@ -516,6 +517,18 @@ pub async fn audio_ws( } tokio::time::sleep(Duration::from_millis(50)).await; } + } else if let Some(ref rig_id) = query.rig_id { + // Per-rig audio: subscribe to the specific rig's broadcast. + match context.rig_audio_subscribe(rig_id) { + Some(rx) => rx, + None => { + // Rig not yet connected; fall back to global. + let Some(rx) = context.audio_rx.as_ref() else { + return Ok(HttpResponse::NotFound().body("audio not enabled")); + }; + rx.subscribe() + } + } } else { let Some(rx) = context.audio_rx.as_ref() else { return Ok(HttpResponse::NotFound().body("audio not enabled")); @@ -524,6 +537,13 @@ pub async fn audio_ws( }; let mut rx_sub = rx_sub; + // Use per-rig audio info if available and rig_id was specified. + if let Some(ref rig_id) = query.rig_id { + if let Some(rig_info_rx) = context.rig_audio_info_rx(rig_id) { + info_rx = rig_info_rx; + } + } + let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; actix_web::rt::spawn(async move { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs index 26261a6..f9b6e90 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs @@ -250,6 +250,16 @@ impl BackgroundDecodeManager { } fn send_audio_cmd(&self, cmd: VChanAudioCmd) { + // Route through per-rig sender when available. + if let Some(rig_id) = self.active_rig_id() { + if let Ok(map) = self.context.rig_vchan_audio_cmd.read() { + if let Some(tx) = map.get(&rig_id) { + let _ = tx.send(cmd); + return; + } + } + } + // Fall back to global sender. if let Ok(guard) = self.context.vchan_audio_cmd.lock() { if let Some(tx) = guard.as_ref() { let _ = tx.send(cmd); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index cb465ea..6c4ef24 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -89,7 +89,10 @@ async fn serve( let background_decode_path = BackgroundDecodeStore::default_path(); let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path)); - let vchan_mgr = Arc::new(ClientChannelManager::new(4)); + let vchan_mgr = Arc::new(ClientChannelManager::new( + 4, + context.rig_vchan_audio_cmd.clone(), + )); let session_rig_mgr = Arc::new(api::SessionRigManager::default()); let background_decode_mgr = BackgroundDecodeManager::new( background_decode_store, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs index d73ce23..41ddabd 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs @@ -16,10 +16,10 @@ //! tunes a channel. use std::collections::HashMap; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use uuid::Uuid; use trx_frontend::VChanAudioCmd; @@ -107,12 +107,18 @@ pub struct ClientChannelManager { /// `":"` so subscribers can filter by rig. pub change_tx: broadcast::Sender, pub max_channels: usize, - /// Optional sender to the audio-client task for virtual-channel audio commands. - pub audio_cmd: std::sync::Mutex>>, + /// Global fallback sender to the audio-client task for virtual-channel audio commands. + pub audio_cmd: std::sync::Mutex>>, + /// Per-rig vchan command senders. Commands are routed to the per-rig sender + /// when available, falling back to the global `audio_cmd`. + pub rig_vchan_audio_cmd: Arc>>>, } impl ClientChannelManager { - pub fn new(max_channels: usize) -> Self { + pub fn new( + max_channels: usize, + rig_vchan_audio_cmd: Arc>>>, + ) -> Self { let (change_tx, _) = broadcast::channel(64); Self { rigs: RwLock::new(HashMap::new()), @@ -120,17 +126,26 @@ impl ClientChannelManager { change_tx, max_channels: max_channels.max(1), audio_cmd: std::sync::Mutex::new(None), + rig_vchan_audio_cmd, } } - /// Wire the audio-command sender so the manager can dispatch - /// `VChanAudioCmd` messages when channels are allocated/deleted/changed. - pub fn set_audio_cmd(&self, tx: tokio::sync::mpsc::UnboundedSender) { + /// Wire the global audio-command sender as fallback. + pub fn set_audio_cmd(&self, tx: mpsc::UnboundedSender) { *self.audio_cmd.lock().unwrap() = Some(tx); } - /// Fire-and-forget: send a `VChanAudioCmd` to the audio-client task. - fn send_audio_cmd(&self, cmd: VChanAudioCmd) { + /// Fire-and-forget: send a `VChanAudioCmd`, routing to the per-rig sender + /// when available or falling back to the global sender. + fn send_audio_cmd_for_rig(&self, rig_id: &str, cmd: VChanAudioCmd) { + // Try per-rig sender first. + if let Ok(map) = self.rig_vchan_audio_cmd.read() { + if let Some(tx) = map.get(rig_id) { + let _ = tx.send(cmd); + return; + } + } + // Fall back to global sender. if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() { let _ = tx.send(cmd); } @@ -265,13 +280,16 @@ impl ClientChannelManager { .insert(session_id, (rig_id.to_string(), id)); // Request server-side DSP channel + audio subscription. - self.send_audio_cmd(VChanAudioCmd::Subscribe { - uuid: id, - freq_hz, - mode: mode.to_string(), - bandwidth_hz: 0, - decoder_kinds: Vec::new(), - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::Subscribe { + uuid: id, + freq_hz, + mode: mode.to_string(), + bandwidth_hz: 0, + decoder_kinds: Vec::new(), + }, + ); Ok(snapshot) } @@ -362,7 +380,7 @@ impl ClientChannelManager { drop(rigs); for channel_id in removed_channel_ids { - self.send_audio_cmd(VChanAudioCmd::Remove(channel_id)); + self.send_audio_cmd_for_rig(rig_id, VChanAudioCmd::Remove(channel_id)); } } @@ -389,7 +407,7 @@ impl ClientChannelManager { } // Remove server-side DSP channel and stop audio encoding. - self.send_audio_cmd(VChanAudioCmd::Remove(channel_id)); + self.send_audio_cmd_for_rig(rig_id, VChanAudioCmd::Remove(channel_id)); Ok(()) } @@ -446,10 +464,13 @@ impl ClientChannelManager { ch.freq_hz = freq_hz; self.broadcast_change(rig_id, channels); drop(rigs); - self.send_audio_cmd(VChanAudioCmd::SetFreq { - uuid: channel_id, - freq_hz, - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::SetFreq { + uuid: channel_id, + freq_hz, + }, + ); Ok(()) } @@ -468,10 +489,13 @@ impl ClientChannelManager { ch.mode = mode.to_string(); self.broadcast_change(rig_id, channels); drop(rigs); - self.send_audio_cmd(VChanAudioCmd::SetMode { - uuid: channel_id, - mode: mode.to_string(), - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::SetMode { + uuid: channel_id, + mode: mode.to_string(), + }, + ); Ok(()) } @@ -490,10 +514,13 @@ impl ClientChannelManager { ch.bandwidth_hz = bandwidth_hz; self.broadcast_change(rig_id, channels); drop(rigs); - self.send_audio_cmd(VChanAudioCmd::SetBandwidth { - uuid: channel_id, - bandwidth_hz, - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::SetBandwidth { + uuid: channel_id, + bandwidth_hz, + }, + ); Ok(()) } @@ -557,7 +584,7 @@ impl ClientChannelManager { if remove { let channel_id = channels[idx].id; channels.remove(idx); - self.send_audio_cmd(VChanAudioCmd::Remove(channel_id)); + self.send_audio_cmd_for_rig(rig_id, VChanAudioCmd::Remove(channel_id)); changed = true; continue; } @@ -574,37 +601,49 @@ impl ClientChannelManager { }; if channel.freq_hz != *freq_hz { channel.freq_hz = *freq_hz; - self.send_audio_cmd(VChanAudioCmd::SetFreq { - uuid: channel.id, - freq_hz: *freq_hz, - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::SetFreq { + uuid: channel.id, + freq_hz: *freq_hz, + }, + ); changed = true; } if channel.mode != *mode { channel.mode = mode.clone(); - self.send_audio_cmd(VChanAudioCmd::SetMode { - uuid: channel.id, - mode: mode.clone(), - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::SetMode { + uuid: channel.id, + mode: mode.clone(), + }, + ); changed = true; } if channel.bandwidth_hz != *bandwidth_hz { channel.bandwidth_hz = *bandwidth_hz; - self.send_audio_cmd(VChanAudioCmd::SetBandwidth { - uuid: channel.id, - bandwidth_hz: *bandwidth_hz, - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::SetBandwidth { + uuid: channel.id, + bandwidth_hz: *bandwidth_hz, + }, + ); changed = true; } if channel.decoder_kinds != *decoder_kinds { channel.decoder_kinds = decoder_kinds.clone(); - self.send_audio_cmd(VChanAudioCmd::Subscribe { - uuid: channel.id, - freq_hz: channel.freq_hz, - mode: channel.mode.clone(), - bandwidth_hz: channel.bandwidth_hz, - decoder_kinds: channel.decoder_kinds.clone(), - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::Subscribe { + uuid: channel.id, + freq_hz: channel.freq_hz, + mode: channel.mode.clone(), + bandwidth_hz: channel.bandwidth_hz, + decoder_kinds: channel.decoder_kinds.clone(), + }, + ); changed = true; } } @@ -630,13 +669,16 @@ impl ClientChannelManager { scheduler_bookmark_id: Some(bookmark_id.clone()), session_ids: Vec::new(), }); - self.send_audio_cmd(VChanAudioCmd::Subscribe { - uuid: channel_id, - freq_hz: *freq_hz, - mode: mode.clone(), - bandwidth_hz: *bandwidth_hz, - decoder_kinds: decoder_kinds.clone(), - }); + self.send_audio_cmd_for_rig( + rig_id, + VChanAudioCmd::Subscribe { + uuid: channel_id, + freq_hz: *freq_hz, + mode: mode.clone(), + bandwidth_hz: *bandwidth_hz, + decoder_kinds: decoder_kinds.clone(), + }, + ); changed = true; } @@ -652,7 +694,7 @@ mod tests { #[test] fn release_session_removes_last_non_permanent_channel() { - let mgr = ClientChannelManager::new(4); + let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new()))); let rig_id = "rig-a"; let session_id = Uuid::new_v4(); @@ -673,7 +715,7 @@ mod tests { #[test] fn sync_scheduler_channels_materializes_visible_scheduler_channels() { - let mgr = ClientChannelManager::new(4); + let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new()))); let rig_id = "rig-a"; mgr.init_rig(rig_id, 14_074_000, "USB"); @@ -699,7 +741,7 @@ mod tests { #[test] fn release_session_keeps_scheduler_managed_channels() { - let mgr = ClientChannelManager::new(4); + let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new()))); let rig_id = "rig-a"; let session_id = Uuid::new_v4(); @@ -728,7 +770,7 @@ mod tests { #[test] fn subscribed_scheduler_channel_survives_scheduler_clear_until_released() { - let mgr = ClientChannelManager::new(4); + let mgr = ClientChannelManager::new(4, Arc::new(RwLock::new(HashMap::new()))); let rig_id = "rig-a"; let session_id = Uuid::new_v4();