[fix](trx-server): clean up hidden vchans on disconnect

Remove hidden background decode channels when the owning audio client disconnects to avoid stale DSP and decoder buildup.\n\nCo-authored-by: OpenAI Codex <codex@openai.com>

Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-13 07:01:09 +01:00
parent 69fd24577c
commit c178c56f2e
+38 -1
View File
@@ -4,7 +4,7 @@
//! Audio capture, playback, and TCP streaming for trx-server. //! Audio capture, playback, and TCP streaming for trx-server.
use std::collections::VecDeque; use std::collections::{HashSet, VecDeque};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -2218,6 +2218,9 @@ async fn handle_audio_client(
let mut destroyed_rx: Option<broadcast::Receiver<Uuid>> = let mut destroyed_rx: Option<broadcast::Receiver<Uuid>> =
vchan_manager.as_ref().map(|m| m.subscribe_destroyed()); vchan_manager.as_ref().map(|m| m.subscribe_destroyed());
let hidden_channels = Arc::new(Mutex::new(HashSet::<Uuid>::new()));
let hidden_channels_for_rx = hidden_channels.clone();
let vchan_manager_for_rx = vchan_manager.clone();
let rx_handle = tokio::spawn(async move { let rx_handle = tokio::spawn(async move {
// UUID → JoinHandles of per-channel encoder/decoder tasks. // UUID → JoinHandles of per-channel encoder/decoder tasks.
let mut vchan_tasks: std::collections::HashMap<Uuid, Vec<tokio::task::JoinHandle<()>>> = let mut vchan_tasks: std::collections::HashMap<Uuid, Vec<tokio::task::JoinHandle<()>>> =
@@ -2301,6 +2304,15 @@ async fn handle_audio_client(
match cmd { match cmd {
VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => { VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => {
let mut handles = Vec::new(); let mut handles = Vec::new();
let is_hidden = background_decode.is_some();
if let Ok(mut guard) = hidden_channels_for_rx.lock() {
if is_hidden {
guard.insert(uuid);
} else {
guard.remove(&uuid);
}
}
if send_audio { if send_audio {
// Spin up an async Opus encoder task for this virtual channel. // Spin up an async Opus encoder task for this virtual channel.
@@ -2439,6 +2451,9 @@ async fn handle_audio_client(
handle.abort(); handle.abort();
} }
} }
if let Ok(mut guard) = hidden_channels_for_rx.lock() {
guard.remove(&uuid);
}
} }
} }
} }
@@ -2450,6 +2465,9 @@ async fn handle_audio_client(
handle.abort(); handle.abort();
} }
} }
if let Ok(mut guard) = hidden_channels_for_rx.lock() {
guard.remove(&uuid);
}
// Notify the client. // Notify the client.
if let Err(e) = write_vchan_uuid_msg( if let Err(e) = write_vchan_uuid_msg(
&mut writer_for_rx, &mut writer_for_rx,
@@ -2472,6 +2490,22 @@ async fn handle_audio_client(
handle.abort(); handle.abort();
} }
} }
let stale_hidden_channels: Vec<Uuid> = hidden_channels_for_rx
.lock()
.map(|guard| guard.iter().copied().collect())
.unwrap_or_default();
if let Some(ref mgr) = vchan_manager_for_rx {
for uuid in &stale_hidden_channels {
let _ = mgr.remove_channel(*uuid);
}
}
if !stale_hidden_channels.is_empty() {
info!(
"Audio client {} cleaned up {} hidden background channels",
peer,
stale_hidden_channels.len()
);
}
}); });
// Read TX frames (and virtual-channel sub/unsub commands) from client. // Read TX frames (and virtual-channel sub/unsub commands) from client.
@@ -2607,6 +2641,9 @@ async fn handle_audio_client(
Ok(uuid) => { Ok(uuid) => {
// Unsubscribe first. // Unsubscribe first.
let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await; let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await;
if let Ok(mut guard) = hidden_channels.lock() {
guard.remove(&uuid);
}
// Then remove from the DSP pipeline. // Then remove from the DSP pipeline.
if let Some(ref mgr) = vchan_manager { if let Some(ref mgr) = vchan_manager {
if let Err(e) = mgr.remove_channel(uuid) { if let Err(e) = mgr.remove_channel(uuid) {