[feat](trx-core): add WSPR decode plumbing across stack

Introduce WSPR command/state/protocol/transport wiring and integrate lifecycle, history, and frontend API paths mirroring the FT8 architecture.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-12 22:38:05 +01:00
parent 8e59e205a8
commit 63ba6882cd
16 changed files with 310 additions and 5 deletions
+127 -1
View File
@@ -17,8 +17,9 @@ use tracing::{error, info, warn};
use trx_core::audio::{
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE,
AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
AUDIO_MSG_WSPR_DECODE,
};
use trx_core::decode::{AprsPacket, DecodedMessage, Ft8Message};
use trx_core::decode::{AprsPacket, DecodedMessage, Ft8Message, WsprMessage};
use trx_core::rig::state::{RigMode, RigState};
use trx_ft8::Ft8Decoder;
@@ -27,6 +28,7 @@ use crate::decode;
const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_SAMPLE_RATE: u32 = 12_000;
fn aprs_history() -> &'static Mutex<VecDeque<(Instant, AprsPacket)>> {
@@ -95,6 +97,33 @@ pub fn clear_ft8_history() {
history.clear();
}
fn wspr_history() -> &'static Mutex<VecDeque<(Instant, WsprMessage)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, WsprMessage)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn prune_wspr_history(history: &mut VecDeque<(Instant, WsprMessage)>) {
let cutoff = Instant::now() - WSPR_HISTORY_RETENTION;
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
}
pub fn snapshot_wspr_history() -> Vec<WsprMessage> {
let mut history = wspr_history().lock().expect("wspr history mutex poisoned");
prune_wspr_history(&mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_wspr_history() {
let mut history = wspr_history().lock().expect("wspr history mutex poisoned");
history.clear();
}
/// Spawn the audio capture thread.
///
/// Opens the configured input device via cpal, accumulates PCM samples into
@@ -753,6 +782,93 @@ pub async fn run_ft8_decoder(
}
}
/// Run the WSPR decoder task. Mirrors FT8 lifecycle/slot behavior.
///
/// Note: decoding engine integration is intentionally staged; this task already
/// participates in enable/disable/reset flow and transport plumbing.
pub async fn run_wspr_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
_decode_tx: broadcast::Sender<DecodedMessage>,
) {
info!("WSPR decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().wspr_decode_enabled
&& matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB);
let mut warned_no_decoder = false;
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.wspr_decode_enabled
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if active {
pcm_rx = pcm_rx.resubscribe();
}
if state.wspr_decode_reset_seq != last_reset_seq {
last_reset_seq = state.wspr_decode_reset_seq;
}
warned_no_decoder = false;
}
Err(_) => break,
}
continue;
}
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let state = state_rx.borrow();
if state.wspr_decode_reset_seq != last_reset_seq {
last_reset_seq = state.wspr_decode_reset_seq;
}
// Keep the same preprocessing path as FT8 so decoder integration
// can be dropped in without changing task flow.
let mono = downmix_mono(frame, channels);
if resample_to_12k(&mono, sample_rate).is_none() {
warn!("WSPR decoder: unsupported sample rate {}", sample_rate);
break;
}
if !warned_no_decoder {
warn!("WSPR decoder engine not integrated yet; decode output is inactive");
warned_no_decoder = true;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("WSPR decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
match changed {
Ok(()) => {
let state = state_rx.borrow();
active = state.wspr_decode_enabled
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if state.wspr_decode_reset_seq != last_reset_seq {
last_reset_seq = state.wspr_decode_reset_seq;
}
if active {
pcm_rx = pcm_rx.resubscribe();
} else {
warned_no_decoder = false;
}
}
Err(_) => break,
}
}
}
}
}
/// Run the audio TCP listener, accepting client connections.
pub async fn run_audio_listener(
addr: SocketAddr,
@@ -834,6 +950,15 @@ async fn handle_audio_client(
write_audio_msg(&mut writer, msg_type, &json).await?;
}
}
// Send WSPR history to newly connected client.
let history = snapshot_wspr_history();
for msg in history {
let msg = DecodedMessage::Wspr(msg);
let msg_type = AUDIO_MSG_WSPR_DECODE;
if let Ok(json) = serde_json::to_vec(&msg) {
write_audio_msg(&mut writer, msg_type, &json).await?;
}
}
// Spawn RX + decode forwarding task (shares the writer)
let mut rx_sub = rx_audio.subscribe();
@@ -863,6 +988,7 @@ async fn handle_audio_client(
DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE,
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
+14
View File
@@ -391,6 +391,20 @@ async fn main() -> DynResult<()> {
_ = wait_for_shutdown(ft8_shutdown_rx) => {}
}
}));
// Spawn WSPR decoder task
let wspr_pcm_rx = pcm_tx.subscribe();
let wspr_state_rx = _state_rx.clone();
let wspr_decode_tx = decode_tx.clone();
let wspr_sr = cfg.audio.sample_rate;
let wspr_ch = cfg.audio.channels;
let wspr_shutdown_rx = shutdown_rx.clone();
task_handles.push(tokio::spawn(async move {
tokio::select! {
_ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx) => {}
_ = wait_for_shutdown(wspr_shutdown_rx) => {}
}
}));
}
if cfg.audio.tx_enabled {
let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx);
+12
View File
@@ -368,6 +368,12 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetWsprDecodeEnabled(en) => {
ctx.state.wspr_decode_enabled = en;
info!("WSPR decode {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetAprsDecoder => {
audio::clear_aprs_history();
ctx.state.aprs_decode_reset_seq += 1;
@@ -385,6 +391,12 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetWsprDecoder => {
audio::clear_wspr_history();
ctx.state.wspr_decode_reset_seq += 1;
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
_ => {} // fall through to normal rig handler
}