[fix](trx-client): preserve vchan commands under scheduler churn

Use an unbounded virtual-channel command queue so background decode and scheduler transitions do not silently drop subscribe or remove commands.\n\nCo-authored-by: OpenAI Codex <codex@openai.com>

Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-13 07:10:40 +01:00
parent c178c56f2e
commit 66987a4070
5 changed files with 9 additions and 8 deletions
+2 -2
View File
@@ -56,7 +56,7 @@ pub async fn run_audio_client(
decode_tx: broadcast::Sender<DecodedMessage>, decode_tx: broadcast::Sender<DecodedMessage>,
mut shutdown_rx: watch::Receiver<bool>, mut shutdown_rx: watch::Receiver<bool>,
vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>, vchan_audio: Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
mut vchan_cmd_rx: mpsc::Receiver<VChanAudioCmd>, mut vchan_cmd_rx: mpsc::UnboundedReceiver<VChanAudioCmd>,
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
) { ) {
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
@@ -146,7 +146,7 @@ async fn handle_audio_connection(
decode_tx: &broadcast::Sender<DecodedMessage>, decode_tx: &broadcast::Sender<DecodedMessage>,
shutdown_rx: &mut watch::Receiver<bool>, shutdown_rx: &mut watch::Receiver<bool>,
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>, vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>, vchan_cmd_rx: &mut mpsc::UnboundedReceiver<VChanAudioCmd>,
active_subs: &mut HashMap<Uuid, ActiveVChanSub>, active_subs: &mut HashMap<Uuid, ActiveVChanSub>,
vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
+2 -1
View File
@@ -300,7 +300,8 @@ async fn async_init() -> DynResult<AppState> {
frontend_runtime.decode_rx = Some(decode_tx.clone()); frontend_runtime.decode_rx = Some(decode_tx.clone());
// Virtual-channel audio: shared broadcaster map + command channel. // Virtual-channel audio: shared broadcaster map + command channel.
let (vchan_cmd_tx, vchan_cmd_rx) = mpsc::channel::<trx_frontend::VChanAudioCmd>(64); let (vchan_cmd_tx, vchan_cmd_rx) =
mpsc::unbounded_channel::<trx_frontend::VChanAudioCmd>();
*frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx); *frontend_runtime.vchan_audio_cmd.lock().unwrap() = Some(vchan_cmd_tx);
let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64); let (vchan_destroyed_tx, _) = broadcast::channel::<uuid::Uuid>(64);
+1 -1
View File
@@ -252,7 +252,7 @@ pub struct FrontendRuntimeContext {
/// Channel to send `VChanAudioCmd` to the audio-client task, which in turn /// Channel to send `VChanAudioCmd` to the audio-client task, which in turn
/// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection. /// forwards `VCHAN_SUB` / `VCHAN_UNSUB` frames over the audio TCP connection.
/// `None` when no audio connection is active. /// `None` when no audio connection is active.
pub vchan_audio_cmd: Arc<Mutex<Option<mpsc::Sender<VChanAudioCmd>>>>, pub vchan_audio_cmd: Arc<Mutex<Option<mpsc::UnboundedSender<VChanAudioCmd>>>>,
/// Broadcast sender that fires whenever the server destroys a virtual /// Broadcast sender that fires whenever the server destroys a virtual
/// channel (e.g. out-of-bandwidth after center-frequency retune). /// channel (e.g. out-of-bandwidth after center-frequency retune).
/// The HTTP frontend subscribes to clean up `ClientChannelManager`. /// The HTTP frontend subscribes to clean up `ClientChannelManager`.
@@ -238,7 +238,7 @@ impl BackgroundDecodeManager {
fn send_audio_cmd(&self, cmd: VChanAudioCmd) { fn send_audio_cmd(&self, cmd: VChanAudioCmd) {
if let Ok(guard) = self.context.vchan_audio_cmd.lock() { if let Ok(guard) = self.context.vchan_audio_cmd.lock() {
if let Some(tx) = guard.as_ref() { if let Some(tx) = guard.as_ref() {
let _ = tx.try_send(cmd); let _ = tx.send(cmd);
} }
} }
} }
@@ -97,7 +97,7 @@ pub struct ClientChannelManager {
pub change_tx: broadcast::Sender<String>, pub change_tx: broadcast::Sender<String>,
pub max_channels: usize, pub max_channels: usize,
/// Optional sender to the audio-client task for virtual-channel audio commands. /// Optional sender to the audio-client task for virtual-channel audio commands.
pub audio_cmd: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<VChanAudioCmd>>>, pub audio_cmd: std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedSender<VChanAudioCmd>>>,
} }
impl ClientChannelManager { impl ClientChannelManager {
@@ -114,14 +114,14 @@ impl ClientChannelManager {
/// Wire the audio-command sender so the manager can dispatch /// Wire the audio-command sender so the manager can dispatch
/// `VChanAudioCmd` messages when channels are allocated/deleted/changed. /// `VChanAudioCmd` messages when channels are allocated/deleted/changed.
pub fn set_audio_cmd(&self, tx: tokio::sync::mpsc::Sender<VChanAudioCmd>) { pub fn set_audio_cmd(&self, tx: tokio::sync::mpsc::UnboundedSender<VChanAudioCmd>) {
*self.audio_cmd.lock().unwrap() = Some(tx); *self.audio_cmd.lock().unwrap() = Some(tx);
} }
/// Fire-and-forget: send a `VChanAudioCmd` to the audio-client task. /// Fire-and-forget: send a `VChanAudioCmd` to the audio-client task.
fn send_audio_cmd(&self, cmd: VChanAudioCmd) { fn send_audio_cmd(&self, cmd: VChanAudioCmd) {
if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() { if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() {
let _ = tx.try_send(cmd); let _ = tx.send(cmd);
} }
} }