[feat](trx-rs): split VDES frontend and decoder path

Add a dedicated VDES plugin tab and live bar, stop reusing the AIS vessel UI, and serve a separate VDES frontend script. Rework the SDR backend so VDES receives a single 100 kHz IQ tap, then replace the fake AIS-clone decoder path with an early M.2092-1 oriented complex-baseband scaffold using burst detection, coarse pi/4-QPSK slicing, and TER-MCS-1.100 frame heuristics.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-03-03 00:32:32 +01:00
parent 92423f1e02
commit 6e558303a7
16 changed files with 850 additions and 522 deletions
+1
View File
@@ -24,6 +24,7 @@ dirs = "6"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
bytes = "1"
cpal = "0.15"
num-complex = "0.4"
opus = "0.3"
trx-app = { path = "../trx-app" }
trx-backend = { path = "trx-backend", features = ["soapysdr"] }
+11 -31
View File
@@ -11,6 +11,7 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use bytes::Bytes;
use num_complex::Complex;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{error, info, warn};
@@ -978,16 +979,13 @@ pub async fn run_ais_decoder(
/// Run the VDES decoder task. Only processes PCM when rig mode is VDES.
pub async fn run_vdes_decoder(
sample_rate: u32,
channels: u16,
mut pcm_a_rx: broadcast::Receiver<Vec<f32>>,
mut pcm_b_rx: broadcast::Receiver<Vec<f32>>,
mut iq_rx: broadcast::Receiver<Vec<Complex<f32>>>,
mut state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
histories: Arc<DecoderHistories>,
) {
info!("VDES decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder_a = VdesDecoder::new(sample_rate);
let mut decoder_b = VdesDecoder::new(sample_rate);
info!("VDES decoder started ({}Hz complex baseband)", sample_rate);
let mut decoder = VdesDecoder::new(sample_rate);
let mut was_active = false;
let mut active = matches!(state_rx.borrow().status.mode, RigMode::VDES);
@@ -998,8 +996,7 @@ pub async fn run_vdes_decoder(
let state = state_rx.borrow();
active = matches!(state.status.mode, RigMode::VDES);
if active {
pcm_a_rx = pcm_a_rx.resubscribe();
pcm_b_rx = pcm_b_rx.resubscribe();
iq_rx = iq_rx.resubscribe();
}
}
Err(_) => break,
@@ -1008,32 +1005,17 @@ pub async fn run_vdes_decoder(
}
tokio::select! {
recv = pcm_a_rx.recv() => {
recv = iq_rx.recv() => {
match recv {
Ok(frame) => {
Ok(block) => {
was_active = true;
for msg in decoder_a.process_samples(&downmix_if_needed(frame, channels), "A") {
for msg in decoder.process_samples(&block, "Main") {
histories.record_vdes_message(msg.clone());
let _ = decode_tx.send(DecodedMessage::Vdes(msg));
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("VDES decoder A: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
recv = pcm_b_rx.recv() => {
match recv {
Ok(frame) => {
was_active = true;
for msg in decoder_b.process_samples(&downmix_if_needed(frame, channels), "B") {
histories.record_vdes_message(msg.clone());
let _ = decode_tx.send(DecodedMessage::Vdes(msg));
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("VDES decoder B: dropped {} PCM frames", n);
warn!("VDES decoder: dropped {} IQ blocks", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
@@ -1044,13 +1026,11 @@ pub async fn run_vdes_decoder(
let state = state_rx.borrow();
active = matches!(state.status.mode, RigMode::VDES);
if !active && was_active {
decoder_a.reset();
decoder_b.reset();
decoder.reset();
was_active = false;
}
if active {
pcm_a_rx = pcm_a_rx.resubscribe();
pcm_b_rx = pcm_b_rx.resubscribe();
iq_rx = iq_rx.resubscribe();
}
}
Err(_) => break,
+31 -12
View File
@@ -245,7 +245,7 @@ fn default_audio_bandwidth_for_mode(mode: &trx_core::rig::state::RigMode) -> u32
RigMode::FM => 12_500,
RigMode::WFM => 180_000,
RigMode::AIS => 25_000,
RigMode::VDES => 25_000,
RigMode::VDES => 100_000,
RigMode::Other(_) => 3_000,
}
}
@@ -282,6 +282,7 @@ type SdrRigBuildResult = DynResult<(
tokio::sync::broadcast::Receiver<Vec<f32>>,
tokio::sync::broadcast::Receiver<Vec<f32>>,
),
tokio::sync::broadcast::Receiver<Vec<num_complex::Complex<f32>>>,
)>;
type OptionalSdrRig = Option<Box<dyn trx_core::rig::RigCat>>;
@@ -290,6 +291,7 @@ type OptionalSdrAisPcmRx = Option<(
broadcast::Receiver<Vec<f32>>,
broadcast::Receiver<Vec<f32>>,
)>;
type OptionalSdrVdesIqRx = Option<broadcast::Receiver<Vec<num_complex::Complex<f32>>>>;
/// Build a `SoapySdrRig` with full channel config from a `RigInstanceConfig`.
#[cfg(feature = "soapysdr")]
@@ -350,7 +352,13 @@ fn build_sdr_rig_from_instance(rig_cfg: &RigInstanceConfig) -> SdrRigBuildResult
sdr_rig.subscribe_pcm_channel(ais_channel_base_idx),
sdr_rig.subscribe_pcm_channel(ais_channel_base_idx + 1),
);
Ok((Box::new(sdr_rig) as Box<dyn trx_core::rig::RigCat>, pcm_rx, ais_pcm))
let vdes_iq = sdr_rig.subscribe_iq_channel(0);
Ok((
Box::new(sdr_rig) as Box<dyn trx_core::rig::RigCat>,
pcm_rx,
ais_pcm,
vdes_iq,
))
}
/// Build a `RigTaskConfig` for a single rig instance.
@@ -429,6 +437,7 @@ fn spawn_rig_audio_stack(
broadcast::Receiver<Vec<f32>>,
broadcast::Receiver<Vec<f32>>,
)>,
sdr_vdes_iq_rx: Option<broadcast::Receiver<Vec<num_complex::Complex<f32>>>>,
) -> Vec<JoinHandle<()>> {
let mut handles: Vec<JoinHandle<()>> = Vec::new();
@@ -615,8 +624,6 @@ fn spawn_rig_audio_stack(
}));
if let Some((ais_a_pcm_rx, ais_b_pcm_rx)) = sdr_ais_pcm_rx {
let vdes_a_pcm_rx = ais_a_pcm_rx.resubscribe();
let vdes_b_pcm_rx = ais_b_pcm_rx.resubscribe();
let ais_state_rx = state_rx.clone();
let ais_decode_tx = decode_tx.clone();
let ais_shutdown_rx = shutdown_rx.clone();
@@ -629,16 +636,17 @@ fn spawn_rig_audio_stack(
_ = wait_for_shutdown(ais_shutdown_rx) => {}
}
}));
}
if let Some(vdes_iq_rx) = sdr_vdes_iq_rx {
let vdes_state_rx = state_rx.clone();
let vdes_decode_tx = decode_tx.clone();
let vdes_shutdown_rx = shutdown_rx.clone();
let vdes_histories = histories.clone();
let vdes_sr = rig_cfg.audio.sample_rate;
let vdes_ch = rig_cfg.audio.channels as u16;
let vdes_sr = (rig_cfg.sdr.sample_rate / (rig_cfg.sdr.sample_rate / 96_000).max(1)).max(1);
handles.push(tokio::spawn(async move {
tokio::select! {
_ = audio::run_vdes_decoder(vdes_sr, vdes_ch, vdes_a_pcm_rx, vdes_b_pcm_rx, vdes_state_rx, vdes_decode_tx, vdes_histories) => {}
_ = audio::run_vdes_decoder(vdes_sr, vdes_iq_rx, vdes_state_rx, vdes_decode_tx, vdes_histories) => {}
_ = wait_for_shutdown(vdes_shutdown_rx) => {}
}
}));
@@ -870,16 +878,26 @@ async fn main() -> DynResult<()> {
// Build SDR rig when applicable.
#[cfg(feature = "soapysdr")]
let (sdr_prebuilt_rig, sdr_pcm_rx, sdr_ais_pcm_rx): (OptionalSdrRig, OptionalSdrPcmRx, OptionalSdrAisPcmRx) =
let (sdr_prebuilt_rig, sdr_pcm_rx, sdr_ais_pcm_rx, sdr_vdes_iq_rx): (
OptionalSdrRig,
OptionalSdrPcmRx,
OptionalSdrAisPcmRx,
OptionalSdrVdesIqRx,
) =
if rig_cfg.rig.access.access_type.as_deref() == Some("sdr") {
let (rig, pcm_rx, ais_pcm_rx) = build_sdr_rig_from_instance(rig_cfg)?;
(Some(rig), Some(pcm_rx), Some(ais_pcm_rx))
let (rig, pcm_rx, ais_pcm_rx, vdes_iq_rx) = build_sdr_rig_from_instance(rig_cfg)?;
(Some(rig), Some(pcm_rx), Some(ais_pcm_rx), Some(vdes_iq_rx))
} else {
(None, None, None)
(None, None, None, None)
};
#[cfg(not(feature = "soapysdr"))]
let (sdr_prebuilt_rig, sdr_pcm_rx, sdr_ais_pcm_rx): (OptionalSdrRig, OptionalSdrPcmRx, OptionalSdrAisPcmRx) = (None, None, None);
let (sdr_prebuilt_rig, sdr_pcm_rx, sdr_ais_pcm_rx, sdr_vdes_iq_rx): (
OptionalSdrRig,
OptionalSdrPcmRx,
OptionalSdrAisPcmRx,
OptionalSdrVdesIqRx,
) = (None, None, None, None);
let histories = DecoderHistories::new();
@@ -941,6 +959,7 @@ async fn main() -> DynResult<()> {
audio_listen_override,
sdr_pcm_rx,
sdr_ais_pcm_rx,
sdr_vdes_iq_rx,
);
task_handles.extend(audio_handles);
@@ -82,6 +82,7 @@ impl IqSource for MockIqSource {
pub struct SdrPipeline {
pub pcm_senders: Vec<broadcast::Sender<Vec<f32>>>,
pub iq_senders: Vec<broadcast::Sender<Vec<Complex<f32>>>>,
pub channel_dsps: Vec<Arc<Mutex<ChannelDsp>>>,
/// Latest FFT magnitude bins (dBFS, FFT-shifted), updated ~10 Hz.
pub spectrum_buf: Arc<Mutex<Option<Vec<f32>>>>,
@@ -113,10 +114,12 @@ impl SdrPipeline {
const PCM_BROADCAST_CAPACITY: usize = 32;
let mut pcm_senders = Vec::with_capacity(channels.len());
let mut iq_senders = Vec::with_capacity(channels.len());
let mut channel_dsps: Vec<Arc<Mutex<ChannelDsp>>> = Vec::with_capacity(channels.len());
for &(channel_if_hz, ref mode, audio_bandwidth_hz, fir_taps) in channels {
let (pcm_tx, _pcm_rx) = broadcast::channel::<Vec<f32>>(PCM_BROADCAST_CAPACITY);
let (iq_tx, _iq_rx) = broadcast::channel::<Vec<Complex<f32>>>(IQ_BROADCAST_CAPACITY);
let dsp = ChannelDsp::new(
channel_if_hz,
mode,
@@ -129,8 +132,10 @@ impl SdrPipeline {
wfm_stereo,
fir_taps,
pcm_tx.clone(),
iq_tx.clone(),
);
pcm_senders.push(pcm_tx);
iq_senders.push(iq_tx);
channel_dsps.push(Arc::new(Mutex::new(dsp)));
}
@@ -159,6 +164,7 @@ impl SdrPipeline {
Self {
pcm_senders,
iq_senders,
channel_dsps,
spectrum_buf,
sdr_sample_rate,
@@ -44,7 +44,8 @@ fn default_bandwidth_for_mode(mode: &RigMode) -> u32 {
RigMode::AM => 9_000,
RigMode::FM => 12_500,
RigMode::WFM => 180_000,
RigMode::AIS | RigMode::VDES => 25_000,
RigMode::AIS => 25_000,
RigMode::VDES => 100_000,
RigMode::Other(_) => 3_000,
}
}
@@ -68,14 +69,17 @@ pub struct ChannelDsp {
frame_buf_offset: usize,
pub frame_size: usize,
pub pcm_tx: broadcast::Sender<Vec<f32>>,
pub iq_tx: broadcast::Sender<Vec<Complex<f32>>>,
scratch_mixed_i: Vec<f32>,
scratch_mixed_q: Vec<f32>,
scratch_filtered_i: Vec<f32>,
scratch_filtered_q: Vec<f32>,
scratch_decimated: Vec<Complex<f32>>,
scratch_iq_tap: Vec<Complex<f32>>,
pub mixer_phase: f64,
pub mixer_phase_inc: f64,
decim_counter: usize,
iq_tap_counter: usize,
resample_phase: f64,
resample_phase_inc: f64,
wfm_decoder: Option<WfmStereoDecoder>,
@@ -141,6 +145,7 @@ impl ChannelDsp {
let rate_changed = self.decim_factor != next_decim_factor;
self.decim_factor = next_decim_factor;
self.decim_counter = 0;
self.iq_tap_counter = 0;
self.resample_phase = 0.0;
self.resample_phase_inc = if self.sdr_sample_rate == 0 {
1.0
@@ -181,6 +186,7 @@ impl ChannelDsp {
wfm_stereo: bool,
fir_taps: usize,
pcm_tx: broadcast::Sender<Vec<f32>>,
iq_tx: broadcast::Sender<Vec<Complex<f32>>>,
) -> Self {
let output_channels = output_channels.max(1);
let audio_bandwidth_hz = Self::clamp_bandwidth_for_mode(mode, audio_bandwidth_hz);
@@ -224,14 +230,17 @@ impl ChannelDsp {
frame_buf_offset: 0,
frame_size,
pcm_tx,
iq_tx,
scratch_mixed_i: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_mixed_q: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_filtered_i: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_filtered_q: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_decimated: Vec::with_capacity(IQ_BLOCK_SIZE / decim_factor.max(1) + 1),
scratch_iq_tap: Vec::with_capacity(IQ_BLOCK_SIZE / decim_factor.max(1) + 1),
mixer_phase: 0.0,
mixer_phase_inc,
decim_counter: 0,
iq_tap_counter: 0,
resample_phase: 0.0,
resample_phase_inc: if sdr_sample_rate == 0 {
1.0
@@ -357,6 +366,25 @@ impl ChannelDsp {
self.scratch_decimated
.reserve(capacity - self.scratch_decimated.capacity());
}
if self.mode == RigMode::VDES && self.iq_tx.receiver_count() > 0 {
self.scratch_iq_tap.clear();
if self.scratch_iq_tap.capacity() < capacity {
self.scratch_iq_tap
.reserve(capacity - self.scratch_iq_tap.capacity());
}
for idx in 0..n {
self.iq_tap_counter += 1;
if self.iq_tap_counter >= self.decim_factor {
self.iq_tap_counter = 0;
let fi = filtered_i.get(idx).copied().unwrap_or(0.0);
let fq = filtered_q.get(idx).copied().unwrap_or(0.0);
self.scratch_iq_tap.push(Complex::new(fi, fq));
}
}
if !self.scratch_iq_tap.is_empty() {
let _ = self.iq_tx.send(self.scratch_iq_tap.clone());
}
}
let decimated = &mut self.scratch_decimated;
if self.wfm_decoder.is_some() {
for idx in 0..n {
@@ -57,7 +57,8 @@ impl SoapySdrRig {
fn default_bandwidth_for_mode(mode: &RigMode) -> u32 {
match mode {
RigMode::LSB | RigMode::USB | RigMode::DIG => 3_000,
RigMode::PKT | RigMode::AIS | RigMode::VDES => 25_000,
RigMode::PKT | RigMode::AIS => 25_000,
RigMode::VDES => 100_000,
RigMode::CW | RigMode::CWR => 500,
RigMode::AM => 9_000,
RigMode::FM => 12_500,
@@ -297,6 +298,19 @@ impl SoapySdrRig {
}
}
}
pub fn subscribe_iq_channel(
&self,
channel_idx: usize,
) -> tokio::sync::broadcast::Receiver<Vec<num_complex::Complex<f32>>> {
if let Some(sender) = self.pipeline.iq_senders.get(channel_idx) {
sender.subscribe()
} else {
let (tx, rx) = tokio::sync::broadcast::channel(1);
drop(tx);
rx
}
}
}
// ---------------------------------------------------------------------------
@@ -353,7 +367,7 @@ impl RigCat for SoapySdrRig {
let half_span_hz = i128::from(self.pipeline.sdr_sample_rate) / 2;
let current_center_hz = i128::from(self.center_hz);
let target_lo_hz = i128::from(freq.hz);
let target_hi_hz = if matches!(self.mode, RigMode::AIS | RigMode::VDES) {
let target_hi_hz = if self.mode == RigMode::AIS {
i128::from(freq.hz) + i128::from(AIS_CHANNEL_SPACING_HZ)
} else {
i128::from(freq.hz)