From 60267d450bb1b877eb2afb54fbbd0ab6054d345f Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Wed, 11 Mar 2026 20:50:49 +0100 Subject: [PATCH] [feat](trx-rs): persistent multi-channel virtual channels with OOB eviction Allow users to allocate multiple virtual channels independently of browser tab count. Channels survive SDR center-frequency retuning as long as they stay within the capture bandwidth; channels that fall outside the SDR span are automatically destroyed. Changes: - trx-core: add AUDIO_MSG_VCHAN_DESTROYED (0x12) wire constant; add default subscribe_destroyed() to VirtualChannelManager trait - trx-backend-soapysdr: update_center_hz() detects OOB channels, removes them, fires destroyed_tx broadcast; add destroyed_sender() and subscribe_destroyed() override - trx-server/audio: recv_destroyed() helper avoids select! busy-loop for non-SDR backends; send AUDIO_MSG_VCHAN_DESTROYED to client when a channel is evicted server-side - trx-client/audio_client: persist active_subs across TCP reconnects, re-subscribe on reconnect; handle AUDIO_MSG_VCHAN_DESTROYED by pruning vchan_audio map and forwarding UUID via vchan_destroyed_tx - trx-frontend/lib: add vchan_destroyed broadcast field to FrontendRuntimeContext - trx-client/main: wire vchan_destroyed_tx into audio client and frontend runtime context - trx-frontend-http/vchan: remove per-session one-channel limit in allocate(); replace auto-evict in release_session_on_rig() with subscriber-count-only update; add remove_by_uuid() for server- triggered OOB destruction (skips redundant VChanAudioCmd::Remove) - trx-frontend-http/server: spawn background task that forwards vchan_destroyed broadcast to ClientChannelManager.remove_by_uuid() Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- src/trx-client/src/audio_client.rs | 74 +++++++++++++++---- src/trx-client/src/main.rs | 4 + src/trx-client/trx-frontend/src/lib.rs | 5 ++ .../trx-frontend-http/src/server.rs | 20 ++++- .../trx-frontend-http/src/vchan.rs | 57 ++++++++------ src/trx-core/src/audio.rs | 3 + src/trx-core/src/vchan.rs | 12 +++ src/trx-server/src/audio.rs | 42 ++++++++++- .../trx-backend-soapysdr/src/vchan_impl.rs | 43 ++++++++--- 9 files changed, 215 insertions(+), 45 deletions(-) diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index 54bd140..0a3af41 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -5,7 +5,7 @@ //! Audio TCP client that connects to the server's audio port and relays //! RX/TX Opus frames via broadcast/mpsc channels. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; @@ -26,9 +26,9 @@ use trx_core::audio::{ write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH, - AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_FREQ, - AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, - AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_DESTROYED, + AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, + AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::DecodedMessage; use trx_frontend::VChanAudioCmd; @@ -48,8 +48,12 @@ pub async fn run_audio_client( mut shutdown_rx: watch::Receiver, vchan_audio: Arc>>>, mut vchan_cmd_rx: mpsc::Receiver, + vchan_destroyed_tx: Option>, ) { let mut reconnect_delay = Duration::from_secs(1); + // Active virtual-channel subscriptions, keyed by UUID. + // Re-sent to the server on every audio TCP reconnect. + let mut active_subs: HashMap = HashMap::new(); loop { if *shutdown_rx.borrow() { @@ -87,6 +91,8 @@ pub async fn run_audio_client( &mut shutdown_rx, &vchan_audio, &mut vchan_cmd_rx, + &mut active_subs, + &vchan_destroyed_tx, ) .await { @@ -132,6 +138,8 @@ async fn handle_audio_connection( shutdown_rx: &mut watch::Receiver, vchan_audio: &Arc>>>, vchan_cmd_rx: &mut mpsc::Receiver, + active_subs: &mut HashMap, + vchan_destroyed_tx: &Option>, ) -> std::io::Result<()> { let (reader, writer) = stream.into_split(); let mut reader = BufReader::new(reader); @@ -153,10 +161,30 @@ async fn handle_audio_connection( ); let _ = stream_info_tx.send(Some(info)); + // On reconnect: re-subscribe all previously active virtual channels. + // Track which UUIDs were pre-sent so we don't duplicate them when the + // same Subscribe command arrives from the mpsc queue. + let mut resubscribed: HashSet = HashSet::new(); + for (&uuid, &(freq_hz, ref mode)) in active_subs.iter() { + let json = serde_json::json!({ + "uuid": uuid.to_string(), + "freq_hz": freq_hz, + "mode": mode, + }); + if let Ok(payload) = serde_json::to_vec(&json) { + if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { + warn!("Audio vchan reconnect SUB write failed: {}", e); + return Err(e); + } + } + resubscribed.insert(uuid); + } + // Spawn RX read task let rx_tx = rx_tx.clone(); let decode_tx = decode_tx.clone(); let vchan_audio_rx: Arc>>> = Arc::clone(vchan_audio); + let vchan_destroyed_for_rx = vchan_destroyed_tx.clone(); let mut rx_handle = tokio::spawn(async move { loop { match read_audio_msg(&mut reader).await { @@ -184,6 +212,19 @@ async fn handle_audio_connection( } } } + Ok((AUDIO_MSG_VCHAN_DESTROYED, payload)) => { + if let Ok(uuid) = parse_vchan_uuid_msg(&payload) { + // Remove the broadcaster so audio_ws gets no more frames. + if let Ok(mut map) = vchan_audio_rx.write() { + map.remove(&uuid); + } + // Notify the HTTP frontend so it removes the channel from + // ClientChannelManager (triggers SSE channels event). + if let Some(ref tx) = vchan_destroyed_for_rx { + let _ = tx.send(uuid); + } + } + } Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => { // Decompress gzip blob, then iterate the embedded framed messages. let mut decompressed = Vec::new(); @@ -265,15 +306,21 @@ async fn handle_audio_connection( cmd = vchan_cmd_rx.recv() => { match cmd { Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => { - let json = serde_json::json!({ - "uuid": uuid.to_string(), - "freq_hz": freq_hz, - "mode": mode, - }); - if let Ok(payload) = serde_json::to_vec(&json) { - if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { - warn!("Audio vchan SUB write failed: {}", e); - break; + active_subs.insert(uuid, (freq_hz, mode.clone())); + // Skip if already re-sent during reconnect initialization. + if resubscribed.remove(&uuid) { + // Already sent above; don't duplicate. + } else { + let json = serde_json::json!({ + "uuid": uuid.to_string(), + "freq_hz": freq_hz, + "mode": mode, + }); + if let Ok(payload) = serde_json::to_vec(&json) { + if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { + warn!("Audio vchan SUB write failed: {}", e); + break; + } } } } @@ -292,6 +339,7 @@ async fn handle_audio_connection( if let Ok(mut map) = vchan_audio.write() { map.remove(&uuid); } + active_subs.remove(&uuid); } Some(VChanAudioCmd::SetFreq { uuid, freq_hz }) => { let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz }); diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 70cf641..4db46e4 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -303,6 +303,9 @@ async fn async_init() -> DynResult { let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::(64); *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); + let (vchan_destroyed_tx, _) = broadcast::channel::(64); + frontend_runtime.vchan_destroyed = Some(vchan_destroyed_tx.clone()); + info!( "Audio enabled: default port {}, decode channel set", cfg.frontends.audio.server_port @@ -324,6 +327,7 @@ async fn async_init() -> DynResult { audio_shutdown_rx, vchan_audio_map, vchan_cmd_rx, + Some(vchan_destroyed_tx), ))); if cfg.frontends.audio.bridge.enabled { diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index 3281971..7e61502 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -236,6 +236,10 @@ pub struct FrontendRuntimeContext { /// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection. /// `None` when no audio connection is active. pub vchan_audio_cmd: Arc>>>, + /// Broadcast sender that fires whenever the server destroys a virtual + /// channel (e.g. out-of-bandwidth after center-frequency retune). + /// The HTTP frontend subscribes to clean up `ClientChannelManager`. + pub vchan_destroyed: Option>, } impl FrontendRuntimeContext { @@ -281,6 +285,7 @@ impl FrontendRuntimeContext { }, vchan_audio: Arc::new(RwLock::new(HashMap::new())), vchan_audio_cmd: Arc::new(Mutex::new(None)), + vchan_destroyed: None, } } } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index 96d767e..fff8620 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -28,7 +28,7 @@ use actix_web::{ web, App, HttpServer, }; use tokio::signal; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch}; use tokio::task::JoinHandle; use tracing::{error, info}; @@ -92,6 +92,24 @@ async fn serve( } } + // Spawn a task that removes channels destroyed server-side (OOB) from the + // client-side registry so the SSE channel list stays in sync. + if let Some(ref destroyed_tx) = context.vchan_destroyed { + let mut destroyed_rx = destroyed_tx.subscribe(); + let mgr_for_destroyed = vchan_mgr.clone(); + tokio::spawn(async move { + loop { + match destroyed_rx.recv().await { + Ok(uuid) => { + mgr_for_destroyed.remove_by_uuid(uuid); + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); + } + let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status, vchan_mgr)?; let handle = server.handle(); tokio::spawn(async move { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs index 09bcbfa..69f0821 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs @@ -203,9 +203,6 @@ impl ClientChannelManager { freq_hz: u64, mode: &str, ) -> Result { - // Release any existing channel owned by this session on this rig. - self.release_session_on_rig(session_id, rig_id); - let mut rigs = self.rigs.write().unwrap(); let channels = rigs.entry(rig_id.to_string()).or_default(); @@ -318,27 +315,9 @@ impl ClientChannelManager { changed = true; } } - // Collect IDs of non-permanent channels about to be evicted (0 subscribers). - let to_remove: Vec = channels - .iter() - .filter(|c| !c.permanent && c.session_ids.is_empty()) - .map(|c| c.id) - .collect(); - // Remove non-permanent channels with no subscribers. - let before = channels.len(); - channels.retain(|c| c.permanent || !c.session_ids.is_empty()); - if channels.len() != before { - changed = true; - } if changed { self.broadcast_change(rig_id, channels); } - drop(rigs); - // Notify the audio-client task so it can tear down the server-side - // DSP pipeline and Opus encoder for each evicted channel. - for id in to_remove { - self.send_audio_cmd(VChanAudioCmd::Remove(id)); - } } /// Explicitly delete a channel by UUID (any session may do this). @@ -373,6 +352,42 @@ impl ClientChannelManager { Ok(()) } + /// Remove a channel by UUID across all rigs (called when the server destroys + /// it due to out-of-band center-frequency change). Does NOT send a + /// `VChanAudioCmd::Remove` since the server-side channel is already gone. + pub fn remove_by_uuid(&self, channel_id: Uuid) { + let evicted_sessions: Vec; + let rig_id_opt: Option; + { + let mut rigs = self.rigs.write().unwrap(); + let mut found = false; + let mut evicted = Vec::new(); + let mut found_rig = None; + for (rig_id, channels) in rigs.iter_mut() { + if let Some(pos) = channels.iter().position(|c| c.id == channel_id) { + evicted = channels[pos].session_ids.clone(); + channels.remove(pos); + self.broadcast_change(rig_id, channels); + found_rig = Some(rig_id.clone()); + found = true; + break; + } + } + evicted_sessions = evicted; + rig_id_opt = found_rig; + let _ = found; // suppress warning + } + // Clean up session → channel mapping for sessions that were subscribed. + if rig_id_opt.is_some() { + let mut sessions = self.sessions.write().unwrap(); + for sid in evicted_sessions { + if matches!(sessions.get(&sid), Some((_, ch)) if *ch == channel_id) { + sessions.remove(&sid); + } + } + } + } + /// Update freq/mode metadata for a channel. pub fn set_channel_freq( &self, diff --git a/src/trx-core/src/audio.rs b/src/trx-core/src/audio.rs index 3e45b0a..da9bf70 100644 --- a/src/trx-core/src/audio.rs +++ b/src/trx-core/src/audio.rs @@ -56,6 +56,9 @@ pub const AUDIO_MSG_VCHAN_MODE: u8 = 0x10; /// Client → server: remove a virtual channel (stops encoding and destroys the DSP pipeline). /// Payload: 16-byte UUID of the virtual channel on the server. pub const AUDIO_MSG_VCHAN_REMOVE: u8 = 0x11; +/// Server → client: a virtual channel was destroyed server-side (e.g. went out of bandwidth). +/// Payload: 16-byte UUID of the destroyed channel. +pub const AUDIO_MSG_VCHAN_DESTROYED: u8 = 0x12; /// Maximum payload size for normal messages (1 MB). const MAX_PAYLOAD_SIZE: u32 = 1_048_576; diff --git a/src/trx-core/src/vchan.rs b/src/trx-core/src/vchan.rs index d1161fb..536f056 100644 --- a/src/trx-core/src/vchan.rs +++ b/src/trx-core/src/vchan.rs @@ -117,6 +117,18 @@ pub trait VirtualChannelManager: Send + Sync { /// Maximum number of channels (including the primary channel). fn max_channels(&self) -> usize; + + /// Subscribe to server-side channel destruction events. + /// + /// Returns a `broadcast::Receiver` that fires whenever the manager + /// destroys a channel (e.g. because it went out of the SDR capture + /// bandwidth). The default implementation returns an immediately-closed + /// receiver so non-SDR backends do not need to override this. + fn subscribe_destroyed(&self) -> broadcast::Receiver { + // Drop the sender immediately; the receiver will resolve to + // `Err(RecvError::Closed)` on first poll, signalling "no events". + broadcast::channel::(1).1 + } } /// Convenience alias used in `RigHandle`. diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 3ce016d..c4f7a37 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -27,7 +27,7 @@ use trx_core::audio::{ AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, - AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, + AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; use trx_core::vchan::SharedVChanManager; @@ -1823,6 +1823,23 @@ pub async fn run_audio_listener( Ok(()) } +/// Returns the next destroyed-channel UUID, or `pending()` when the receiver +/// has been closed or is not present. Disables itself on close so the +/// enclosing `select!` never busy-loops on a dead channel. +async fn recv_destroyed(rx: &mut Option>) -> Option { + match rx { + None => std::future::pending::>().await, + Some(r) => match r.recv().await { + Ok(uuid) => Some(uuid), + Err(broadcast::error::RecvError::Lagged(_)) => None, + Err(broadcast::error::RecvError::Closed) => { + *rx = None; + std::future::pending::>().await + } + }, + } +} + #[allow(clippy::too_many_arguments)] async fn handle_audio_client( socket: TcpStream, @@ -1921,6 +1938,10 @@ async fn handle_audio_client( let opus_sample_rate = stream_info.sample_rate; let opus_channels = stream_info.channels; + // Subscribe to server-side channel destruction events (SDR rigs only). + let mut destroyed_rx: Option> = + vchan_manager.as_ref().map(|m| m.subscribe_destroyed()); + let rx_handle = tokio::spawn(async move { // UUID → JoinHandle of per-channel Opus encoder task. let mut vchan_tasks: std::collections::HashMap> = @@ -2047,6 +2068,25 @@ async fn handle_audio_client( } } } + uuid = recv_destroyed(&mut destroyed_rx) => { + if let Some(uuid) = uuid { + // Stop encoding for this channel. + if let Some(h) = vchan_tasks.remove(&uuid) { + h.abort(); + } + // Notify the client. + if let Err(e) = write_vchan_uuid_msg( + &mut writer_for_rx, + AUDIO_MSG_VCHAN_DESTROYED, + uuid, + ) + .await + { + warn!("Audio vchan destroyed write to {} failed: {}", peer, e); + break; + } + } + } } } diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs index e626869..f133530 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs @@ -87,6 +87,8 @@ pub struct SdrVirtualChannelManager { /// Maximum total channels including the primary (enforced on `add_channel`). max_total: usize, channels: RwLock>, + /// Fires whenever a channel is destroyed (e.g. went out of SDR bandwidth). + destroyed_tx: broadcast::Sender, } impl SdrVirtualChannelManager { @@ -124,15 +126,22 @@ impl SdrVirtualChannelManager { permanent: true, }; + let (destroyed_tx, _) = broadcast::channel::(16); + Self { center_hz: pipeline.shared_center_hz.clone(), pipeline, fixed_slot_count, max_total: max_total.max(1), channels: RwLock::new(vec![primary]), + destroyed_tx, } } + pub fn destroyed_sender(&self) -> broadcast::Sender { + self.destroyed_tx.clone() + } + fn half_span_hz(&self) -> i64 { i64::from(self.pipeline.sdr_sample_rate) / 2 } @@ -141,16 +150,28 @@ impl SdrVirtualChannelManager { /// Recomputes the IF offset for every virtual channel. pub fn update_center_hz(&self, new_center_hz: i64) { self.center_hz.store(new_center_hz, Ordering::Relaxed); - let channels = self.channels.read().unwrap(); - let dsps = self.pipeline.channel_dsps.read().unwrap(); - for ch in channels.iter().filter(|c| !c.permanent) { - let new_if_hz = ch.freq_hz as i64 - new_center_hz; - if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) { - dsp_arc - .lock() - .unwrap() - .set_channel_if_hz(new_if_hz as f64); + let half_span = self.half_span_hz(); + + // Single pass under read lock: update in-band IF offsets and collect OOB IDs. + let oob_ids: Vec = { + let channels = self.channels.read().unwrap(); + let dsps = self.pipeline.channel_dsps.read().unwrap(); + let mut oob = Vec::new(); + for ch in channels.iter().filter(|c| !c.permanent) { + let new_if_hz = ch.freq_hz as i64 - new_center_hz; + if new_if_hz.unsigned_abs() as i64 > half_span { + oob.push(ch.id); + } else if let Some(dsp_arc) = dsps.get(ch.pipeline_slot) { + dsp_arc.lock().unwrap().set_channel_if_hz(new_if_hz as f64); + } } + oob + }; // read locks released here + + // Destroy OOB channels and notify subscribers. + for id in oob_ids { + let _ = self.remove_channel(id); // acquires write lock internally + let _ = self.destroyed_tx.send(id); } } @@ -301,6 +322,10 @@ impl VirtualChannelManager for SdrVirtualChannelManager { self.max_total } + fn subscribe_destroyed(&self) -> broadcast::Receiver { + self.destroyed_tx.subscribe() + } + fn ensure_channel_pcm( &self, id: Uuid,