diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index 1c4e8b2..b9eacea 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -92,11 +92,11 @@ pub async fn run_multi_rig_audio_manager( replay_history_sink: Option>, mut shutdown_rx: watch::Receiver, vchan_audio: Arc>>>, - _vchan_cmd_rx: mpsc::UnboundedReceiver, + _vchan_cmd_rx: mpsc::Receiver, vchan_destroyed_tx: Option>, rig_audio_rx: Arc>>>, rig_audio_info: Arc>>>>, - rig_vchan_audio_cmd: Arc>>>, + rig_vchan_audio_cmd: Arc>>>, ) { // TX frames from the microphone go to the selected rig only. // We wrap the single tx_rx receiver so the per-rig task for the selected @@ -168,9 +168,10 @@ pub async fn run_multi_rig_audio_manager( .clone() }; - // Create per-rig vchan cmd channel. + // Create per-rig vchan cmd channel (bounded to prevent + // unbounded memory growth under backpressure). let (per_rig_vchan_tx, per_rig_vchan_rx) = - mpsc::unbounded_channel::(); + mpsc::channel::(256); if let Ok(mut map) = rig_vchan_audio_cmd.write() { map.insert(rig_id.clone(), per_rig_vchan_tx); } @@ -265,7 +266,7 @@ async fn run_single_rig_audio_client( replay_history_sink: Option>, mut shutdown_rx: watch::Receiver, vchan_audio: Arc>>>, - mut vchan_cmd_rx: mpsc::UnboundedReceiver, + mut vchan_cmd_rx: mpsc::Receiver, vchan_destroyed_tx: Option>, ) { let mut reconnect_delay = Duration::from_secs(1); @@ -409,7 +410,7 @@ async fn handle_single_rig_connection( replay_history_sink: Option>, shutdown_rx: &mut watch::Receiver, vchan_audio: &Arc>>>, - vchan_cmd_rx: &mut mpsc::UnboundedReceiver, + vchan_cmd_rx: &mut mpsc::Receiver, active_subs: &mut HashMap, vchan_destroyed_tx: &Option>, ) -> std::io::Result<()> { diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 0cc3b08..923921a 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -454,7 +454,7 @@ async fn async_init() -> DynResult { frontend_runtime.decode_rx = Some(decode_tx.clone()); // Virtual-channel audio: shared broadcaster map + command channel. - let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::unbounded_channel::(); + let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::(256); *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); let (vchan_destroyed_tx, _) = broadcast::channel::(64); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs index 674f50e..8dc846b 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/background_decode.rs @@ -254,7 +254,7 @@ impl BackgroundDecodeManager { if let Some(rig_id) = self.active_rig_id() { if let Ok(map) = self.context.rig_vchan_audio_cmd.read() { if let Some(tx) = map.get(&rig_id) { - let _ = tx.send(cmd); + let _ = tx.try_send(cmd); return; } } @@ -262,7 +262,7 @@ impl BackgroundDecodeManager { // Fall back to global sender. if let Ok(guard) = self.context.vchan_audio_cmd.lock() { if let Some(tx) = guard.as_ref() { - let _ = tx.send(cmd); + let _ = tx.try_send(cmd); } } } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs index 41ddabd..277eee3 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs @@ -108,16 +108,16 @@ pub struct ClientChannelManager { pub change_tx: broadcast::Sender, pub max_channels: usize, /// Global fallback sender to the audio-client task for virtual-channel audio commands. - pub audio_cmd: std::sync::Mutex>>, + pub audio_cmd: std::sync::Mutex>>, /// Per-rig vchan command senders. Commands are routed to the per-rig sender /// when available, falling back to the global `audio_cmd`. - pub rig_vchan_audio_cmd: Arc>>>, + pub rig_vchan_audio_cmd: Arc>>>, } impl ClientChannelManager { pub fn new( max_channels: usize, - rig_vchan_audio_cmd: Arc>>>, + rig_vchan_audio_cmd: Arc>>>, ) -> Self { let (change_tx, _) = broadcast::channel(64); Self { @@ -131,7 +131,7 @@ impl ClientChannelManager { } /// Wire the global audio-command sender as fallback. - pub fn set_audio_cmd(&self, tx: mpsc::UnboundedSender) { + pub fn set_audio_cmd(&self, tx: mpsc::Sender) { *self.audio_cmd.lock().unwrap() = Some(tx); } @@ -141,13 +141,13 @@ impl ClientChannelManager { // Try per-rig sender first. if let Ok(map) = self.rig_vchan_audio_cmd.read() { if let Some(tx) = map.get(rig_id) { - let _ = tx.send(cmd); + let _ = tx.try_send(cmd); return; } } // Fall back to global sender. if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() { - let _ = tx.send(cmd); + let _ = tx.try_send(cmd); } }