[feat](trx-rs): add VDES decoder mode support
Add a new trx-vdes decoder path alongside AIS, wire VDES through the server/frontend decode pipeline, and fix the web map so AIS vessel symbols load correctly and the TRX receiver marker appears when location data arrives. Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -28,6 +28,7 @@ opus = "0.3"
|
||||
trx-app = { path = "../trx-app" }
|
||||
trx-backend = { path = "trx-backend", features = ["soapysdr"] }
|
||||
trx-ais = { path = "../decoders/trx-ais" }
|
||||
trx-vdes = { path = "../decoders/trx-vdes" }
|
||||
trx-core = { path = "../trx-core" }
|
||||
trx-aprs = { path = "../decoders/trx-aprs" }
|
||||
trx-cw = { path = "../decoders/trx-cw" }
|
||||
|
||||
+127
-2
@@ -20,12 +20,15 @@ use trx_aprs::AprsDecoder;
|
||||
use trx_core::audio::{
|
||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE,
|
||||
AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME,
|
||||
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_WSPR_DECODE,
|
||||
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
||||
};
|
||||
use trx_core::decode::{
|
||||
AisMessage, AprsPacket, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
|
||||
};
|
||||
use trx_core::decode::{AisMessage, AprsPacket, DecodedMessage, Ft8Message, WsprMessage};
|
||||
use trx_core::rig::state::{RigMode, RigState};
|
||||
use trx_cw::CwDecoder;
|
||||
use trx_ft8::Ft8Decoder;
|
||||
use trx_vdes::VdesDecoder;
|
||||
use trx_wspr::WsprDecoder;
|
||||
|
||||
use crate::config::AudioConfig;
|
||||
@@ -33,6 +36,7 @@ use trx_decode_log::DecoderLoggers;
|
||||
|
||||
const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const AIS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const VDES_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_SAMPLE_RATE: u32 = 12_000;
|
||||
@@ -127,6 +131,7 @@ fn classify_stream_error(err: &str) -> &'static str {
|
||||
/// `Arc<DecoderHistories>` into every decoder task and into the audio listener.
|
||||
pub struct DecoderHistories {
|
||||
ais: Mutex<VecDeque<(Instant, AisMessage)>>,
|
||||
vdes: Mutex<VecDeque<(Instant, VdesMessage)>>,
|
||||
aprs: Mutex<VecDeque<(Instant, AprsPacket)>>,
|
||||
ft8: Mutex<VecDeque<(Instant, Ft8Message)>>,
|
||||
wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
|
||||
@@ -136,6 +141,7 @@ impl DecoderHistories {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
ais: Mutex::new(VecDeque::new()),
|
||||
vdes: Mutex::new(VecDeque::new()),
|
||||
aprs: Mutex::new(VecDeque::new()),
|
||||
ft8: Mutex::new(VecDeque::new()),
|
||||
wspr: Mutex::new(VecDeque::new()),
|
||||
@@ -167,6 +173,31 @@ impl DecoderHistories {
|
||||
h.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
}
|
||||
|
||||
// --- VDES ---
|
||||
|
||||
fn prune_vdes(history: &mut VecDeque<(Instant, VdesMessage)>) {
|
||||
let cutoff = Instant::now() - VDES_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
history.pop_front();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_vdes_message(&self, msg: VdesMessage) {
|
||||
let mut h = self.vdes.lock().expect("vdes history mutex poisoned");
|
||||
h.push_back((Instant::now(), msg));
|
||||
Self::prune_vdes(&mut h);
|
||||
}
|
||||
|
||||
pub fn snapshot_vdes_history(&self) -> Vec<VdesMessage> {
|
||||
let mut h = self.vdes.lock().expect("vdes history mutex poisoned");
|
||||
Self::prune_vdes(&mut h);
|
||||
h.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
}
|
||||
|
||||
// --- APRS ---
|
||||
|
||||
fn prune_aprs(history: &mut VecDeque<(Instant, AprsPacket)>) {
|
||||
@@ -944,6 +975,91 @@ pub async fn run_ais_decoder(
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the VDES decoder task. Only processes PCM when rig mode is VDES.
|
||||
pub async fn run_vdes_decoder(
|
||||
sample_rate: u32,
|
||||
channels: u16,
|
||||
mut pcm_a_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut pcm_b_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
histories: Arc<DecoderHistories>,
|
||||
) {
|
||||
info!("VDES decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder_a = VdesDecoder::new(sample_rate);
|
||||
let mut decoder_b = VdesDecoder::new(sample_rate);
|
||||
let mut was_active = false;
|
||||
let mut active = matches!(state_rx.borrow().status.mode, RigMode::VDES);
|
||||
|
||||
loop {
|
||||
if !active {
|
||||
match state_rx.changed().await {
|
||||
Ok(()) => {
|
||||
let state = state_rx.borrow();
|
||||
active = matches!(state.status.mode, RigMode::VDES);
|
||||
if active {
|
||||
pcm_a_rx = pcm_a_rx.resubscribe();
|
||||
pcm_b_rx = pcm_b_rx.resubscribe();
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
recv = pcm_a_rx.recv() => {
|
||||
match recv {
|
||||
Ok(frame) => {
|
||||
was_active = true;
|
||||
for msg in decoder_a.process_samples(&downmix_if_needed(frame, channels), "A") {
|
||||
histories.record_vdes_message(msg.clone());
|
||||
let _ = decode_tx.send(DecodedMessage::Vdes(msg));
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("VDES decoder A: dropped {} PCM frames", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
recv = pcm_b_rx.recv() => {
|
||||
match recv {
|
||||
Ok(frame) => {
|
||||
was_active = true;
|
||||
for msg in decoder_b.process_samples(&downmix_if_needed(frame, channels), "B") {
|
||||
histories.record_vdes_message(msg.clone());
|
||||
let _ = decode_tx.send(DecodedMessage::Vdes(msg));
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("VDES decoder B: dropped {} PCM frames", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
changed = state_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) => {
|
||||
let state = state_rx.borrow();
|
||||
active = matches!(state.status.mode, RigMode::VDES);
|
||||
if !active && was_active {
|
||||
decoder_a.reset();
|
||||
decoder_b.reset();
|
||||
was_active = false;
|
||||
}
|
||||
if active {
|
||||
pcm_a_rx = pcm_a_rx.resubscribe();
|
||||
pcm_b_rx = pcm_b_rx.resubscribe();
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the CW decoder task. Only processes PCM when rig mode is CW or CWR.
|
||||
pub async fn run_cw_decoder(
|
||||
sample_rate: u32,
|
||||
@@ -1468,6 +1584,14 @@ async fn handle_audio_client(
|
||||
write_audio_msg(&mut writer, msg_type, &json).await?;
|
||||
}
|
||||
}
|
||||
let history = histories.snapshot_vdes_history();
|
||||
for msg in history {
|
||||
let msg = DecodedMessage::Vdes(msg);
|
||||
let msg_type = AUDIO_MSG_VDES_DECODE;
|
||||
if let Ok(json) = serde_json::to_vec(&msg) {
|
||||
write_audio_msg(&mut writer, msg_type, &json).await?;
|
||||
}
|
||||
}
|
||||
// Send APRS history to newly connected client.
|
||||
let history = histories.snapshot_aprs_history();
|
||||
for pkt in history {
|
||||
@@ -1522,6 +1646,7 @@ async fn handle_audio_client(
|
||||
Ok(msg) => {
|
||||
let msg_type = match &msg {
|
||||
DecodedMessage::Ais(_) => AUDIO_MSG_AIS_DECODE,
|
||||
DecodedMessage::Vdes(_) => AUDIO_MSG_VDES_DECODE,
|
||||
DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE,
|
||||
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
|
||||
DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE,
|
||||
|
||||
@@ -245,6 +245,7 @@ fn default_audio_bandwidth_for_mode(mode: &trx_core::rig::state::RigMode) -> u32
|
||||
RigMode::FM => 12_500,
|
||||
RigMode::WFM => 180_000,
|
||||
RigMode::AIS => 25_000,
|
||||
RigMode::VDES => 25_000,
|
||||
RigMode::Other(_) => 3_000,
|
||||
}
|
||||
}
|
||||
@@ -266,6 +267,7 @@ fn parse_rig_mode(
|
||||
"WFM" => RigMode::WFM,
|
||||
"FM" => RigMode::FM,
|
||||
"AIS" => RigMode::AIS,
|
||||
"VDES" => RigMode::VDES,
|
||||
"DIG" => RigMode::DIG,
|
||||
"PKT" => RigMode::PKT,
|
||||
_ => initial_mode.clone(),
|
||||
@@ -613,6 +615,8 @@ fn spawn_rig_audio_stack(
|
||||
}));
|
||||
|
||||
if let Some((ais_a_pcm_rx, ais_b_pcm_rx)) = sdr_ais_pcm_rx {
|
||||
let vdes_a_pcm_rx = ais_a_pcm_rx.resubscribe();
|
||||
let vdes_b_pcm_rx = ais_b_pcm_rx.resubscribe();
|
||||
let ais_state_rx = state_rx.clone();
|
||||
let ais_decode_tx = decode_tx.clone();
|
||||
let ais_shutdown_rx = shutdown_rx.clone();
|
||||
@@ -625,6 +629,19 @@ fn spawn_rig_audio_stack(
|
||||
_ = wait_for_shutdown(ais_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
|
||||
let vdes_state_rx = state_rx.clone();
|
||||
let vdes_decode_tx = decode_tx.clone();
|
||||
let vdes_shutdown_rx = shutdown_rx.clone();
|
||||
let vdes_histories = histories.clone();
|
||||
let vdes_sr = rig_cfg.audio.sample_rate;
|
||||
let vdes_ch = rig_cfg.audio.channels as u16;
|
||||
handles.push(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = audio::run_vdes_decoder(vdes_sr, vdes_ch, vdes_a_pcm_rx, vdes_b_pcm_rx, vdes_state_rx, vdes_decode_tx, vdes_histories) => {}
|
||||
_ = wait_for_shutdown(vdes_shutdown_rx) => {}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
// Spawn CW decoder task (no histories needed — CW has no persistent history)
|
||||
|
||||
@@ -773,7 +773,7 @@ fn map_signal_strength(mode: &RigMode, raw: u8) -> i32 {
|
||||
// FT-817 returns 0-15 for signal strength
|
||||
// Map to approximate dBm / S-units
|
||||
match mode {
|
||||
RigMode::FM | RigMode::WFM | RigMode::AIS => -120 + (raw as i32 * 6),
|
||||
RigMode::FM | RigMode::WFM | RigMode::AIS | RigMode::VDES => -120 + (raw as i32 * 6),
|
||||
_ => -127 + (raw as i32 * 6),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +94,7 @@ impl DummyRig {
|
||||
RigMode::FM,
|
||||
RigMode::WFM,
|
||||
RigMode::AIS,
|
||||
RigMode::VDES,
|
||||
RigMode::DIG,
|
||||
RigMode::PKT,
|
||||
],
|
||||
|
||||
@@ -165,6 +165,7 @@ impl Ft450d {
|
||||
RigMode::WFM,
|
||||
RigMode::FM,
|
||||
RigMode::AIS,
|
||||
RigMode::VDES,
|
||||
RigMode::DIG,
|
||||
RigMode::PKT,
|
||||
],
|
||||
@@ -512,7 +513,7 @@ fn encode_mode(mode: &RigMode) -> DynResult<char> {
|
||||
RigMode::USB => Ok('2'),
|
||||
RigMode::CW => Ok('3'),
|
||||
RigMode::FM => Ok('4'),
|
||||
RigMode::AIS => Ok('4'),
|
||||
RigMode::AIS | RigMode::VDES => Ok('4'),
|
||||
RigMode::AM => Ok('5'),
|
||||
RigMode::DIG => Ok('6'),
|
||||
RigMode::CWR => Ok('7'),
|
||||
|
||||
@@ -196,6 +196,7 @@ impl Ft817 {
|
||||
RigMode::WFM,
|
||||
RigMode::FM,
|
||||
RigMode::AIS,
|
||||
RigMode::VDES,
|
||||
RigMode::DIG,
|
||||
RigMode::PKT,
|
||||
],
|
||||
@@ -589,7 +590,7 @@ fn encode_mode(mode: &RigMode) -> u8 {
|
||||
RigMode::AM => 0x04,
|
||||
RigMode::WFM => 0x06,
|
||||
RigMode::FM => 0x08,
|
||||
RigMode::AIS => 0x08,
|
||||
RigMode::AIS | RigMode::VDES => 0x08,
|
||||
RigMode::DIG => 0x0A,
|
||||
RigMode::PKT => 0x0C,
|
||||
RigMode::Other(_) => 0x00,
|
||||
|
||||
@@ -156,7 +156,7 @@ impl Demodulator {
|
||||
RigMode::AM => Self::Am,
|
||||
RigMode::FM => Self::Fm,
|
||||
RigMode::WFM => Self::Wfm,
|
||||
RigMode::AIS => Self::Fm,
|
||||
RigMode::AIS | RigMode::VDES => Self::Fm,
|
||||
RigMode::CW | RigMode::CWR => Self::Cw,
|
||||
RigMode::DIG => Self::Passthrough,
|
||||
// VHF/UHF packet radio (APRS, AX.25) is FM-encoded AFSK.
|
||||
|
||||
@@ -44,7 +44,7 @@ fn default_bandwidth_for_mode(mode: &RigMode) -> u32 {
|
||||
RigMode::AM => 9_000,
|
||||
RigMode::FM => 12_500,
|
||||
RigMode::WFM => 180_000,
|
||||
RigMode::AIS => 25_000,
|
||||
RigMode::AIS | RigMode::VDES => 25_000,
|
||||
RigMode::Other(_) => 3_000,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ impl SoapySdrRig {
|
||||
fn default_bandwidth_for_mode(mode: &RigMode) -> u32 {
|
||||
match mode {
|
||||
RigMode::LSB | RigMode::USB | RigMode::DIG => 3_000,
|
||||
RigMode::PKT | RigMode::AIS => 25_000,
|
||||
RigMode::PKT | RigMode::AIS | RigMode::VDES => 25_000,
|
||||
RigMode::CW | RigMode::CWR => 500,
|
||||
RigMode::AM => 9_000,
|
||||
RigMode::FM => 12_500,
|
||||
@@ -192,6 +192,7 @@ impl SoapySdrRig {
|
||||
RigMode::WFM,
|
||||
RigMode::FM,
|
||||
RigMode::AIS,
|
||||
RigMode::VDES,
|
||||
RigMode::DIG,
|
||||
RigMode::PKT,
|
||||
],
|
||||
@@ -352,7 +353,7 @@ impl RigCat for SoapySdrRig {
|
||||
let half_span_hz = i128::from(self.pipeline.sdr_sample_rate) / 2;
|
||||
let current_center_hz = i128::from(self.center_hz);
|
||||
let target_lo_hz = i128::from(freq.hz);
|
||||
let target_hi_hz = if self.mode == RigMode::AIS {
|
||||
let target_hi_hz = if matches!(self.mode, RigMode::AIS | RigMode::VDES) {
|
||||
i128::from(freq.hz) + i128::from(AIS_CHANNEL_SPACING_HZ)
|
||||
} else {
|
||||
i128::from(freq.hz)
|
||||
|
||||
Reference in New Issue
Block a user