[fix](trx-frontend-http): start APRS on scheduler PKT channels

Carry bookmark decoder kinds through visible scheduler virtual channels so PKT/APRS scheduler entries start their decode workers instead of acting as audio-only channels.

Verification: cargo test -p trx-frontend-http vchan
Verification: cargo test -p trx-client (fails in existing config::tests::test_default_config)

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-14 13:54:44 +01:00
parent 5590ac5c7d
commit c1e8ce42d2
5 changed files with 98 additions and 22 deletions
+5 -3
View File
@@ -327,13 +327,13 @@ async fn handle_audio_connection(
} }
cmd = vchan_cmd_rx.recv() => { cmd = vchan_cmd_rx.recv() => {
match cmd { match cmd {
Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => { Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode, bandwidth_hz, decoder_kinds }) => {
active_subs.insert(uuid, ActiveVChanSub { active_subs.insert(uuid, ActiveVChanSub {
freq_hz, freq_hz,
mode: mode.clone(), mode: mode.clone(),
bandwidth_hz: 0, bandwidth_hz,
hidden: false, hidden: false,
decoder_kinds: Vec::new(), decoder_kinds: decoder_kinds.clone(),
}); });
// Skip if already re-sent during reconnect initialization. // Skip if already re-sent during reconnect initialization.
if resubscribed.remove(&uuid) { if resubscribed.remove(&uuid) {
@@ -344,6 +344,8 @@ async fn handle_audio_connection(
"freq_hz": freq_hz, "freq_hz": freq_hz,
"mode": mode, "mode": mode,
"hidden": false, "hidden": false,
"decoder_kinds": decoder_kinds,
"bandwidth_hz": bandwidth_hz,
}); });
if let Ok(payload) = serde_json::to_vec(&json) { if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await { if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_SUB, &payload).await {
+7 -1
View File
@@ -29,7 +29,13 @@ pub enum VChanAudioCmd {
/// Create the server-side DSP channel (if it does not exist) and subscribe /// Create the server-side DSP channel (if it does not exist) and subscribe
/// to its Opus audio stream. `freq_hz` and `mode` are used if the server /// to its Opus audio stream. `freq_hz` and `mode` are used if the server
/// needs to create the channel. /// needs to create the channel.
Subscribe { uuid: Uuid, freq_hz: u64, mode: String }, Subscribe {
uuid: Uuid,
freq_hz: u64,
mode: String,
bandwidth_hz: u32,
decoder_kinds: Vec<String>,
},
/// Create a hidden server-side DSP channel for background decoding. /// Create a hidden server-side DSP channel for background decoding.
/// These channels are not enumerated as user-visible virtual channels and /// These channels are not enumerated as user-visible virtual channels and
/// do not request an Opus audio stream back to the frontend. /// do not request an Opus audio stream back to the frontend.
@@ -402,8 +402,9 @@ fn sync_scheduler_vchannels(
( (
bookmark_id.clone(), bookmark_id.clone(),
bookmark.freq_hz, bookmark.freq_hz,
bookmark.mode, bookmark.mode.clone(),
bookmark.bandwidth_hz.unwrap_or(0) as u32, bookmark.bandwidth_hz.unwrap_or(0) as u32,
bookmark_decoder_kinds(&bookmark),
) )
}) })
}) })
@@ -1740,6 +1741,33 @@ fn bookmark_decoder_state(
(want_aprs, want_hf_aprs, want_ft8, want_wspr) (want_aprs, want_hf_aprs, want_ft8, want_wspr)
} }
fn bookmark_decoder_kinds(bookmark: &crate::server::bookmarks::Bookmark) -> Vec<String> {
let mut out = Vec::new();
for decoder in bookmark
.decoders
.iter()
.map(|item| item.trim().to_ascii_lowercase())
{
if matches!(
decoder.as_str(),
"aprs" | "ais" | "ft8" | "wspr" | "hf-aprs"
) && !out.iter().any(|existing| existing == &decoder)
{
out.push(decoder);
}
}
if !out.is_empty() {
return out;
}
match bookmark.mode.trim().to_ascii_uppercase().as_str() {
"AIS" => vec!["ais".to_string()],
"PKT" => vec!["aprs".to_string()],
_ => Vec::new(),
}
}
async fn apply_selected_channel( async fn apply_selected_channel(
rig_tx: &mpsc::Sender<RigRequest>, rig_tx: &mpsc::Sender<RigRequest>,
rig_id: &str, rig_id: &str,
@@ -85,6 +85,7 @@ struct InternalChannel {
mode: String, mode: String,
/// Audio filter bandwidth in Hz (0 = mode default). /// Audio filter bandwidth in Hz (0 = mode default).
bandwidth_hz: u32, bandwidth_hz: u32,
decoder_kinds: Vec<String>,
permanent: bool, permanent: bool,
scheduler_bookmark_id: Option<String>, scheduler_bookmark_id: Option<String>,
/// Session UUIDs currently subscribed to this channel. /// Session UUIDs currently subscribed to this channel.
@@ -169,6 +170,7 @@ impl ClientChannelManager {
freq_hz, freq_hz,
mode: mode.to_string(), mode: mode.to_string(),
bandwidth_hz: 0, bandwidth_hz: 0,
decoder_kinds: Vec::new(),
permanent: true, permanent: true,
scheduler_bookmark_id: None, scheduler_bookmark_id: None,
session_ids: Vec::new(), session_ids: Vec::new(),
@@ -237,6 +239,7 @@ impl ClientChannelManager {
freq_hz, freq_hz,
mode: mode.to_string(), mode: mode.to_string(),
bandwidth_hz: 0, bandwidth_hz: 0,
decoder_kinds: Vec::new(),
permanent: false, permanent: false,
scheduler_bookmark_id: None, scheduler_bookmark_id: None,
session_ids: vec![session_id], session_ids: vec![session_id],
@@ -266,6 +269,8 @@ impl ClientChannelManager {
uuid: id, uuid: id,
freq_hz, freq_hz,
mode: mode.to_string(), mode: mode.to_string(),
bandwidth_hz: 0,
decoder_kinds: Vec::new(),
}); });
Ok(snapshot) Ok(snapshot)
@@ -515,7 +520,7 @@ impl ClientChannelManager {
pub fn sync_scheduler_channels( pub fn sync_scheduler_channels(
&self, &self,
rig_id: &str, rig_id: &str,
desired: &[(String, u64, String, u32)], desired: &[(String, u64, String, u32, Vec<String>)],
) { ) {
let mut rigs = self.rigs.write().unwrap(); let mut rigs = self.rigs.write().unwrap();
let Some(channels) = rigs.get_mut(rig_id) else { let Some(channels) = rigs.get_mut(rig_id) else {
@@ -523,10 +528,13 @@ impl ClientChannelManager {
}; };
let mut changed = false; let mut changed = false;
let desired_map: HashMap<String, (u64, String, u32)> = desired let desired_map: HashMap<String, (u64, String, u32, Vec<String>)> = desired
.iter() .iter()
.map(|(bookmark_id, freq_hz, mode, bandwidth_hz)| { .map(|(bookmark_id, freq_hz, mode, bandwidth_hz, decoder_kinds)| {
(bookmark_id.clone(), (*freq_hz, mode.clone(), *bandwidth_hz)) (
bookmark_id.clone(),
(*freq_hz, mode.clone(), *bandwidth_hz, decoder_kinds.clone()),
)
}) })
.collect(); .collect();
let desired_ids: std::collections::HashSet<&str> = let desired_ids: std::collections::HashSet<&str> =
@@ -553,7 +561,7 @@ impl ClientChannelManager {
let Some(bookmark_id) = channel.scheduler_bookmark_id.as_deref() else { let Some(bookmark_id) = channel.scheduler_bookmark_id.as_deref() else {
continue; continue;
}; };
let Some((freq_hz, mode, bandwidth_hz)) = desired_map.get(bookmark_id) else { let Some((freq_hz, mode, bandwidth_hz, decoder_kinds)) = desired_map.get(bookmark_id) else {
continue; continue;
}; };
if channel.freq_hz != *freq_hz { if channel.freq_hz != *freq_hz {
@@ -580,9 +588,20 @@ impl ClientChannelManager {
}); });
changed = true; changed = true;
} }
if channel.decoder_kinds != *decoder_kinds {
channel.decoder_kinds = decoder_kinds.clone();
self.send_audio_cmd(VChanAudioCmd::Subscribe {
uuid: channel.id,
freq_hz: channel.freq_hz,
mode: channel.mode.clone(),
bandwidth_hz: channel.bandwidth_hz,
decoder_kinds: channel.decoder_kinds.clone(),
});
changed = true;
}
} }
for (bookmark_id, freq_hz, mode, bandwidth_hz) in desired { for (bookmark_id, freq_hz, mode, bandwidth_hz, decoder_kinds) in desired {
let exists = channels.iter().any(|channel| { let exists = channels.iter().any(|channel| {
channel.scheduler_bookmark_id.as_deref() == Some(bookmark_id.as_str()) channel.scheduler_bookmark_id.as_deref() == Some(bookmark_id.as_str())
}); });
@@ -598,6 +617,7 @@ impl ClientChannelManager {
freq_hz: *freq_hz, freq_hz: *freq_hz,
mode: mode.clone(), mode: mode.clone(),
bandwidth_hz: *bandwidth_hz, bandwidth_hz: *bandwidth_hz,
decoder_kinds: decoder_kinds.clone(),
permanent: false, permanent: false,
scheduler_bookmark_id: Some(bookmark_id.clone()), scheduler_bookmark_id: Some(bookmark_id.clone()),
session_ids: Vec::new(), session_ids: Vec::new(),
@@ -606,13 +626,9 @@ impl ClientChannelManager {
uuid: channel_id, uuid: channel_id,
freq_hz: *freq_hz, freq_hz: *freq_hz,
mode: mode.clone(), mode: mode.clone(),
bandwidth_hz: *bandwidth_hz,
decoder_kinds: decoder_kinds.clone(),
}); });
if *bandwidth_hz > 0 {
self.send_audio_cmd(VChanAudioCmd::SetBandwidth {
uuid: channel_id,
bandwidth_hz: *bandwidth_hz,
});
}
changed = true; changed = true;
} }
@@ -655,7 +671,13 @@ mod tests {
mgr.init_rig(rig_id, 14_074_000, "USB"); mgr.init_rig(rig_id, 14_074_000, "USB");
mgr.sync_scheduler_channels( mgr.sync_scheduler_channels(
rig_id, rig_id,
&[("bm-ft8".to_string(), 14_074_000, "DIG".to_string(), 3_000)], &[(
"bm-ft8".to_string(),
14_074_000,
"DIG".to_string(),
3_000,
vec!["ft8".to_string()],
)],
); );
let channels = mgr.channels(rig_id); let channels = mgr.channels(rig_id);
@@ -679,7 +701,13 @@ mod tests {
.expect("allocate vchan"); .expect("allocate vchan");
mgr.sync_scheduler_channels( mgr.sync_scheduler_channels(
rig_id, rig_id,
&[("bm-ft8".to_string(), 14_074_000, "DIG".to_string(), 3_000)], &[(
"bm-ft8".to_string(),
14_074_000,
"DIG".to_string(),
3_000,
vec!["ft8".to_string()],
)],
); );
mgr.release_session(session_id); mgr.release_session(session_id);
@@ -699,7 +727,13 @@ mod tests {
mgr.init_rig(rig_id, 14_074_000, "USB"); mgr.init_rig(rig_id, 14_074_000, "USB");
mgr.sync_scheduler_channels( mgr.sync_scheduler_channels(
rig_id, rig_id,
&[("bm-aprs".to_string(), 144_800_000, "PKT".to_string(), 12_500)], &[(
"bm-aprs".to_string(),
144_800_000,
"PKT".to_string(),
12_500,
vec!["aprs".to_string()],
)],
); );
let channel_id = mgr.channels(rig_id)[1].id; let channel_id = mgr.channels(rig_id)[1].id;
+8 -2
View File
@@ -2335,7 +2335,7 @@ async fn handle_audio_client(
match cmd { match cmd {
VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => { VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => {
let mut handles = Vec::new(); let mut handles = Vec::new();
let is_hidden = background_decode.is_some(); let is_hidden = !send_audio;
if let Ok(mut guard) = hidden_channels_for_rx.lock() { if let Ok(mut guard) = hidden_channels_for_rx.lock() {
if is_hidden { if is_hidden {
@@ -2345,6 +2345,12 @@ async fn handle_audio_client(
} }
} }
if let Some(existing_handles) = vchan_tasks.remove(&uuid) {
for handle in existing_handles {
handle.abort();
}
}
if send_audio { if send_audio {
// Spin up an async Opus encoder task for this virtual channel. // Spin up an async Opus encoder task for this virtual channel.
let frame_tx = vchan_frame_tx.clone(); let frame_tx = vchan_frame_tx.clone();
@@ -2600,7 +2606,7 @@ async fn handle_audio_client(
uuid, uuid,
pcm_rx, pcm_rx,
send_audio: !hidden, send_audio: !hidden,
background_decode: hidden.then_some(BackgroundDecodeSpec { background_decode: (!decoder_kinds.is_empty()).then_some(BackgroundDecodeSpec {
base_freq_hz: freq_hz, base_freq_hz: freq_hz,
decoder_kinds, decoder_kinds,
}), }),