[feat](trx-rs): remove NOAA APT decoder

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-28 23:21:32 +01:00
parent 804b0d8846
commit 0a60684e28
29 changed files with 33 additions and 1457 deletions
+4 -298
View File
@@ -29,11 +29,11 @@ use trx_core::audio::{
AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED,
AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE,
AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE,
AUDIO_MSG_WSPR_DECODE, AUDIO_MSG_WXSAT_IMAGE,
AUDIO_MSG_WSPR_DECODE,
};
use trx_core::decode::{
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, LrptImage, VdesMessage,
WsprMessage, WxsatImage,
WsprMessage,
};
use trx_core::rig::state::{RigMode, RigState};
use trx_core::vchan::SharedVChanManager;
@@ -41,7 +41,6 @@ use trx_cw::CwDecoder;
use trx_ftx::Ft8Decoder;
use trx_vdes::VdesDecoder;
use trx_wspr::WsprDecoder;
use trx_wxsat::noaa::AptDecoder;
use uuid::Uuid;
use crate::config::AudioConfig;
@@ -54,12 +53,9 @@ const VDES_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WXSAT_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const LRPT_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
/// Silence timeout before auto-finalising an LRPT pass (30 s without new MCUs).
const LRPT_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30);
/// Silence timeout before auto-finalising a wxsat pass (30 s without new lines).
const WXSAT_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30);
const FT8_SAMPLE_RATE: u32 = 12_000;
const FT2_ASYNC_BUFFER_SAMPLES: usize = 45_000;
const FT2_ASYNC_TRIGGER_SAMPLES: usize = 9_000;
@@ -216,7 +212,6 @@ pub struct DecoderHistories {
pub ft4: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub ft2: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
pub wxsat: Mutex<VecDeque<(Instant, WxsatImage)>>,
pub lrpt: Mutex<VecDeque<(Instant, LrptImage)>>,
/// Approximate total entry count across all decoders, maintained
/// atomically so `estimated_total_count()` avoids 9 lock acquisitions.
@@ -235,7 +230,6 @@ impl DecoderHistories {
ft4: Mutex::new(VecDeque::new()),
ft2: Mutex::new(VecDeque::new()),
wspr: Mutex::new(VecDeque::new()),
wxsat: Mutex::new(VecDeque::new()),
lrpt: Mutex::new(VecDeque::new()),
total_count: AtomicUsize::new(0),
})
@@ -596,45 +590,6 @@ impl DecoderHistories {
self.adjust_total_count(before, 0);
}
// --- WXSAT ---
fn prune_wxsat(history: &mut VecDeque<(Instant, WxsatImage)>) {
let cutoff = Instant::now() - WXSAT_HISTORY_RETENTION;
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
}
pub fn record_wxsat_image(&self, mut img: WxsatImage) {
if img.ts_ms.is_none() {
img.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.wxsat.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
h.push_back((Instant::now(), img));
Self::prune_wxsat(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_wxsat_history(&self) -> Vec<WxsatImage> {
let mut h = self.wxsat.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
Self::prune_wxsat(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, img)| img.clone()).collect()
}
pub fn clear_wxsat_history(&self) {
let mut h = self.wxsat.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- LRPT ---
fn prune_lrpt(history: &mut VecDeque<(Instant, LrptImage)>) {
@@ -2444,250 +2399,6 @@ pub async fn run_wspr_decoder(
}
}
// ---------------------------------------------------------------------------
// Weather satellite APT decoder task
// ---------------------------------------------------------------------------
/// Decode weather satellite APT images from FM-demodulated audio.
///
/// The task is idle until `state.wxsat_decode_enabled` becomes `true`.
/// When the user disables the decoder (or 30 s of silence elapses with no
/// new decoded lines), the accumulated image is encoded as PNG and saved to
/// `output_dir/<YYYY-MM-DD_HH-MM-SS>.png`.
pub async fn run_wxsat_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
histories: Arc<DecoderHistories>,
output_dir: std::path::PathBuf,
) {
info!("wxsat decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = AptDecoder::new(sample_rate);
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().wxsat_decode_enabled;
// Instant of the last time new lines were decoded (for auto-finalise)
let mut last_line_at = tokio::time::Instant::now();
if active {
pcm_rx = pcm_rx.resubscribe();
}
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.wxsat_decode_enabled;
if active {
decoder.reset();
last_line_at = tokio::time::Instant::now();
pcm_rx = pcm_rx.resubscribe();
}
if state.wxsat_decode_reset_seq != last_reset_seq {
last_reset_seq = state.wxsat_decode_reset_seq;
decoder.reset();
}
}
Err(_) => break,
}
continue;
}
let silence_deadline = last_line_at + WXSAT_PASS_SILENCE_TIMEOUT;
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let reset_seq = state_rx.borrow().wxsat_decode_reset_seq;
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
pcm_rx = pcm_rx.resubscribe();
continue;
}
let mono = downmix_mono(frame, channels);
let new_lines = tokio::task::block_in_place(|| decoder.process_samples(&mono));
if new_lines > 0 {
last_line_at = tokio::time::Instant::now();
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("wxsat decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
match changed {
Ok(()) => {
// Extract fields before any await so the Ref is dropped.
let (new_active, new_reset_seq) = {
let state = state_rx.borrow();
(state.wxsat_decode_enabled, state.wxsat_decode_reset_seq)
};
let was_active = active;
active = new_active;
if new_reset_seq != last_reset_seq {
last_reset_seq = new_reset_seq;
decoder.reset();
}
if was_active && !active {
// User disabled — finalise whatever we have
let (slat, slon) = {
let s = state_rx.borrow();
(s.server_latitude, s.server_longitude)
};
finalize_wxsat_pass(
&mut decoder,
&output_dir,
&decode_tx,
&histories,
slat,
slon,
)
.await;
} else if !was_active && active {
decoder.reset();
last_line_at = tokio::time::Instant::now();
pcm_rx = pcm_rx.resubscribe();
}
}
Err(_) => break,
}
}
// Auto-finalise after sustained silence (satellite pass ended)
_ = tokio::time::sleep_until(silence_deadline), if decoder.line_count() > 0 => {
info!(
"wxsat: no new lines for {}s — finalising pass ({} lines)",
WXSAT_PASS_SILENCE_TIMEOUT.as_secs(),
decoder.line_count()
);
let (slat, slon) = {
let s = state_rx.borrow();
(s.server_latitude, s.server_longitude)
};
finalize_wxsat_pass(
&mut decoder,
&output_dir,
&decode_tx,
&histories,
slat,
slon,
)
.await;
// Remain active; ready for the next pass
last_line_at = tokio::time::Instant::now();
}
}
}
}
/// Encode all accumulated lines as PNG, write to disk, and broadcast the
/// `DecodedMessage::WxsatImage` event. No-ops if fewer than 2 lines decoded.
async fn finalize_wxsat_pass(
decoder: &mut AptDecoder,
output_dir: &std::path::Path,
decode_tx: &broadcast::Sender<DecodedMessage>,
histories: &Arc<DecoderHistories>,
station_lat: Option<f64>,
station_lon: Option<f64>,
) {
if decoder.line_count() < 2 {
decoder.reset();
return;
}
let pass_end_ms = current_timestamp_ms();
let Some(apt_image) = decoder.finalize() else {
decoder.reset();
return;
};
// Build output path: <output_dir>/<YYYY-MM-DD_HH-MM-SS>.png
let dt = chrono::Local::now();
let filename = dt.format("%Y-%m-%d_%H-%M-%S.png").to_string();
let path = output_dir.join(&filename);
if let Err(e) = std::fs::create_dir_all(output_dir) {
warn!(
"wxsat: failed to create output directory {:?}: {}",
output_dir, e
);
decoder.reset();
return;
}
match std::fs::write(&path, &apt_image.png) {
Ok(()) => {
let sat_str = format!("{}", apt_image.satellite);
let ch_a_str = format!("{}", apt_image.sensor_a);
let ch_b_str = format!("{}", apt_image.sensor_b);
info!(
"wxsat: saved {} ({} lines, {} bytes, {}, A={}, B={}) to {:?}",
filename,
apt_image.line_count,
apt_image.png.len(),
sat_str,
ch_a_str,
ch_b_str,
path
);
// Compute geographic bounds from SGP4 propagation
let pass_geo = trx_core::geo::compute_pass_geo(
&sat_str,
apt_image.first_line_ms,
pass_end_ms,
station_lat,
station_lon,
)
.or_else(|| {
// Fallback: use station location if available
match (station_lat, station_lon) {
(Some(lat), Some(lon)) => Some(trx_core::geo::estimate_pass_geo_from_station(
apt_image.first_line_ms,
pass_end_ms,
lat,
lon,
)),
_ => None,
}
});
let (geo_bounds, ground_track) = match pass_geo {
Some(geo) => (Some(geo.bounds), Some(geo.ground_track)),
None => (None, None),
};
let img = WxsatImage {
rig_id: None,
pass_start_ms: apt_image.first_line_ms,
pass_end_ms,
line_count: apt_image.line_count,
path: path.to_string_lossy().into_owned(),
ts_ms: Some(pass_end_ms),
satellite: Some(sat_str.clone()),
channel_a: Some(ch_a_str),
channel_b: Some(ch_b_str),
geo_bounds,
ground_track,
};
if geo_bounds.is_some() {
info!("wxsat: geo-referenced {} image overlay", sat_str);
}
histories.record_wxsat_image(img.clone());
let _ = decode_tx.send(DecodedMessage::WxsatImage(img));
}
Err(e) => {
warn!("wxsat: failed to write {:?}: {}", path, e);
}
}
decoder.reset();
}
// ---------------------------------------------------------------------------
// Meteor-M LRPT decoder task
// ---------------------------------------------------------------------------
@@ -3516,11 +3227,6 @@ async fn handle_audio_client(
DecodedMessage::Cw,
AUDIO_MSG_CW_DECODE
);
push_history!(
histories.snapshot_wxsat_history(),
DecodedMessage::WxsatImage,
AUDIO_MSG_WXSAT_IMAGE
);
push_history!(
histories.snapshot_lrpt_history(),
DecodedMessage::LrptImage,
@@ -3609,7 +3315,7 @@ async fn handle_audio_client(
DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE,
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
DecodedMessage::WxsatImage(_) => AUDIO_MSG_WXSAT_IMAGE,
DecodedMessage::LrptImage(_) => AUDIO_MSG_LRPT_IMAGE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
@@ -3638,7 +3344,7 @@ async fn handle_audio_client(
DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE,
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
DecodedMessage::WxsatImage(_) => AUDIO_MSG_WXSAT_IMAGE,
DecodedMessage::LrptImage(_) => AUDIO_MSG_LRPT_IMAGE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
-19
View File
@@ -794,25 +794,6 @@ fn spawn_rig_audio_stack(
}
}));
// Spawn weather satellite APT decoder task
let wxsat_pcm_rx = pcm_tx.subscribe();
let wxsat_state_rx = state_rx.clone();
let wxsat_decode_tx = decode_tx.clone();
let wxsat_sr = rig_cfg.audio.sample_rate;
let wxsat_ch = rig_cfg.audio.channels;
let wxsat_shutdown_rx = shutdown_rx.clone();
let wxsat_histories = histories.clone();
let wxsat_output_dir = dirs::cache_dir()
.unwrap_or_else(|| std::path::PathBuf::from(".cache"))
.join("trx-rs")
.join("wxsat");
handles.push(tokio::spawn(async move {
tokio::select! {
_ = audio::run_wxsat_decoder(wxsat_sr, wxsat_ch as u16, wxsat_pcm_rx, wxsat_state_rx, wxsat_decode_tx, wxsat_histories, wxsat_output_dir) => {}
_ = wait_for_shutdown(wxsat_shutdown_rx) => {}
}
}));
// Spawn Meteor-M LRPT decoder task
let lrpt_pcm_rx = pcm_tx.subscribe();
let lrpt_state_rx = state_rx.clone();
-12
View File
@@ -545,24 +545,12 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetWxsatDecodeEnabled(en) => {
ctx.state.wxsat_decode_enabled = en;
info!("wxsat decode {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetLrptDecodeEnabled(en) => {
ctx.state.lrpt_decode_enabled = en;
info!("LRPT decode {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetWxsatDecoder => {
ctx.histories.clear_wxsat_history();
ctx.state.wxsat_decode_reset_seq += 1;
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetLrptDecoder => {
ctx.histories.clear_lrpt_history();
ctx.state.lrpt_decode_reset_seq += 1;