From 63ba6882cd1f921d6c1ccf041a0461f6b3e88f45 Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Thu, 12 Feb 2026 22:38:05 +0100 Subject: [PATCH] [feat](trx-core): add WSPR decode plumbing across stack Introduce WSPR command/state/protocol/transport wiring and integrate lifecycle, history, and frontend API paths mirroring the FT8 architecture. Co-authored-by: OpenAI Codex Signed-off-by: Stanislaw Grams --- src/trx-client/src/audio_client.rs | 6 +- src/trx-client/src/remote_client.rs | 1 + src/trx-client/trx-frontend/src/lib.rs | 5 +- .../trx-frontend-http-json/src/server.rs | 1 + .../trx-frontend/trx-frontend-http/src/api.rs | 26 ++++ .../trx-frontend-http/src/audio.rs | 38 +++++- src/trx-core/src/audio.rs | 1 + src/trx-core/src/decode.rs | 16 +++ src/trx-core/src/rig/command.rs | 2 + src/trx-core/src/rig/controller/handlers.rs | 4 +- src/trx-core/src/rig/state.rs | 11 ++ src/trx-protocol/src/mapping.rs | 48 +++++++ src/trx-protocol/src/types.rs | 2 + src/trx-server/src/audio.rs | 128 +++++++++++++++++- src/trx-server/src/main.rs | 14 ++ src/trx-server/src/rig_task.rs | 12 ++ 16 files changed, 310 insertions(+), 5 deletions(-) diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index 36ce34c..ae5268a 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -17,6 +17,7 @@ use tracing::{info, warn}; use trx_core::audio::{ read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, + AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::DecodedMessage; @@ -115,7 +116,10 @@ async fn handle_audio_connection( let _ = rx_tx.send(Bytes::from(payload)); } Ok(( - AUDIO_MSG_APRS_DECODE | AUDIO_MSG_CW_DECODE | AUDIO_MSG_FT8_DECODE, + AUDIO_MSG_APRS_DECODE + | AUDIO_MSG_CW_DECODE + | AUDIO_MSG_FT8_DECODE + | AUDIO_MSG_WSPR_DECODE, payload, )) => { if let Ok(msg) = serde_json::from_slice::(&payload) { diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 30ef3ed..584c057 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -414,6 +414,7 @@ mod tests { aprs_decode_enabled: false, cw_decode_enabled: false, ft8_decode_enabled: false, + wspr_decode_enabled: false, cw_auto: true, cw_wpm: 15, cw_tone_hz: 700, diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index d6178aa..3936d40 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -13,7 +13,7 @@ use tokio::sync::{broadcast, mpsc, watch}; use tokio::task::JoinHandle; use trx_core::audio::AudioStreamInfo; -use trx_core::decode::{AprsPacket, CwEvent, DecodedMessage, Ft8Message}; +use trx_core::decode::{AprsPacket, CwEvent, DecodedMessage, Ft8Message, WsprMessage}; use trx_core::{DynResult, RigRequest, RigState}; /// Trait implemented by concrete frontends to expose a runner entrypoint. @@ -116,6 +116,8 @@ pub struct FrontendRuntimeContext { pub cw_history: Arc>>, /// FT8 decode history (timestamp, message) pub ft8_history: Arc>>, + /// WSPR decode history (timestamp, message) + pub wspr_history: Arc>>, /// Authentication tokens for HTTP-JSON frontend pub auth_tokens: HashSet, /// Guard to avoid spawning duplicate decode collectors. @@ -133,6 +135,7 @@ impl FrontendRuntimeContext { aprs_history: Arc::new(Mutex::new(VecDeque::new())), cw_history: Arc::new(Mutex::new(VecDeque::new())), ft8_history: Arc::new(Mutex::new(VecDeque::new())), + wspr_history: Arc::new(Mutex::new(VecDeque::new())), auth_tokens: HashSet::new(), decode_collector_started: AtomicBool::new(false), } diff --git a/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs index 234a824..d2cb0d5 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs @@ -337,6 +337,7 @@ mod tests { aprs_decode_enabled: false, cw_decode_enabled: false, ft8_decode_enabled: false, + wspr_decode_enabled: false, cw_auto: true, cw_wpm: 15, cw_tone_hz: 700, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 56e1ab7..1042f71 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -124,6 +124,11 @@ pub async fn decode_events( .into_iter() .map(trx_core::decode::DecodedMessage::Ft8), ); + out.extend( + crate::server::audio::snapshot_wspr_history(context.get_ref()) + .into_iter() + .map(trx_core::decode::DecodedMessage::Wspr), + ); out }; @@ -360,6 +365,15 @@ pub async fn toggle_ft8_decode( send_command(&rig_tx, RigCommand::SetFt8DecodeEnabled(!enabled)).await } +#[post("/toggle_wspr_decode")] +pub async fn toggle_wspr_decode( + state: web::Data>, + rig_tx: web::Data>, +) -> Result { + let enabled = state.get_ref().borrow().wspr_decode_enabled; + send_command(&rig_tx, RigCommand::SetWsprDecodeEnabled(!enabled)).await +} + #[post("/clear_ft8_decode")] pub async fn clear_ft8_decode( context: web::Data>, @@ -369,6 +383,15 @@ pub async fn clear_ft8_decode( send_command(&rig_tx, RigCommand::ResetFt8Decoder).await } +#[post("/clear_wspr_decode")] +pub async fn clear_wspr_decode( + context: web::Data>, + rig_tx: web::Data>, +) -> Result { + crate::server::audio::clear_wspr_history(context.get_ref()); + send_command(&rig_tx, RigCommand::ResetWsprDecoder).await +} + #[post("/clear_aprs_decode")] pub async fn clear_aprs_decode( context: web::Data>, @@ -406,9 +429,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(set_cw_wpm) .service(set_cw_tone) .service(toggle_ft8_decode) + .service(toggle_wspr_decode) .service(clear_aprs_decode) .service(clear_cw_decode) .service(clear_ft8_decode) + .service(clear_wspr_decode) .service(crate::server::audio::audio_ws) .service(favicon) .service(logo) @@ -558,6 +583,7 @@ async fn wait_for_view(mut rx: watch::Receiver) -> Result) { } } +fn prune_wspr_history(history: &mut VecDeque<(Instant, WsprMessage)>) { + while let Some((ts, _)) = history.front() { + if ts.elapsed() <= HISTORY_RETENTION { + break; + } + history.pop_front(); + } +} + fn record_aprs(context: &FrontendRuntimeContext, pkt: AprsPacket) { let mut history = context .aprs_history @@ -79,6 +88,15 @@ fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) { prune_ft8_history(&mut history); } +fn record_wspr(context: &FrontendRuntimeContext, msg: WsprMessage) { + let mut history = context + .wspr_history + .lock() + .expect("wspr history mutex poisoned"); + history.push_back((Instant::now(), msg)); + prune_wspr_history(&mut history); +} + pub fn snapshot_aprs_history(context: &FrontendRuntimeContext) -> Vec { let mut history = context .aprs_history @@ -106,6 +124,15 @@ pub fn snapshot_ft8_history(context: &FrontendRuntimeContext) -> Vec history.iter().map(|(_, msg)| msg.clone()).collect() } +pub fn snapshot_wspr_history(context: &FrontendRuntimeContext) -> Vec { + let mut history = context + .wspr_history + .lock() + .expect("wspr history mutex poisoned"); + prune_wspr_history(&mut history); + history.iter().map(|(_, msg)| msg.clone()).collect() +} + pub fn clear_aprs_history(context: &FrontendRuntimeContext) { let mut history = context .aprs_history @@ -130,6 +157,14 @@ pub fn clear_ft8_history(context: &FrontendRuntimeContext) { history.clear(); } +pub fn clear_wspr_history(context: &FrontendRuntimeContext) { + let mut history = context + .wspr_history + .lock() + .expect("wspr history mutex poisoned"); + history.clear(); +} + pub fn subscribe_decode( context: &FrontendRuntimeContext, ) -> Option> { @@ -156,6 +191,7 @@ pub fn start_decode_history_collector(context: Arc) { DecodedMessage::Aprs(pkt) => record_aprs(&context, pkt), DecodedMessage::Cw(evt) => record_cw(&context, evt), DecodedMessage::Ft8(msg) => record_ft8(&context, msg), + DecodedMessage::Wspr(msg) => record_wspr(&context, msg), }, Err(broadcast::error::RecvError::Lagged(_)) => continue, Err(broadcast::error::RecvError::Closed) => break, diff --git a/src/trx-core/src/audio.rs b/src/trx-core/src/audio.rs index 901a0f5..ccaa8af 100644 --- a/src/trx-core/src/audio.rs +++ b/src/trx-core/src/audio.rs @@ -14,6 +14,7 @@ pub const AUDIO_MSG_TX_FRAME: u8 = 0x02; pub const AUDIO_MSG_APRS_DECODE: u8 = 0x03; pub const AUDIO_MSG_CW_DECODE: u8 = 0x04; pub const AUDIO_MSG_FT8_DECODE: u8 = 0x05; +pub const AUDIO_MSG_WSPR_DECODE: u8 = 0x06; /// Maximum payload size (1 MB) to reject bogus frames early. const MAX_PAYLOAD_SIZE: u32 = 1_048_576; diff --git a/src/trx-core/src/decode.rs b/src/trx-core/src/decode.rs index ca51301..30caa64 100644 --- a/src/trx-core/src/decode.rs +++ b/src/trx-core/src/decode.rs @@ -16,6 +16,8 @@ pub enum DecodedMessage { Cw(CwEvent), #[serde(rename = "ft8")] Ft8(Ft8Message), + #[serde(rename = "wspr")] + Wspr(WsprMessage), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -62,3 +64,17 @@ pub struct Ft8Message { /// Decoded message text pub message: String, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WsprMessage { + /// UTC timestamp (milliseconds since epoch) + pub ts_ms: i64, + /// Approximate SNR (dB) + pub snr_db: f32, + /// Time offset within slot (seconds) + pub dt_s: f32, + /// Audio frequency (Hz) + pub freq_hz: f32, + /// Decoded message text + pub message: String, +} diff --git a/src/trx-core/src/rig/command.rs b/src/trx-core/src/rig/command.rs index 3e750a2..06d776e 100644 --- a/src/trx-core/src/rig/command.rs +++ b/src/trx-core/src/rig/command.rs @@ -25,7 +25,9 @@ pub enum RigCommand { SetCwWpm(u32), SetCwToneHz(u32), SetFt8DecodeEnabled(bool), + SetWsprDecodeEnabled(bool), ResetAprsDecoder, ResetCwDecoder, ResetFt8Decoder, + ResetWsprDecoder, } diff --git a/src/trx-core/src/rig/controller/handlers.rs b/src/trx-core/src/rig/controller/handlers.rs index 0f59168..f28fd36 100644 --- a/src/trx-core/src/rig/controller/handlers.rs +++ b/src/trx-core/src/rig/controller/handlers.rs @@ -508,9 +508,11 @@ pub fn command_from_rig_command(cmd: RigCommand) -> Box { | RigCommand::SetCwWpm(_) | RigCommand::SetCwToneHz(_) | RigCommand::SetFt8DecodeEnabled(_) + | RigCommand::SetWsprDecodeEnabled(_) | RigCommand::ResetAprsDecoder | RigCommand::ResetCwDecoder - | RigCommand::ResetFt8Decoder => Box::new(GetSnapshotCommand), + | RigCommand::ResetFt8Decoder + | RigCommand::ResetWsprDecoder => Box::new(GetSnapshotCommand), } } diff --git a/src/trx-core/src/rig/state.rs b/src/trx-core/src/rig/state.rs index 361bdd0..ec55767 100644 --- a/src/trx-core/src/rig/state.rs +++ b/src/trx-core/src/rig/state.rs @@ -31,6 +31,8 @@ pub struct RigState { #[serde(default)] pub ft8_decode_enabled: bool, #[serde(default)] + pub wspr_decode_enabled: bool, + #[serde(default)] pub cw_auto: bool, #[serde(default)] pub cw_wpm: u32, @@ -42,6 +44,8 @@ pub struct RigState { pub cw_decode_reset_seq: u64, #[serde(default, skip_serializing)] pub ft8_decode_reset_seq: u64, + #[serde(default, skip_serializing)] + pub wspr_decode_reset_seq: u64, } /// Mode supported by the rig. @@ -113,12 +117,14 @@ impl RigState { aprs_decode_enabled: false, cw_decode_enabled: false, ft8_decode_enabled: false, + wspr_decode_enabled: false, cw_auto: true, cw_wpm: 15, cw_tone_hz: 700, aprs_decode_reset_seq: 0, cw_decode_reset_seq: 0, ft8_decode_reset_seq: 0, + wspr_decode_reset_seq: 0, } } @@ -169,9 +175,11 @@ impl RigState { cw_wpm: snapshot.cw_wpm, cw_tone_hz: snapshot.cw_tone_hz, ft8_decode_enabled: snapshot.ft8_decode_enabled, + wspr_decode_enabled: snapshot.wspr_decode_enabled, aprs_decode_reset_seq: 0, cw_decode_reset_seq: 0, ft8_decode_reset_seq: 0, + wspr_decode_reset_seq: 0, } } @@ -202,6 +210,7 @@ impl RigState { cw_wpm: self.cw_wpm, cw_tone_hz: self.cw_tone_hz, ft8_decode_enabled: self.ft8_decode_enabled, + wspr_decode_enabled: self.wspr_decode_enabled, }) } @@ -251,6 +260,8 @@ pub struct RigSnapshot { #[serde(default)] pub ft8_decode_enabled: bool, #[serde(default)] + pub wspr_decode_enabled: bool, + #[serde(default)] pub cw_auto: bool, #[serde(default)] pub cw_wpm: u32, diff --git a/src/trx-protocol/src/mapping.rs b/src/trx-protocol/src/mapping.rs index 615f34b..9dd110a 100644 --- a/src/trx-protocol/src/mapping.rs +++ b/src/trx-protocol/src/mapping.rs @@ -35,9 +35,13 @@ pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand { ClientCommand::SetCwWpm { wpm } => RigCommand::SetCwWpm(wpm), ClientCommand::SetCwToneHz { tone_hz } => RigCommand::SetCwToneHz(tone_hz), ClientCommand::SetFt8DecodeEnabled { enabled } => RigCommand::SetFt8DecodeEnabled(enabled), + ClientCommand::SetWsprDecodeEnabled { enabled } => { + RigCommand::SetWsprDecodeEnabled(enabled) + } ClientCommand::ResetAprsDecoder => RigCommand::ResetAprsDecoder, ClientCommand::ResetCwDecoder => RigCommand::ResetCwDecoder, ClientCommand::ResetFt8Decoder => RigCommand::ResetFt8Decoder, + ClientCommand::ResetWsprDecoder => RigCommand::ResetWsprDecoder, } } @@ -68,9 +72,13 @@ pub fn rig_command_to_client(cmd: RigCommand) -> ClientCommand { RigCommand::SetCwWpm(wpm) => ClientCommand::SetCwWpm { wpm }, RigCommand::SetCwToneHz(tone_hz) => ClientCommand::SetCwToneHz { tone_hz }, RigCommand::SetFt8DecodeEnabled(enabled) => ClientCommand::SetFt8DecodeEnabled { enabled }, + RigCommand::SetWsprDecodeEnabled(enabled) => { + ClientCommand::SetWsprDecodeEnabled { enabled } + } RigCommand::ResetAprsDecoder => ClientCommand::ResetAprsDecoder, RigCommand::ResetCwDecoder => ClientCommand::ResetCwDecoder, RigCommand::ResetFt8Decoder => ClientCommand::ResetFt8Decoder, + RigCommand::ResetWsprDecoder => ClientCommand::ResetWsprDecoder, } } @@ -263,6 +271,16 @@ mod tests { } } + #[test] + fn test_client_command_to_rig_set_wspr_decode_enabled() { + let cmd = ClientCommand::SetWsprDecodeEnabled { enabled: true }; + if let RigCommand::SetWsprDecodeEnabled(enabled) = client_command_to_rig(cmd) { + assert!(enabled); + } else { + panic!("Expected SetWsprDecodeEnabled"); + } + } + #[test] fn test_client_command_to_rig_reset_aprs_decoder() { let cmd = ClientCommand::ResetAprsDecoder; @@ -293,6 +311,16 @@ mod tests { } } + #[test] + fn test_client_command_to_rig_reset_wspr_decoder() { + let cmd = ClientCommand::ResetWsprDecoder; + if let RigCommand::ResetWsprDecoder = client_command_to_rig(cmd) { + // Success + } else { + panic!("Expected ResetWsprDecoder"); + } + } + #[test] fn test_rig_command_to_client_get_snapshot() { let cmd = RigCommand::GetSnapshot; @@ -473,6 +501,16 @@ mod tests { } } + #[test] + fn test_rig_command_to_client_set_wspr_decode_enabled() { + let cmd = RigCommand::SetWsprDecodeEnabled(true); + if let ClientCommand::SetWsprDecodeEnabled { enabled } = rig_command_to_client(cmd) { + assert!(enabled); + } else { + panic!("Expected SetWsprDecodeEnabled"); + } + } + #[test] fn test_rig_command_to_client_reset_aprs_decoder() { let cmd = RigCommand::ResetAprsDecoder; @@ -503,6 +541,16 @@ mod tests { } } + #[test] + fn test_rig_command_to_client_reset_wspr_decoder() { + let cmd = RigCommand::ResetWsprDecoder; + if let ClientCommand::ResetWsprDecoder = rig_command_to_client(cmd) { + // Success + } else { + panic!("Expected ResetWsprDecoder"); + } + } + #[test] fn test_round_trip_set_freq() { let original = ClientCommand::SetFreq { freq_hz: 7050000 }; diff --git a/src/trx-protocol/src/types.rs b/src/trx-protocol/src/types.rs index 11d8bbc..922201c 100644 --- a/src/trx-protocol/src/types.rs +++ b/src/trx-protocol/src/types.rs @@ -29,9 +29,11 @@ pub enum ClientCommand { SetCwWpm { wpm: u32 }, SetCwToneHz { tone_hz: u32 }, SetFt8DecodeEnabled { enabled: bool }, + SetWsprDecodeEnabled { enabled: bool }, ResetAprsDecoder, ResetCwDecoder, ResetFt8Decoder, + ResetWsprDecoder, } /// Envelope for client commands with optional authentication token. diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 139bfbe..72110d1 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -17,8 +17,9 @@ use tracing::{error, info, warn}; use trx_core::audio::{ read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, + AUDIO_MSG_WSPR_DECODE, }; -use trx_core::decode::{AprsPacket, DecodedMessage, Ft8Message}; +use trx_core::decode::{AprsPacket, DecodedMessage, Ft8Message, WsprMessage}; use trx_core::rig::state::{RigMode, RigState}; use trx_ft8::Ft8Decoder; @@ -27,6 +28,7 @@ use crate::decode; const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); +const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_SAMPLE_RATE: u32 = 12_000; fn aprs_history() -> &'static Mutex> { @@ -95,6 +97,33 @@ pub fn clear_ft8_history() { history.clear(); } +fn wspr_history() -> &'static Mutex> { + static HISTORY: OnceLock>> = OnceLock::new(); + HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) +} + +fn prune_wspr_history(history: &mut VecDeque<(Instant, WsprMessage)>) { + let cutoff = Instant::now() - WSPR_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } + } +} + +pub fn snapshot_wspr_history() -> Vec { + let mut history = wspr_history().lock().expect("wspr history mutex poisoned"); + prune_wspr_history(&mut history); + history.iter().map(|(_, msg)| msg.clone()).collect() +} + +pub fn clear_wspr_history() { + let mut history = wspr_history().lock().expect("wspr history mutex poisoned"); + history.clear(); +} + /// Spawn the audio capture thread. /// /// Opens the configured input device via cpal, accumulates PCM samples into @@ -753,6 +782,93 @@ pub async fn run_ft8_decoder( } } +/// Run the WSPR decoder task. Mirrors FT8 lifecycle/slot behavior. +/// +/// Note: decoding engine integration is intentionally staged; this task already +/// participates in enable/disable/reset flow and transport plumbing. +pub async fn run_wspr_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + mut state_rx: watch::Receiver, + _decode_tx: broadcast::Sender, +) { + info!("WSPR decoder started ({}Hz, {} ch)", sample_rate, channels); + let mut last_reset_seq: u64 = 0; + let mut active = state_rx.borrow().wspr_decode_enabled + && matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB); + let mut warned_no_decoder = false; + + loop { + if !active { + match state_rx.changed().await { + Ok(()) => { + let state = state_rx.borrow(); + active = state.wspr_decode_enabled + && matches!(state.status.mode, RigMode::DIG | RigMode::USB); + if active { + pcm_rx = pcm_rx.resubscribe(); + } + if state.wspr_decode_reset_seq != last_reset_seq { + last_reset_seq = state.wspr_decode_reset_seq; + } + warned_no_decoder = false; + } + Err(_) => break, + } + continue; + } + + tokio::select! { + recv = pcm_rx.recv() => { + match recv { + Ok(frame) => { + let state = state_rx.borrow(); + if state.wspr_decode_reset_seq != last_reset_seq { + last_reset_seq = state.wspr_decode_reset_seq; + } + + // Keep the same preprocessing path as FT8 so decoder integration + // can be dropped in without changing task flow. + let mono = downmix_mono(frame, channels); + if resample_to_12k(&mono, sample_rate).is_none() { + warn!("WSPR decoder: unsupported sample rate {}", sample_rate); + break; + } + + if !warned_no_decoder { + warn!("WSPR decoder engine not integrated yet; decode output is inactive"); + warned_no_decoder = true; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("WSPR decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + changed = state_rx.changed() => { + match changed { + Ok(()) => { + let state = state_rx.borrow(); + active = state.wspr_decode_enabled + && matches!(state.status.mode, RigMode::DIG | RigMode::USB); + if state.wspr_decode_reset_seq != last_reset_seq { + last_reset_seq = state.wspr_decode_reset_seq; + } + if active { + pcm_rx = pcm_rx.resubscribe(); + } else { + warned_no_decoder = false; + } + } + Err(_) => break, + } + } + } + } +} + /// Run the audio TCP listener, accepting client connections. pub async fn run_audio_listener( addr: SocketAddr, @@ -834,6 +950,15 @@ async fn handle_audio_client( write_audio_msg(&mut writer, msg_type, &json).await?; } } + // Send WSPR history to newly connected client. + let history = snapshot_wspr_history(); + for msg in history { + let msg = DecodedMessage::Wspr(msg); + let msg_type = AUDIO_MSG_WSPR_DECODE; + if let Ok(json) = serde_json::to_vec(&msg) { + write_audio_msg(&mut writer, msg_type, &json).await?; + } + } // Spawn RX + decode forwarding task (shares the writer) let mut rx_sub = rx_audio.subscribe(); @@ -863,6 +988,7 @@ async fn handle_audio_client( DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE, DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE, DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE, + DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE, }; if let Ok(json) = serde_json::to_vec(&msg) { if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 912b1c6..75b1db3 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -391,6 +391,20 @@ async fn main() -> DynResult<()> { _ = wait_for_shutdown(ft8_shutdown_rx) => {} } })); + + // Spawn WSPR decoder task + let wspr_pcm_rx = pcm_tx.subscribe(); + let wspr_state_rx = _state_rx.clone(); + let wspr_decode_tx = decode_tx.clone(); + let wspr_sr = cfg.audio.sample_rate; + let wspr_ch = cfg.audio.channels; + let wspr_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_wspr_decoder(wspr_sr, wspr_ch as u16, wspr_pcm_rx, wspr_state_rx, wspr_decode_tx) => {} + _ = wait_for_shutdown(wspr_shutdown_rx) => {} + } + })); } if cfg.audio.tx_enabled { let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx); diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 1b889ef..4f99def 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -368,6 +368,12 @@ async fn process_command( let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state); } + RigCommand::SetWsprDecodeEnabled(en) => { + ctx.state.wspr_decode_enabled = en; + info!("WSPR decode {}", if en { "enabled" } else { "disabled" }); + let _ = ctx.state_tx.send(ctx.state.clone()); + return snapshot_from(ctx.state); + } RigCommand::ResetAprsDecoder => { audio::clear_aprs_history(); ctx.state.aprs_decode_reset_seq += 1; @@ -385,6 +391,12 @@ async fn process_command( let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state); } + RigCommand::ResetWsprDecoder => { + audio::clear_wspr_history(); + ctx.state.wspr_decode_reset_seq += 1; + let _ = ctx.state_tx.send(ctx.state.clone()); + return snapshot_from(ctx.state); + } _ => {} // fall through to normal rig handler }