[feat](trx-client): support background decode subscriptions

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-12 22:42:43 +01:00
parent 763d4c00b0
commit 21a534bdb6
+63 -12
View File
@@ -33,6 +33,15 @@ use trx_core::audio::{
use trx_core::decode::DecodedMessage;
use trx_frontend::VChanAudioCmd;
#[derive(Clone, Debug)]
struct ActiveVChanSub {
freq_hz: u64,
mode: String,
bandwidth_hz: u32,
hidden: bool,
decoder_kinds: Vec<String>,
}
/// Run the audio client with auto-reconnect.
#[allow(clippy::too_many_arguments)]
pub async fn run_audio_client(
@@ -51,10 +60,9 @@ pub async fn run_audio_client(
vchan_destroyed_tx: Option<broadcast::Sender<Uuid>>,
) {
let mut reconnect_delay = Duration::from_secs(1);
// Active virtual-channel subscriptions, keyed by UUID.
// Tuple: (freq_hz, mode, bandwidth_hz) — re-sent to the server on every audio TCP reconnect.
// bandwidth_hz == 0 means "use mode default".
let mut active_subs: HashMap<Uuid, (u64, String, u32)> = HashMap::new();
// Active virtual-channel subscriptions, keyed by UUID, re-sent to the
// server on every audio TCP reconnect.
let mut active_subs: HashMap<Uuid, ActiveVChanSub> = HashMap::new();
loop {
if *shutdown_rx.borrow() {
@@ -139,7 +147,7 @@ async fn handle_audio_connection(
shutdown_rx: &mut watch::Receiver<bool>,
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>,
active_subs: &mut HashMap<Uuid, (u64, String, u32)>,
active_subs: &mut HashMap<Uuid, ActiveVChanSub>,
vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>,
) -> std::io::Result<()> {
let (reader, writer) = stream.into_split();
@@ -166,11 +174,14 @@ async fn handle_audio_connection(
// Track which UUIDs were pre-sent so we don't duplicate them when the
// same Subscribe command arrives from the mpsc queue.
let mut resubscribed: HashSet<Uuid> = HashSet::new();
for (&uuid, &(freq_hz, ref mode, bandwidth_hz)) in active_subs.iter() {
for (&uuid, sub) in active_subs.iter() {
let json = serde_json::json!({
"uuid": uuid.to_string(),
"freq_hz": freq_hz,
"mode": mode,
"freq_hz": sub.freq_hz,
"mode": sub.mode,
"hidden": sub.hidden,
"decoder_kinds": sub.decoder_kinds,
"bandwidth_hz": sub.bandwidth_hz,
});
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
@@ -179,8 +190,8 @@ async fn handle_audio_connection(
}
}
// Re-apply non-default bandwidth after re-subscribing.
if bandwidth_hz > 0 {
let bw_json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz });
if sub.bandwidth_hz > 0 {
let bw_json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": sub.bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&bw_json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await {
warn!("Audio vchan reconnect BW write failed: {}", e);
@@ -317,7 +328,13 @@ async fn handle_audio_connection(
cmd = vchan_cmd_rx.recv() => {
match cmd {
Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => {
active_subs.insert(uuid, (freq_hz, mode.clone(), 0));
active_subs.insert(uuid, ActiveVChanSub {
freq_hz,
mode: mode.clone(),
bandwidth_hz: 0,
hidden: false,
decoder_kinds: Vec::new(),
});
// Skip if already re-sent during reconnect initialization.
if resubscribed.remove(&uuid) {
// Already sent above; don't duplicate.
@@ -326,6 +343,7 @@ async fn handle_audio_connection(
"uuid": uuid.to_string(),
"freq_hz": freq_hz,
"mode": mode,
"hidden": false,
});
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
@@ -335,6 +353,33 @@ async fn handle_audio_connection(
}
}
}
Some(VChanAudioCmd::SubscribeBackground { uuid, freq_hz, mode, bandwidth_hz, decoder_kinds }) => {
active_subs.insert(uuid, ActiveVChanSub {
freq_hz,
mode: mode.clone(),
bandwidth_hz,
hidden: true,
decoder_kinds: decoder_kinds.clone(),
});
if resubscribed.remove(&uuid) {
// Already sent above; don't duplicate.
} else {
let json = serde_json::json!({
"uuid": uuid.to_string(),
"freq_hz": freq_hz,
"mode": mode,
"hidden": true,
"decoder_kinds": decoder_kinds,
"bandwidth_hz": bandwidth_hz,
});
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
warn!("Audio background SUB write failed: {}", e);
break;
}
}
}
}
Some(VChanAudioCmd::Unsubscribe(uuid)) => {
if let Err(e) = write_vchan_uuid_msg(&mut writer, AUDIO_MSG_VCHAN_UNSUB, uuid).await {
warn!("Audio vchan UNSUB write failed: {}", e);
@@ -353,6 +398,9 @@ async fn handle_audio_connection(
active_subs.remove(&uuid);
}
Some(VChanAudioCmd::SetFreq { uuid, freq_hz }) => {
if let Some(entry) = active_subs.get_mut(&uuid) {
entry.freq_hz = freq_hz;
}
let json = serde_json::json!({ "uuid": uuid.to_string(), "freq_hz": freq_hz });
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_FREQ, &payload).await {
@@ -362,6 +410,9 @@ async fn handle_audio_connection(
}
}
Some(VChanAudioCmd::SetMode { uuid, mode }) => {
if let Some(entry) = active_subs.get_mut(&uuid) {
entry.mode = mode.clone();
}
let json = serde_json::json!({ "uuid": uuid.to_string(), "mode": mode });
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_MODE, &payload).await {
@@ -373,7 +424,7 @@ async fn handle_audio_connection(
Some(VChanAudioCmd::SetBandwidth { uuid, bandwidth_hz }) => {
// Persist for reconnect.
if let Some(entry) = active_subs.get_mut(&uuid) {
entry.2 = bandwidth_hz;
entry.bandwidth_hz = bandwidth_hz;
}
let json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&json) {