[refactor](trx-client): switch VChanAudioCmd to bounded channels (cap 256)

Replace unbounded_channel with channel(256) for VChanAudioCmd to prevent
unlimited memory accumulation under backpressure. Use try_send in
synchronous contexts.

https://claude.ai/code/session_01XzurkeuUmamBuhQwxVy7T4
Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-25 22:42:55 +00:00
committed by Stan Grams
parent c3abc5ff5b
commit 635a1214d0
4 changed files with 16 additions and 15 deletions
+7 -6
View File
@@ -92,11 +92,11 @@ pub async fn run_multi_rig_audio_manager(
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>, replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
mut shutdown_rx: watch::Receiver<bool>, mut shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>, vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
_vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>, _vchan_cmd_rx: mpsc::Receiver<VChanAudioCmd>,
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>, rig_audio_rx: Arc<RwLock<HashMap<String, broadcast::Sender<Bytes>>>>,
rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>, rig_audio_info: Arc<RwLock<HashMap<String, watch::Sender<Option<AudioStreamInfo>>>>>,
rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>, rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::Sender<VChanAudioCmd>>>>,
) { ) {
// TX frames from the microphone go to the selected rig only. // TX frames from the microphone go to the selected rig only.
// We wrap the single tx_rx receiver so the per-rig task for the selected // We wrap the single tx_rx receiver so the per-rig task for the selected
@@ -168,9 +168,10 @@ pub async fn run_multi_rig_audio_manager(
.clone() .clone()
}; };
// Create per-rig vchan cmd channel. // Create per-rig vchan cmd channel (bounded to prevent
// unbounded memory growth under backpressure).
let (per_rig_vchan_tx, per_rig_vchan_rx) = let (per_rig_vchan_tx, per_rig_vchan_rx) =
mpsc::unbounded_channel::<VChanAudioCmd>(); mpsc::channel::<VChanAudioCmd>(256);
if let Ok(mut map) = rig_vchan_audio_cmd.write() { if let Ok(mut map) = rig_vchan_audio_cmd.write() {
map.insert(rig_id.clone(), per_rig_vchan_tx); map.insert(rig_id.clone(), per_rig_vchan_tx);
} }
@@ -265,7 +266,7 @@ async fn run_single_rig_audio_client(
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>, replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
mut shutdown_rx: watch::Receiver<bool>, mut shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>, vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
mut vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>, mut vchan_cmd_rx: mpsc::Receiver<VChanAudioCmd>,
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
) { ) {
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
@@ -409,7 +410,7 @@ async fn handle_single_rig_connection(
replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>, replay_history_sink: Option<Arc<dyn Fn(DecodedMessage) + Send + Sync>>,
shutdown_rx: &mut watch::Receiver<bool>, shutdown_rx: &mut watch::Receiver<bool>,
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>, vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: &mut mpsc::UnboundedReceiver<VChanAudioCmd>, vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>,
active_subs: &mut HashMap<Uuid, ActiveVChanSub>, active_subs: &mut HashMap<Uuid, ActiveVChanSub>,
vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
+1 -1
View File
@@ -454,7 +454,7 @@ async fn async_init() -> DynResult<AppState> {
frontend_runtime.decode_rx = Some(decode_tx.clone()); frontend_runtime.decode_rx = Some(decode_tx.clone());
// Virtual-channel audio: shared broadcaster map + command channel. // Virtual-channel audio: shared broadcaster map + command channel.
let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::unbounded_channel::<trx_frontend::VChanAudioCmd>(); let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::<trx_frontend::VChanAudioCmd>(256);
*frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64); let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
@@ -254,7 +254,7 @@ impl BackgroundDecodeManager {
if let Some(rig_id) = self.active_rig_id() { if let Some(rig_id) = self.active_rig_id() {
if let Ok(map) = self.context.rig_vchan_audio_cmd.read() { if let Ok(map) = self.context.rig_vchan_audio_cmd.read() {
if let Some(tx) = map.get(&rig_id) { if let Some(tx) = map.get(&rig_id) {
let _ = tx.send(cmd); let _ = tx.try_send(cmd);
return; return;
} }
} }
@@ -262,7 +262,7 @@ impl BackgroundDecodeManager {
// Fall back to global sender. // Fall back to global sender.
if let Ok(guard) = self.context.vchan_audio_cmd.lock() { if let Ok(guard) = self.context.vchan_audio_cmd.lock() {
if let Some(tx) = guard.as_ref() { if let Some(tx) = guard.as_ref() {
let _ = tx.send(cmd); let _ = tx.try_send(cmd);
} }
} }
} }
@@ -108,16 +108,16 @@ pub struct ClientChannelManager {
pub change_tx: broadcast::Sender<String>, pub change_tx: broadcast::Sender<String>,
pub max_channels: usize, pub max_channels: usize,
/// Global fallback sender to the audio-client task for virtual-channel audio commands. /// Global fallback sender to the audio-client task for virtual-channel audio commands.
pub audio_cmd: std::sync::Mutex<Option<mpsc::UnboundedSender<VChanAudioCmd>>>, pub audio_cmd: std::sync::Mutex<Option<mpsc::Sender<VChanAudioCmd>>>,
/// Per-rig vchan command senders. Commands are routed to the per-rig sender /// Per-rig vchan command senders. Commands are routed to the per-rig sender
/// when available, falling back to the global `audio_cmd`. /// when available, falling back to the global `audio_cmd`.
pub rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>, pub rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::Sender<VChanAudioCmd>>>>,
} }
impl ClientChannelManager { impl ClientChannelManager {
pub fn new( pub fn new(
max_channels: usize, max_channels: usize,
rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::UnboundedSender<VChanAudioCmd>>>>, rig_vchan_audio_cmd: Arc<RwLock<HashMap<String, mpsc::Sender<VChanAudioCmd>>>>,
) -> Self { ) -> Self {
let (change_tx, _) = broadcast::channel(64); let (change_tx, _) = broadcast::channel(64);
Self { Self {
@@ -131,7 +131,7 @@ impl ClientChannelManager {
} }
/// Wire the global audio-command sender as fallback. /// Wire the global audio-command sender as fallback.
pub fn set_audio_cmd(&self, tx: mpsc::UnboundedSender<VChanAudioCmd>) { pub fn set_audio_cmd(&self, tx: mpsc::Sender<VChanAudioCmd>) {
*self.audio_cmd.lock().unwrap() = Some(tx); *self.audio_cmd.lock().unwrap() = Some(tx);
} }
@@ -141,13 +141,13 @@ impl ClientChannelManager {
// Try per-rig sender first. // Try per-rig sender first.
if let Ok(map) = self.rig_vchan_audio_cmd.read() { if let Ok(map) = self.rig_vchan_audio_cmd.read() {
if let Some(tx) = map.get(rig_id) { if let Some(tx) = map.get(rig_id) {
let _ = tx.send(cmd); let _ = tx.try_send(cmd);
return; return;
} }
} }
// Fall back to global sender. // Fall back to global sender.
if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() { if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() {
let _ = tx.send(cmd); let _ = tx.try_send(cmd);
} }
} }