[feat](trx-wxsat): add Meteor-M LRPT decoder and Weather Satellites frontend panel

Restructure trx-wxsat into noaa/ (APT) and lrpt/ (Meteor-M LRPT) submodules
with shared crate base. Add QPSK demodulator, CCSDS CADU framer, MCU channel
assembler for LRPT. Wire LRPT through full stack (core types, protocol, server
decoder task, client). Add Weather Satellites sub-tab in Digital Modes with
toggle buttons for NOAA APT and Meteor LRPT, descriptions, and image history.

https://claude.ai/code/session_01JA13DHuzuHUL4nSBBRU83f
Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-28 07:13:05 +00:00
committed by Stan Grams
parent d26ef6ca81
commit 1a3b815ed8
29 changed files with 1526 additions and 158 deletions
+244 -4
View File
@@ -25,21 +25,22 @@ use trx_core::audio::{
parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame,
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE,
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_WXSAT_IMAGE,
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_LRPT_IMAGE,
AUDIO_MSG_WXSAT_IMAGE,
AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED,
AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE,
AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE,
AUDIO_MSG_WSPR_DECODE,
};
use trx_core::decode::{
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
WxsatImage,
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, LrptImage, VdesMessage,
WsprMessage, WxsatImage,
};
use trx_core::rig::state::{RigMode, RigState};
use trx_core::vchan::SharedVChanManager;
use trx_cw::CwDecoder;
use trx_ftx::Ft8Decoder;
use trx_wxsat::AptDecoder;
use trx_wxsat::noaa::AptDecoder;
use trx_vdes::VdesDecoder;
use trx_wspr::WsprDecoder;
use uuid::Uuid;
@@ -55,6 +56,9 @@ const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WXSAT_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const LRPT_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
/// Silence timeout before auto-finalising an LRPT pass (30 s without new MCUs).
const LRPT_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30);
/// Silence timeout before auto-finalising a wxsat pass (30 s without new lines).
const WXSAT_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30);
const FT8_SAMPLE_RATE: u32 = 12_000;
@@ -214,6 +218,7 @@ pub struct DecoderHistories {
pub ft2: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
pub wxsat: Mutex<VecDeque<(Instant, WxsatImage)>>,
pub lrpt: Mutex<VecDeque<(Instant, LrptImage)>>,
/// Approximate total entry count across all decoders, maintained
/// atomically so `estimated_total_count()` avoids 9 lock acquisitions.
total_count: AtomicUsize,
@@ -232,6 +237,7 @@ impl DecoderHistories {
ft2: Mutex::new(VecDeque::new()),
wspr: Mutex::new(VecDeque::new()),
wxsat: Mutex::new(VecDeque::new()),
lrpt: Mutex::new(VecDeque::new()),
total_count: AtomicUsize::new(0),
})
}
@@ -623,6 +629,52 @@ impl DecoderHistories {
h.iter().map(|(_, img)| img.clone()).collect()
}
pub fn clear_wxsat_history(&self) {
let mut h = self.wxsat.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- LRPT ---
fn prune_lrpt(history: &mut VecDeque<(Instant, LrptImage)>) {
let cutoff = Instant::now() - LRPT_HISTORY_RETENTION;
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
}
pub fn record_lrpt_image(&self, mut img: LrptImage) {
if img.ts_ms.is_none() {
img.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.lrpt.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
h.push_back((Instant::now(), img));
Self::prune_lrpt(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_lrpt_history(&self) -> Vec<LrptImage> {
let mut h = self.lrpt.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
Self::prune_lrpt(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, img)| img.clone()).collect()
}
pub fn clear_lrpt_history(&self) {
let mut h = self.lrpt.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
/// Returns a quick (non-pruning) estimate of the total number of history
/// entries across all decoders, used for pre-allocating the replay blob.
///
@@ -2600,6 +2652,187 @@ async fn finalize_wxsat_pass(
decoder.reset();
}
// ---------------------------------------------------------------------------
// Meteor-M LRPT decoder task
// ---------------------------------------------------------------------------
/// Decode Meteor-M LRPT satellite images from QPSK-demodulated baseband.
///
/// The task is idle until `state.lrpt_decode_enabled` becomes `true`.
/// When disabled (or 30 s of silence elapses with no new MCUs), the
/// accumulated image is saved and broadcast.
pub async fn run_lrpt_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
histories: Arc<DecoderHistories>,
output_dir: std::path::PathBuf,
) {
use trx_wxsat::lrpt::LrptDecoder;
info!("LRPT decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = LrptDecoder::new(sample_rate);
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().lrpt_decode_enabled;
let mut pass_start_ms: i64 = 0;
let mut last_mcu_at = tokio::time::Instant::now();
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.lrpt_decode_enabled;
if active {
decoder.reset();
pass_start_ms = current_timestamp_ms();
last_mcu_at = tokio::time::Instant::now();
pcm_rx = pcm_rx.resubscribe();
}
if state.lrpt_decode_reset_seq != last_reset_seq {
last_reset_seq = state.lrpt_decode_reset_seq;
decoder.reset();
}
}
Err(_) => break,
}
continue;
}
let silence_deadline = last_mcu_at + LRPT_PASS_SILENCE_TIMEOUT;
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let reset_seq = state_rx.borrow().lrpt_decode_reset_seq;
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
}
let mono: Vec<f32> = if channels > 1 {
frame.chunks_exact(channels as usize).map(|ch| ch[0]).collect()
} else {
frame
};
let new_mcus = decoder.process_samples(&mono);
if new_mcus > 0 {
last_mcu_at = tokio::time::Instant::now();
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("LRPT decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
if changed.is_ok() {
let (new_active, new_reset_seq) = {
let state = state_rx.borrow();
(state.lrpt_decode_enabled, state.lrpt_decode_reset_seq)
};
let was_active = active;
active = new_active;
if new_reset_seq != last_reset_seq {
last_reset_seq = new_reset_seq;
decoder.reset();
}
if was_active && !active {
finalize_lrpt_pass(
&mut decoder,
&output_dir,
&decode_tx,
&histories,
pass_start_ms,
).await;
}
} else {
break;
}
}
_ = tokio::time::sleep_until(silence_deadline), if decoder.mcu_count() > 0 => {
info!(
"LRPT: no new MCUs for {}s — finalising pass ({} MCUs)",
LRPT_PASS_SILENCE_TIMEOUT.as_secs(),
decoder.mcu_count()
);
finalize_lrpt_pass(
&mut decoder,
&output_dir,
&decode_tx,
&histories,
pass_start_ms,
).await;
}
}
}
}
async fn finalize_lrpt_pass(
decoder: &mut trx_wxsat::lrpt::LrptDecoder,
output_dir: &std::path::Path,
decode_tx: &broadcast::Sender<DecodedMessage>,
histories: &Arc<DecoderHistories>,
pass_start_ms: i64,
) {
if decoder.mcu_count() < 2 {
decoder.reset();
return;
}
let pass_end_ms = current_timestamp_ms();
let Some(lrpt_image) = decoder.finalize() else {
decoder.reset();
return;
};
let dt = chrono::Local::now();
let filename = dt.format("%Y-%m-%d_%H-%M-%S-lrpt.png").to_string();
let path = output_dir.join(&filename);
if let Err(e) = std::fs::create_dir_all(output_dir) {
warn!(
"LRPT: failed to create output directory {:?}: {}",
output_dir, e
);
decoder.reset();
return;
}
match std::fs::write(&path, &lrpt_image.png) {
Ok(()) => {
info!(
"LRPT: saved {} ({} MCUs, {} bytes) to {:?}",
filename,
lrpt_image.mcu_count,
lrpt_image.png.len(),
path
);
let img = LrptImage {
rig_id: None,
pass_start_ms,
pass_end_ms,
mcu_count: lrpt_image.mcu_count,
path: path.to_string_lossy().into_owned(),
ts_ms: Some(pass_end_ms),
satellite: lrpt_image.satellite.map(|s| s.to_string()),
channels: lrpt_image.channels.clone(),
};
histories.record_lrpt_image(img.clone());
let _ = decode_tx.send(DecodedMessage::LrptImage(img));
}
Err(e) => {
warn!("LRPT: failed to write {:?}: {}", path, e);
}
}
decoder.reset();
}
// ---------------------------------------------------------------------------
// Virtual-channel audio support
// ---------------------------------------------------------------------------
@@ -3204,6 +3437,11 @@ async fn handle_audio_client(
DecodedMessage::WxsatImage,
AUDIO_MSG_WXSAT_IMAGE
);
push_history!(
histories.snapshot_lrpt_history(),
DecodedMessage::LrptImage,
AUDIO_MSG_LRPT_IMAGE
);
(blob, count)
};
@@ -3288,6 +3526,7 @@ async fn handle_audio_client(
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
DecodedMessage::WxsatImage(_) => AUDIO_MSG_WXSAT_IMAGE,
DecodedMessage::LrptImage(_) => AUDIO_MSG_LRPT_IMAGE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
@@ -3316,6 +3555,7 @@ async fn handle_audio_client(
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
DecodedMessage::WxsatImage(_) => AUDIO_MSG_WXSAT_IMAGE,
DecodedMessage::LrptImage(_) => AUDIO_MSG_LRPT_IMAGE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
+19
View File
@@ -812,6 +812,25 @@ fn spawn_rig_audio_stack(
_ = wait_for_shutdown(wxsat_shutdown_rx) => {}
}
}));
// Spawn Meteor-M LRPT decoder task
let lrpt_pcm_rx = pcm_tx.subscribe();
let lrpt_state_rx = state_rx.clone();
let lrpt_decode_tx = decode_tx.clone();
let lrpt_sr = rig_cfg.audio.sample_rate;
let lrpt_ch = rig_cfg.audio.channels;
let lrpt_shutdown_rx = shutdown_rx.clone();
let lrpt_histories = histories.clone();
let lrpt_output_dir = dirs::cache_dir()
.unwrap_or_else(|| std::path::PathBuf::from(".cache"))
.join("trx-rs")
.join("lrpt");
handles.push(tokio::spawn(async move {
tokio::select! {
_ = audio::run_lrpt_decoder(lrpt_sr, lrpt_ch as u16, lrpt_pcm_rx, lrpt_state_rx, lrpt_decode_tx, lrpt_histories, lrpt_output_dir) => {}
_ = wait_for_shutdown(lrpt_shutdown_rx) => {}
}
}));
}
if rig_cfg.audio.tx_enabled {
+24
View File
@@ -545,6 +545,30 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetWxsatDecodeEnabled(en) => {
ctx.state.wxsat_decode_enabled = en;
info!("wxsat decode {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetLrptDecodeEnabled(en) => {
ctx.state.lrpt_decode_enabled = en;
info!("LRPT decode {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetWxsatDecoder => {
ctx.histories.clear_wxsat_history();
ctx.state.wxsat_decode_reset_seq += 1;
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetLrptDecoder => {
ctx.histories.clear_lrpt_history();
ctx.state.lrpt_decode_reset_seq += 1;
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetBandwidth(hz) => {
if let Some(sdr) = ctx.rig.as_sdr() {
if let Err(e) = sdr.set_bandwidth(hz).await {