diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 0d04feb..a92a420 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -4,7 +4,7 @@ //! Audio capture, playback, and TCP streaming for trx-server. -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -2218,6 +2218,9 @@ async fn handle_audio_client( let mut destroyed_rx: Option> = vchan_manager.as_ref().map(|m| m.subscribe_destroyed()); + let hidden_channels = Arc::new(Mutex::new(HashSet::::new())); + let hidden_channels_for_rx = hidden_channels.clone(); + let vchan_manager_for_rx = vchan_manager.clone(); let rx_handle = tokio::spawn(async move { // UUID → JoinHandles of per-channel encoder/decoder tasks. let mut vchan_tasks: std::collections::HashMap>> = @@ -2301,6 +2304,15 @@ async fn handle_audio_client( match cmd { VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => { let mut handles = Vec::new(); + let is_hidden = background_decode.is_some(); + + if let Ok(mut guard) = hidden_channels_for_rx.lock() { + if is_hidden { + guard.insert(uuid); + } else { + guard.remove(&uuid); + } + } if send_audio { // Spin up an async Opus encoder task for this virtual channel. @@ -2439,6 +2451,9 @@ async fn handle_audio_client( handle.abort(); } } + if let Ok(mut guard) = hidden_channels_for_rx.lock() { + guard.remove(&uuid); + } } } } @@ -2450,6 +2465,9 @@ async fn handle_audio_client( handle.abort(); } } + if let Ok(mut guard) = hidden_channels_for_rx.lock() { + guard.remove(&uuid); + } // Notify the client. if let Err(e) = write_vchan_uuid_msg( &mut writer_for_rx, @@ -2472,6 +2490,22 @@ async fn handle_audio_client( handle.abort(); } } + let stale_hidden_channels: Vec = hidden_channels_for_rx + .lock() + .map(|guard| guard.iter().copied().collect()) + .unwrap_or_default(); + if let Some(ref mgr) = vchan_manager_for_rx { + for uuid in &stale_hidden_channels { + let _ = mgr.remove_channel(*uuid); + } + } + if !stale_hidden_channels.is_empty() { + info!( + "Audio client {} cleaned up {} hidden background channels", + peer, + stale_hidden_channels.len() + ); + } }); // Read TX frames (and virtual-channel sub/unsub commands) from client. @@ -2607,6 +2641,9 @@ async fn handle_audio_client( Ok(uuid) => { // Unsubscribe first. let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await; + if let Ok(mut guard) = hidden_channels.lock() { + guard.remove(&uuid); + } // Then remove from the DSP pipeline. if let Some(ref mgr) = vchan_manager { if let Err(e) = mgr.remove_channel(uuid) {