From 763d4c00b028d8b84136e1440e360deb466f7ecd Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Thu, 12 Mar 2026 22:42:37 +0100 Subject: [PATCH] [feat](trx-server): support hidden background decode channels Co-authored-by: OpenAI Codex Signed-off-by: Stan Grams --- src/trx-server/src/audio.rs | 420 +++++++++++++++--- .../trx-backend-soapysdr/src/vchan_impl.rs | 167 ++++--- 2 files changed, 459 insertions(+), 128 deletions(-) diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 293dff9..6ceaec7 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -1769,11 +1769,210 @@ enum VChanCmd { Subscribe { uuid: Uuid, pcm_rx: tokio::sync::broadcast::Receiver>, + send_audio: bool, + background_decode: Option, }, /// Stop forwarding audio for the given channel. Unsubscribe(Uuid), } +#[derive(Clone, Debug)] +struct BackgroundDecodeSpec { + base_freq_hz: u64, + decoder_kinds: Vec, +} + +async fn run_background_hf_aprs_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + decode_tx: broadcast::Sender, +) { + info!( + "Background HF APRS decoder started ({}Hz, {} ch)", + sample_rate, channels + ); + let mut decoder = AprsDecoder::new_hf(sample_rate); + + loop { + match pcm_rx.recv().await { + Ok(frame) => { + let mut mono = downmix_if_needed(frame, channels); + apply_decode_audio_gate(&mut mono); + for mut pkt in decoder.process_samples(&mono) { + if !pkt.crc_ok { + continue; + } + if pkt.ts_ms.is_none() { + pkt.ts_ms = Some(current_timestamp_ms()); + } + let _ = decode_tx.send(DecodedMessage::HfAprs(pkt)); + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Background HF APRS decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } +} + +async fn run_background_ft8_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + base_freq_hz: u64, + decode_tx: broadcast::Sender, +) { + info!( + "Background FT8 decoder started ({}Hz, {} ch @ {} Hz)", + sample_rate, channels, base_freq_hz + ); + let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) { + Ok(decoder) => decoder, + Err(err) => { + warn!("Background FT8 decoder init failed: {}", err); + return; + } + }; + let mut ft8_buf: Vec = Vec::new(); + let mut last_slot: i64 = -1; + let slot_len_s: i64 = 15; + + loop { + match pcm_rx.recv().await { + Ok(frame) => { + let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) + { + Ok(dur) => dur.as_secs() as i64, + Err(_) => 0, + }; + let slot = now / slot_len_s; + if slot != last_slot { + last_slot = slot; + decoder.reset(); + ft8_buf.clear(); + } + + let mut mono = downmix_mono(frame, channels); + apply_decode_audio_gate(&mut mono); + let Some(resampled) = resample_to_12k(&mono, sample_rate) else { + warn!( + "Background FT8 decoder: unsupported sample rate {}", + sample_rate + ); + break; + }; + ft8_buf.extend_from_slice(&resampled); + + while ft8_buf.len() >= decoder.block_size() { + let block: Vec = ft8_buf.drain(..decoder.block_size()).collect(); + decoder.process_block(&block); + let results = decoder.decode_if_ready(100); + for res in results { + let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64; + let msg = Ft8Message { + ts_ms: current_timestamp_ms(), + snr_db: res.snr_db, + dt_s: res.dt_s, + freq_hz: if abs_freq_hz.is_finite() && abs_freq_hz > 0.0 { + abs_freq_hz as f32 + } else { + res.freq_hz + }, + message: res.text, + }; + let _ = decode_tx.send(DecodedMessage::Ft8(msg)); + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Background FT8 decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } +} + +async fn run_background_wspr_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + base_freq_hz: u64, + decode_tx: broadcast::Sender, +) { + info!( + "Background WSPR decoder started ({}Hz, {} ch @ {} Hz)", + sample_rate, channels, base_freq_hz + ); + let decoder = match WsprDecoder::new() { + Ok(decoder) => decoder, + Err(err) => { + warn!("Background WSPR decoder init failed: {}", err); + return; + } + }; + let mut slot_buf: Vec = Vec::new(); + let mut last_slot: i64 = -1; + let slot_len_s: i64 = 120; + + loop { + match pcm_rx.recv().await { + Ok(frame) => { + let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) + { + Ok(dur) => dur.as_secs() as i64, + Err(_) => 0, + }; + let slot = now / slot_len_s; + if last_slot == -1 { + last_slot = slot; + } else if slot != last_slot { + match decoder.decode_slot(&slot_buf, Some(base_freq_hz)) { + Ok(results) => { + for res in results { + let msg = WsprMessage { + ts_ms: current_timestamp_ms(), + snr_db: res.snr_db, + dt_s: res.dt_s, + freq_hz: res.freq_hz, + message: res.message, + }; + let _ = decode_tx.send(DecodedMessage::Wspr(msg)); + } + } + Err(err) => warn!("Background WSPR decode failed: {}", err), + } + slot_buf.clear(); + last_slot = slot; + } + + let mut mono = downmix_mono(frame, channels); + apply_decode_audio_gate(&mut mono); + let Some(resampled) = resample_to_12k(&mono, sample_rate) else { + warn!( + "Background WSPR decoder: unsupported sample rate {}", + sample_rate + ); + break; + }; + slot_buf.extend_from_slice(&resampled); + if slot_buf.len() > decoder.slot_samples() { + let keep = decoder.slot_samples(); + let drain = slot_buf.len().saturating_sub(keep); + if drain > 0 { + slot_buf.drain(..drain); + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Background WSPR decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } +} + /// Run the audio TCP listener, accepting client connections. #[allow(clippy::too_many_arguments)] pub async fn run_audio_listener( @@ -1936,6 +2135,7 @@ async fn handle_audio_client( let (vchan_frame_tx, mut vchan_frame_rx) = mpsc::channel::<(Uuid, Bytes)>(256); // Commands from the reader loop: Subscribe / Unsubscribe. let (vchan_cmd_tx, mut vchan_cmd_rx) = mpsc::channel::(32); + let (bg_decode_tx, mut bg_decode_rx) = broadcast::channel::(128); let opus_sample_rate = stream_info.sample_rate; let opus_channels = stream_info.channels; @@ -1945,8 +2145,8 @@ async fn handle_audio_client( vchan_manager.as_ref().map(|m| m.subscribe_destroyed()); let rx_handle = tokio::spawn(async move { - // UUID → JoinHandle of per-channel Opus encoder task. - let mut vchan_tasks: std::collections::HashMap> = + // UUID → JoinHandles of per-channel encoder/decoder tasks. + let mut vchan_tasks: std::collections::HashMap>> = std::collections::HashMap::new(); loop { @@ -1990,6 +2190,31 @@ async fn handle_audio_client( Err(broadcast::error::RecvError::Closed) => break, } } + result = bg_decode_rx.recv() => { + match result { + Ok(msg) => { + let msg_type = match &msg { + DecodedMessage::Ais(_) => AUDIO_MSG_AIS_DECODE, + DecodedMessage::Vdes(_) => AUDIO_MSG_VDES_DECODE, + DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE, + DecodedMessage::HfAprs(_) => AUDIO_MSG_HF_APRS_DECODE, + DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE, + DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE, + DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE, + }; + if let Ok(json) = serde_json::to_vec(&msg) { + if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { + warn!("Audio background decode write to {} failed: {}", peer, e); + break; + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Audio background decode: {} dropped {} messages", peer, n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } // Virtual-channel audio frame produced by a per-channel encoder task. Some((uuid, opus)) = vchan_frame_rx.recv() => { if let Err(e) = write_vchan_audio_frame(&mut writer_for_rx, uuid, opus.as_ref()).await { @@ -2000,56 +2225,109 @@ async fn handle_audio_client( // Commands from reader loop: subscribe / unsubscribe. Some(cmd) = vchan_cmd_rx.recv() => { match cmd { - VChanCmd::Subscribe { uuid, pcm_rx } => { - // Spin up an async Opus encoder task for this virtual channel. - let frame_tx = vchan_frame_tx.clone(); - let sr = opus_sample_rate; - let ch_count = opus_channels; - let mut pcm_rx = pcm_rx; - let handle = tokio::spawn(async move { - let opus_ch = match ch_count { - 1 => opus::Channels::Mono, - 2 => opus::Channels::Stereo, - _ => return, - }; - let mut encoder = match opus::Encoder::new( - sr, - opus_ch, - opus::Application::Audio, - ) { - Ok(e) => e, - Err(e) => { - warn!("vchan Opus encoder init failed: {}", e); - return; - } - }; - let _ = encoder.set_bitrate(opus::Bitrate::Bits(32_000)); - let _ = encoder.set_complexity(5); - let mut buf = vec![0u8; 4096]; - loop { - match pcm_rx.recv().await { - Ok(frame) => { - match encoder.encode_float(&frame, &mut buf) { - Ok(len) => { - let pkt = Bytes::copy_from_slice(&buf[..len]); - if frame_tx.send((uuid, pkt)).await.is_err() { - break; + VChanCmd::Subscribe { uuid, pcm_rx, send_audio, background_decode } => { + let mut handles = Vec::new(); + + if send_audio { + // Spin up an async Opus encoder task for this virtual channel. + let frame_tx = vchan_frame_tx.clone(); + let sr = opus_sample_rate; + let ch_count = opus_channels; + let mut pcm_rx_audio = pcm_rx.resubscribe(); + handles.push(tokio::spawn(async move { + let opus_ch = match ch_count { + 1 => opus::Channels::Mono, + 2 => opus::Channels::Stereo, + _ => return, + }; + let mut encoder = match opus::Encoder::new( + sr, + opus_ch, + opus::Application::Audio, + ) { + Ok(e) => e, + Err(e) => { + warn!("vchan Opus encoder init failed: {}", e); + return; + } + }; + let _ = encoder.set_bitrate(opus::Bitrate::Bits(32_000)); + let _ = encoder.set_complexity(5); + let mut buf = vec![0u8; 4096]; + loop { + match pcm_rx_audio.recv().await { + Ok(frame) => { + match encoder.encode_float(&frame, &mut buf) { + Ok(len) => { + let pkt = Bytes::copy_from_slice(&buf[..len]); + if frame_tx.send((uuid, pkt)).await.is_err() { + break; + } + } + Err(e) => { + warn!("vchan Opus encode error: {}", e); } } - Err(e) => { - warn!("vchan Opus encode error: {}", e); - } } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("vchan encoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, } - Err(broadcast::error::RecvError::Lagged(n)) => { - warn!("vchan encoder: dropped {} PCM frames", n); - } - Err(broadcast::error::RecvError::Closed) => break, } - } - }); + })); + } - vchan_tasks.insert(uuid, handle); + if let Some(spec) = background_decode { + let base_freq_hz = spec.base_freq_hz; + for kind in spec.decoder_kinds { + let decode_tx = bg_decode_tx.clone(); + let task_rx = pcm_rx.resubscribe(); + let sr = opus_sample_rate; + let ch_count = opus_channels as u16; + let kind = kind.to_ascii_lowercase(); + let handle = match kind.as_str() { + "ft8" => tokio::spawn(async move { + run_background_ft8_decoder( + sr, + ch_count, + task_rx, + base_freq_hz, + decode_tx, + ) + .await; + }), + "wspr" => tokio::spawn(async move { + run_background_wspr_decoder( + sr, + ch_count, + task_rx, + base_freq_hz, + decode_tx, + ) + .await; + }), + "hf-aprs" => tokio::spawn(async move { + run_background_hf_aprs_decoder( + sr, + ch_count, + task_rx, + decode_tx, + ) + .await; + }), + other => { + warn!("Unsupported background decoder kind '{}'", other); + continue; + } + }; + handles.push(handle); + } + } + + if !handles.is_empty() { + vchan_tasks.insert(uuid, handles); + } // Acknowledge to the client. if let Err(e) = write_vchan_uuid_msg( @@ -2064,8 +2342,10 @@ async fn handle_audio_client( } } VChanCmd::Unsubscribe(uuid) => { - if let Some(h) = vchan_tasks.remove(&uuid) { - h.abort(); + if let Some(handles) = vchan_tasks.remove(&uuid) { + for handle in handles { + handle.abort(); + } } } } @@ -2073,8 +2353,10 @@ async fn handle_audio_client( uuid = recv_destroyed(&mut destroyed_rx) => { if let Some(uuid) = uuid { // Stop encoding for this channel. - if let Some(h) = vchan_tasks.remove(&uuid) { - h.abort(); + if let Some(handles) = vchan_tasks.remove(&uuid) { + for handle in handles { + handle.abort(); + } } // Notify the client. if let Err(e) = write_vchan_uuid_msg( @@ -2093,8 +2375,10 @@ async fn handle_audio_client( } // Abort all per-channel encoder tasks on disconnect. - for (_, h) in vchan_tasks { - h.abort(); + for (_, handles) in vchan_tasks { + for handle in handles { + handle.abort(); + } } }); @@ -2130,12 +2414,40 @@ async fn handle_audio_client( .and_then(|s| s.parse::().ok()); let freq_hz = v["freq_hz"].as_u64(); let mode_str = v["mode"].as_str().unwrap_or("USB"); + let hidden = v["hidden"].as_bool().unwrap_or(false); + let bandwidth_hz = v["bandwidth_hz"].as_u64().map(|bw| bw as u32); + let decoder_kinds = v["decoder_kinds"] + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|item| item.as_str().map(str::to_string)) + .collect::>() + }) + .unwrap_or_default(); let mode = trx_protocol::codec::parse_mode(mode_str); if let (Some(uuid), Some(freq_hz)) = (uuid, freq_hz) { - match mgr.ensure_channel_pcm(uuid, freq_hz, &mode) { + let ensure_result = if hidden { + mgr.ensure_background_channel_pcm(uuid, freq_hz, &mode) + } else { + mgr.ensure_channel_pcm(uuid, freq_hz, &mode) + }; + match ensure_result { Ok(pcm_rx) => { + if let Some(bandwidth_hz) = bandwidth_hz.filter(|bw| *bw > 0) { + if let Err(e) = mgr.set_channel_bandwidth(uuid, bandwidth_hz) { + warn!("Audio vchan SUB bandwidth apply failed: {}", e); + } + } let _ = vchan_cmd_tx - .send(VChanCmd::Subscribe { uuid, pcm_rx }) + .send(VChanCmd::Subscribe { + uuid, + pcm_rx, + send_audio: !hidden, + background_decode: hidden.then_some(BackgroundDecodeSpec { + base_freq_hz: freq_hz, + decoder_kinds, + }), + }) .await; } Err(e) => warn!("Audio vchan SUB: {}", e), diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs index 99cd510..cf5ca12 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/vchan_impl.rs @@ -70,6 +70,9 @@ struct ManagedChannel { pipeline_slot: usize, /// True only for the primary channel; prevents removal. permanent: bool, + /// Hidden background-decode channels are omitted from the normal virtual + /// channel listing and do not count against the visible channel cap. + hidden: bool, } // --------------------------------------------------------------------------- @@ -124,6 +127,7 @@ impl SdrVirtualChannelManager { iq_tx: primary_iq_tx, pipeline_slot: 0, permanent: true, + hidden: false, }; let (destroyed_tx, _) = broadcast::channel::(16); @@ -146,6 +150,61 @@ impl SdrVirtualChannelManager { i64::from(self.pipeline.sdr_sample_rate) / 2 } + fn visible_channel_count(channels: &[ManagedChannel]) -> usize { + channels.iter().filter(|ch| !ch.hidden).count() + } + + fn create_channel( + &self, + channels: &mut Vec, + id: Uuid, + freq_hz: u64, + mode: &RigMode, + hidden: bool, + ) -> Result>, VChanError> { + let half_span = self.half_span_hz(); + let center = self.center_hz.load(Ordering::Relaxed); + let if_hz = freq_hz as i64 - center; + if if_hz.unsigned_abs() as i64 > half_span { + return Err(VChanError::OutOfBandwidth { + half_span_hz: half_span, + }); + } + + if !hidden && Self::visible_channel_count(channels) >= self.max_total { + return Err(VChanError::CapReached { + max: self.max_total, + }); + } + + let bandwidth_hz = default_bandwidth_hz(mode); + let (pcm_tx, iq_tx) = + self.pipeline + .add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS); + + let pipeline_slot = self + .pipeline + .channel_dsps + .read() + .unwrap() + .len() + .saturating_sub(1); + + let pcm_rx = pcm_tx.subscribe(); + channels.push(ManagedChannel { + id, + freq_hz, + mode: mode.clone(), + pcm_tx, + iq_tx, + pipeline_slot, + permanent: false, + hidden, + }); + + Ok(pcm_rx) + } + /// Called by `SoapySdrRig` whenever the hardware center frequency changes. /// Recomputes the IF offset for every virtual channel. pub fn update_center_hz(&self, new_center_hz: i64) { @@ -208,45 +267,9 @@ impl VirtualChannelManager for SdrVirtualChannelManager { freq_hz: u64, mode: &RigMode, ) -> Result<(Uuid, broadcast::Receiver>), VChanError> { - let half_span = self.half_span_hz(); - let center = self.center_hz.load(Ordering::Relaxed); - let if_hz = freq_hz as i64 - center; - if if_hz.unsigned_abs() as i64 > half_span { - return Err(VChanError::OutOfBandwidth { - half_span_hz: half_span, - }); - } - let mut channels = self.channels.write().unwrap(); - if channels.len() >= self.max_total { - return Err(VChanError::CapReached { max: self.max_total }); - } - - let bandwidth_hz = default_bandwidth_hz(mode); - let (pcm_tx, iq_tx) = - self.pipeline - .add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS); - - let pipeline_slot = self - .pipeline - .channel_dsps - .read() - .unwrap() - .len() - .saturating_sub(1); - let id = Uuid::new_v4(); - let pcm_rx = pcm_tx.subscribe(); - channels.push(ManagedChannel { - id, - freq_hz, - mode: mode.clone(), - pcm_tx, - iq_tx, - pipeline_slot, - permanent: false, - }); - + let pcm_rx = self.create_channel(&mut channels, id, freq_hz, mode, false)?; Ok((id, pcm_rx)) } @@ -337,6 +360,7 @@ impl VirtualChannelManager for SdrVirtualChannelManager { let channels = self.channels.read().unwrap(); channels .iter() + .filter(|ch| !ch.hidden) .enumerate() .map(|(idx, ch)| VChannelInfo { id: ch.id, @@ -370,46 +394,25 @@ impl VirtualChannelManager for SdrVirtualChannelManager { } } - // Slow path: create a new channel with the client-supplied UUID. - let half_span = self.half_span_hz(); - let center = self.center_hz.load(Ordering::Relaxed); - let if_hz = freq_hz as i64 - center; - if if_hz.unsigned_abs() as i64 > half_span { - return Err(VChanError::OutOfBandwidth { - half_span_hz: half_span, - }); + let mut channels = self.channels.write().unwrap(); + self.create_channel(&mut channels, id, freq_hz, mode, false) + } + + fn ensure_background_channel_pcm( + &self, + id: Uuid, + freq_hz: u64, + mode: &RigMode, + ) -> Result>, VChanError> { + { + let channels = self.channels.read().unwrap(); + if let Some(ch) = channels.iter().find(|c| c.id == id) { + return Ok(ch.pcm_tx.subscribe()); + } } let mut channels = self.channels.write().unwrap(); - if channels.len() >= self.max_total { - return Err(VChanError::CapReached { max: self.max_total }); - } - - let bandwidth_hz = default_bandwidth_hz(mode); - let (pcm_tx, iq_tx) = - self.pipeline - .add_virtual_channel(if_hz as f64, mode, bandwidth_hz, DEFAULT_FIR_TAPS); - - let pipeline_slot = self - .pipeline - .channel_dsps - .read() - .unwrap() - .len() - .saturating_sub(1); - - let pcm_rx = pcm_tx.subscribe(); - channels.push(ManagedChannel { - id, - freq_hz, - mode: mode.clone(), - pcm_tx, - iq_tx, - pipeline_slot, - permanent: false, - }); - - Ok(pcm_rx) + self.create_channel(&mut channels, id, freq_hz, mode, true) } } @@ -489,4 +492,20 @@ mod tests { let err = mgr.add_channel(10_000_000, &RigMode::USB).unwrap_err(); assert!(matches!(err, VChanError::OutOfBandwidth { .. })); } + + #[test] + fn hidden_background_channels_are_outside_visible_cap() { + let p = make_pipeline(); + let mgr = SdrVirtualChannelManager::new(p, 1, 2); // primary + 1 visible max + mgr.update_center_hz(14_100_000); + + mgr.add_channel(14_074_000, &RigMode::USB).unwrap(); + let hidden_id = Uuid::new_v4(); + mgr.ensure_background_channel_pcm(hidden_id, 14_075_000, &RigMode::DIG) + .unwrap(); + + let visible = mgr.channels(); + assert_eq!(visible.len(), 2); + assert!(visible.iter().all(|channel| channel.id != hidden_id)); + } }