From 4b40d4481416f381dbdaeeeecff77a42c7b97958 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 28 Mar 2026 07:00:24 +0100 Subject: [PATCH] [feat](trx-noaa): add NOAA APT satellite image decoder New trx-noaa crate: FFT-based Hilbert transform (rustfft) for 2400 Hz AM demodulation, sync A detection via cross-correlation, line assembly at 4160 Hz, and JPEG output via the image crate. - trx-core: NoaaImage type, DecodedMessage::NoaaImage variant, noaa_decode_enabled/noaa_decode_reset_seq on RigState/RigSnapshot, AUDIO_MSG_NOAA_IMAGE = 0x16 - trx-server: DecoderHistories::noaa, run_noaa_decoder task (activates on noaa_decode_enabled, auto-finalises after 30 s silence), saves JPEGs to ~/.cache/trx-rs/noaa/.jpg, forwards events over TCP audio channel and history replay Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 48 ++++ Cargo.toml | 1 + src/decoders/trx-noaa/Cargo.toml | 14 ++ src/decoders/trx-noaa/src/apt.rs | 328 +++++++++++++++++++++++++ src/decoders/trx-noaa/src/image_enc.rs | 43 ++++ src/decoders/trx-noaa/src/lib.rs | 114 +++++++++ src/trx-core/src/audio.rs | 2 + src/trx-core/src/decode.rs | 21 ++ src/trx-core/src/rig/state.rs | 11 + src/trx-server/Cargo.toml | 1 + src/trx-server/src/audio.rs | 255 ++++++++++++++++++- src/trx-server/src/main.rs | 19 ++ 12 files changed, 852 insertions(+), 5 deletions(-) create mode 100644 src/decoders/trx-noaa/Cargo.toml create mode 100644 src/decoders/trx-noaa/src/apt.rs create mode 100644 src/decoders/trx-noaa/src/image_enc.rs create mode 100644 src/decoders/trx-noaa/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index dfc292f..4199697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,6 +435,18 @@ version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.0" @@ -560,6 +572,12 @@ dependencies = [ "cc", ] +[[package]] +name = "color_quant" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" + [[package]] name = "colorchoice" version = "1.0.4" @@ -1225,6 +1243,19 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "image" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5690139d2f55868e080017335e4b94cb7414274c74f1669c84fb5feba2c9f69d" +dependencies = [ + "bytemuck", + "byteorder", + "color_quant", + "jpeg-decoder", + "num-traits", +] + [[package]] name = "indexmap" version = "2.12.1" @@ -1300,6 +1331,12 @@ dependencies = [ "libc", ] +[[package]] +name = "jpeg-decoder" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00810f1d8b74be64b13dbf3db89ac67740615d6c891f0e7b6179326533011a07" + [[package]] name = "js-sys" version = "0.3.85" @@ -2747,6 +2784,16 @@ dependencies = [ "rustfft", ] +[[package]] +name = "trx-noaa" +version = "0.1.0" +dependencies = [ + "image", + "num-complex", + "rustfft", + "trx-core", +] + [[package]] name = "trx-protocol" version = "0.1.0" @@ -2801,6 +2848,7 @@ dependencies = [ "trx-cw", "trx-decode-log", "trx-ftx", + "trx-noaa", "trx-protocol", "trx-reporting", "trx-vdes", diff --git a/Cargo.toml b/Cargo.toml index 1d08fec..effa89c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ [workspace] members = [ "src/decoders/trx-ais", + "src/decoders/trx-noaa", "src/decoders/trx-aprs", "src/decoders/trx-cw", "src/decoders/trx-decode-log", diff --git a/src/decoders/trx-noaa/Cargo.toml b/src/decoders/trx-noaa/Cargo.toml new file mode 100644 index 0000000..93e1ffc --- /dev/null +++ b/src/decoders/trx-noaa/Cargo.toml @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2026 Stan Grams +# +# SPDX-License-Identifier: BSD-2-Clause + +[package] +name = "trx-noaa" +version.workspace = true +edition = "2021" + +[dependencies] +trx-core = { path = "../../trx-core" } +rustfft = "6" +num-complex = "0.4" +image = { version = "0.24", default-features = false, features = ["jpeg"] } diff --git a/src/decoders/trx-noaa/src/apt.rs b/src/decoders/trx-noaa/src/apt.rs new file mode 100644 index 0000000..8730e33 --- /dev/null +++ b/src/decoders/trx-noaa/src/apt.rs @@ -0,0 +1,328 @@ +// SPDX-FileCopyrightText: 2026 Stan Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! APT (Automatic Picture Transmission) demodulator and line decoder. +//! +//! NOAA APT signal chain: +//! FM-demodulated audio → 2400 Hz AM subcarrier → envelope → 4160 Hz image +//! +//! Frame layout at 4160 Hz (2080 samples = 0.5 s per line, 2 lines/sec): +//! [SyncA 39][SpaceA 47][ImageA 909][TelA 45][SyncB 39][SpaceB 47][ImageB 909][TelB 45] + +use num_complex::Complex; +use rustfft::FftPlanner; +use std::sync::Arc; + +pub const APT_RATE: u32 = 4160; +pub const LINE_SAMPLES: usize = 2080; + +// Line layout offsets (samples into a line at APT_RATE Hz) +pub const SYNC_A_LEN: usize = 39; +const SPACE_A_LEN: usize = 47; +pub const IMAGE_A_LEN: usize = 909; +const TEL_A_LEN: usize = 45; +const SYNC_B_LEN: usize = 39; +const SPACE_B_LEN: usize = 47; +pub const IMAGE_B_LEN: usize = 909; + +pub const IMAGE_A_OFFSET: usize = SYNC_A_LEN + SPACE_A_LEN; // 86 +pub const IMAGE_B_OFFSET: usize = + IMAGE_A_OFFSET + IMAGE_A_LEN + TEL_A_LEN + SYNC_B_LEN + SPACE_B_LEN; // 1126 + +// FFT block size for Hilbert-based AM demodulation +const BLOCK_SIZE: usize = 4096; + +// Sync detection parameters +const SYNC_THRESHOLD: f32 = 0.15; +const SYNC_SEARCH_LOCKED: usize = 12; // ±samples around expected sync position when locked +const MAX_BAD_SYNC_LINES: u32 = 8; // unlock after this many low-confidence lines + +/// A decoded APT line: raw pixel arrays for both image channels. +#[derive(Clone)] +pub struct RawLine { + pub pixels_a: Box<[u8; IMAGE_A_LEN]>, + pub pixels_b: Box<[u8; IMAGE_B_LEN]>, + pub line_no: u32, +} + +/// Sync A reference pattern at APT_RATE Hz. +/// +/// 1040 Hz square wave (period = 4 samples): alternating pairs hi/lo. +fn sync_a_ref() -> [f32; SYNC_A_LEN] { + let mut p = [0.0f32; SYNC_A_LEN]; + for (i, v) in p.iter_mut().enumerate() { + // 7 cycles of alternating pairs, rest is zero (end-of-sync blank) + *v = if i < 28 && (i % 4) < 2 { 1.0 } else { -1.0 }; + } + p +} + +/// Compute normalised cross-correlation of `buf[offset..]` with the sync A +/// reference pattern. Returns a value approximately in `[-1.0, 1.0]`. +fn sync_score(buf: &[f32], offset: usize) -> f32 { + if offset + SYNC_A_LEN > buf.len() { + return 0.0; + } + let ref_pat = sync_a_ref(); + let window = &buf[offset..offset + SYNC_A_LEN]; + let mean = window.iter().sum::() / SYNC_A_LEN as f32; + let rms = + (window.iter().map(|&x| (x - mean) * (x - mean)).sum::() / SYNC_A_LEN as f32).sqrt(); + if rms < 1e-7 { + return 0.0; + } + window + .iter() + .zip(ref_pat.iter()) + .map(|(&s, &r)| (s - mean) * r) + .sum::() + / (SYNC_A_LEN as f32 * rms) +} + +/// Find the offset in `buf[0..search_len]` with the highest sync A score. +/// Returns `(offset, score)`. +fn find_best_sync(buf: &[f32], search_len: usize) -> (usize, f32) { + let limit = search_len.min(buf.len().saturating_sub(SYNC_A_LEN)); + let mut best_off = 0usize; + let mut best_score = f32::NEG_INFINITY; + for off in 0..=limit { + let s = sync_score(buf, off); + if s > best_score { + best_score = s; + best_off = off; + } + } + (best_off, best_score) +} + +// --------------------------------------------------------------------------- +// AM demodulator (Hilbert-based via rustfft) +// --------------------------------------------------------------------------- + +/// Converts PCM at `sample_rate` to APT envelope samples at `APT_RATE` Hz. +/// +/// Uses an FFT-based Hilbert transform to obtain the analytic signal, then +/// extracts the AM envelope. Processes input in non-overlapping blocks of +/// `BLOCK_SIZE` samples. +pub struct AptDemod { + fft_fwd: Arc>, + fft_inv: Arc>, + /// Lower bin index of the 2400 Hz ± 1040 Hz bandpass filter. + k_lo: usize, + /// Upper bin index of the 2400 Hz ± 1040 Hz bandpass filter. + k_hi: usize, + /// Input sample accumulation buffer. + in_buf: Vec, + /// Fractional position into the next input block for the resampler. + resamp_phase: f64, + /// Input samples consumed per APT_RATE output sample. + resamp_step: f64, + /// Output envelope buffer at APT_RATE Hz. + pub out: Vec, +} + +impl AptDemod { + pub fn new(sample_rate: u32) -> Self { + let mut planner = FftPlanner::new(); + let fft_fwd = planner.plan_fft_forward(BLOCK_SIZE); + let fft_inv = planner.plan_fft_inverse(BLOCK_SIZE); + + let fs = sample_rate as f64; + // Bandpass around 2400 Hz carrier, ±1040 Hz (APT image bandwidth) + let k_lo = ((1360.0 * BLOCK_SIZE as f64 / fs).floor() as usize).max(1); + let k_hi = ((3440.0 * BLOCK_SIZE as f64 / fs).ceil() as usize).min(BLOCK_SIZE / 2); + + Self { + fft_fwd, + fft_inv, + k_lo, + k_hi, + in_buf: Vec::new(), + resamp_phase: 0.0, + resamp_step: sample_rate as f64 / APT_RATE as f64, + out: Vec::new(), + } + } + + /// Push raw PCM samples; envelope output accumulates in `self.out`. + pub fn push(&mut self, samples: &[f32]) { + self.in_buf.extend_from_slice(samples); + while self.in_buf.len() >= BLOCK_SIZE { + // Drain exactly BLOCK_SIZE samples into a stack array + let block: Vec = self.in_buf.drain(..BLOCK_SIZE).collect(); + self.process_block(&block); + } + } + + fn process_block(&mut self, block: &[f32]) { + // Forward FFT + let mut spectrum: Vec> = block.iter().map(|&s| Complex::new(s, 0.0)).collect(); + self.fft_fwd.process(&mut spectrum); + + // Bandpass + analytic signal: + // - Keep positive-freq band [k_lo, k_hi], doubled (for single-sideband power). + // - Zero all other bins (negative freqs and out-of-band positives). + // IFFT of the resulting one-sided spectrum ≈ analytic signal of the + // bandpass-filtered input; its magnitude is the AM envelope. + for (k, bin) in spectrum.iter_mut().enumerate() { + if k >= self.k_lo && k <= self.k_hi { + *bin *= 2.0; + } else { + *bin = Complex::ZERO; + } + } + + // Inverse FFT → complex analytic signal + self.fft_inv.process(&mut spectrum); + let scale = 1.0_f32 / BLOCK_SIZE as f32; + + // Magnitude = AM envelope; resample to APT_RATE via linear interpolation + let n = BLOCK_SIZE as f64; + while self.resamp_phase + 1.0 < n { + let i = self.resamp_phase as usize; + let frac = (self.resamp_phase - i as f64) as f32; + let s0 = spectrum[i].norm() * scale; + let s1 = spectrum[i + 1].norm() * scale; + self.out.push(s0 + frac * (s1 - s0)); + self.resamp_phase += self.resamp_step; + } + self.resamp_phase -= n; + if self.resamp_phase < 0.0 { + self.resamp_phase = 0.0; + } + } + + pub fn reset(&mut self) { + self.in_buf.clear(); + self.out.clear(); + self.resamp_phase = 0.0; + } +} + +// --------------------------------------------------------------------------- +// Sync tracker and line assembler +// --------------------------------------------------------------------------- + +/// Consumes resampled APT envelope samples (at `APT_RATE` Hz), detects +/// sync A markers, and assembles decoded image lines. +pub struct SyncTracker { + buf: Vec, + locked: bool, + bad_sync_count: u32, + line_no: u32, + pub lines: Vec, +} + +impl Default for SyncTracker { + fn default() -> Self { + Self::new() + } +} + +impl SyncTracker { + pub fn new() -> Self { + Self { + buf: Vec::new(), + locked: false, + bad_sync_count: 0, + line_no: 0, + lines: Vec::new(), + } + } + + /// Push envelope samples; fully assembled lines accumulate in `self.lines`. + pub fn push(&mut self, samples: &[f32]) { + self.buf.extend_from_slice(samples); + self.drain(); + } + + fn drain(&mut self) { + loop { + if !self.locked { + // Need 2 × LINE_SAMPLES to have room for a full line after scan + if self.buf.len() < 2 * LINE_SAMPLES { + break; + } + let (pos, score) = find_best_sync(&self.buf, LINE_SAMPLES); + if score >= SYNC_THRESHOLD { + self.buf.drain(0..pos); + self.locked = true; + // Fall through to locked extraction below + } else { + // No convincing sync yet; discard half a line and retry + self.buf.drain(0..LINE_SAMPLES / 2); + continue; + } + } + + // Locked mode: need LINE_SAMPLES + search window to be available + if self.buf.len() < LINE_SAMPLES + SYNC_SEARCH_LOCKED { + break; + } + + // Refine within a small window around the expected line start + let (drift, score) = find_best_sync(&self.buf, SYNC_SEARCH_LOCKED); + if score >= SYNC_THRESHOLD * 0.5 { + // Accept refined position + if drift > 0 { + self.buf.drain(0..drift); + } + self.bad_sync_count = 0; + } else { + self.bad_sync_count += 1; + if self.bad_sync_count >= MAX_BAD_SYNC_LINES { + self.locked = false; + self.bad_sync_count = 0; + // Discard the current suspect region and restart search + self.buf.drain(0..LINE_SAMPLES / 2); + continue; + } + // Keep going at the expected position (don't drift on bad sync) + } + + if self.buf.len() < LINE_SAMPLES { + break; + } + self.extract_line(); + } + } + + fn extract_line(&mut self) { + let samples: Vec = self.buf.drain(0..LINE_SAMPLES).collect(); + + // Per-line normalisation: scale to [0, 255] using the 2nd–98th percentile + // of this line's values to clip noise and hot pixels. + let mut sorted: Vec = samples.clone(); + sorted.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let p_lo = sorted[(sorted.len() * 2 / 100).max(0)]; + let p_hi = sorted[(sorted.len() * 98 / 100).min(sorted.len() - 1)]; + let range = (p_hi - p_lo).max(1e-6); + + let norm = |v: f32| -> u8 { ((v - p_lo) / range * 255.0).round().clamp(0.0, 255.0) as u8 }; + + let mut pixels_a = Box::new([0u8; IMAGE_A_LEN]); + for (i, p) in pixels_a.iter_mut().enumerate() { + *p = norm(samples[IMAGE_A_OFFSET + i]); + } + let mut pixels_b = Box::new([0u8; IMAGE_B_LEN]); + for (i, p) in pixels_b.iter_mut().enumerate() { + *p = norm(samples[IMAGE_B_OFFSET + i]); + } + + self.lines.push(RawLine { + pixels_a, + pixels_b, + line_no: self.line_no, + }); + self.line_no += 1; + } + + pub fn reset(&mut self) { + self.buf.clear(); + self.locked = false; + self.bad_sync_count = 0; + self.line_no = 0; + self.lines.clear(); + } +} diff --git a/src/decoders/trx-noaa/src/image_enc.rs b/src/decoders/trx-noaa/src/image_enc.rs new file mode 100644 index 0000000..274dbe0 --- /dev/null +++ b/src/decoders/trx-noaa/src/image_enc.rs @@ -0,0 +1,43 @@ +// SPDX-FileCopyrightText: 2026 Stan Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! APT image assembly and JPEG encoding. +//! +//! Standard output layout: channel A (visible / IR-A) on the left half and +//! channel B (IR-B / IR) on the right half, stacked vertically by line number. + +use std::io::Cursor; + +use image::{DynamicImage, GrayImage}; + +use crate::apt::{RawLine, IMAGE_A_LEN, IMAGE_B_LEN}; + +/// Assemble decoded lines into a JPEG image. +/// +/// Returns the JPEG bytes, or `None` if `lines` is empty or encoding fails. +/// Width = `IMAGE_A_LEN + IMAGE_B_LEN` (1818 px), height = number of lines. +pub fn encode_jpeg(lines: &[RawLine], quality: u8) -> Option> { + if lines.is_empty() { + return None; + } + + let width = (IMAGE_A_LEN + IMAGE_B_LEN) as u32; + let height = lines.len() as u32; + let mut pixels: Vec = Vec::with_capacity((width * height) as usize); + + for line in lines { + pixels.extend_from_slice(line.pixels_a.as_ref()); + pixels.extend_from_slice(line.pixels_b.as_ref()); + } + + let gray = GrayImage::from_raw(width, height, pixels)?; + let dynamic = DynamicImage::ImageLuma8(gray); + + let mut cursor = Cursor::new(Vec::new()); + dynamic + .write_to(&mut cursor, image::ImageOutputFormat::Jpeg(quality)) + .ok()?; + + Some(cursor.into_inner()) +} diff --git a/src/decoders/trx-noaa/src/lib.rs b/src/decoders/trx-noaa/src/lib.rs new file mode 100644 index 0000000..ad4e3e7 --- /dev/null +++ b/src/decoders/trx-noaa/src/lib.rs @@ -0,0 +1,114 @@ +// SPDX-FileCopyrightText: 2026 Stan Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! NOAA APT satellite image decoder. +//! +//! Decodes the Automatic Picture Transmission (APT) format broadcast by +//! NOAA-15 (137.620 MHz), NOAA-18 (137.9125 MHz) and NOAA-19 (137.100 MHz). +//! +//! # Signal chain +//! +//! The input is FM-demodulated audio containing a 2400 Hz AM subcarrier. +//! The decoder: +//! 1. Extracts the AM envelope via a FFT-based Hilbert transform (rustfft). +//! 2. Resamples to 4160 Hz (the APT image sample rate). +//! 3. Detects line sync markers (1040 Hz alternating pattern). +//! 4. Assembles image lines (2080 samples each) and extracts both channels. +//! +//! Call [`AptDecoder::process_samples`] with each audio batch, then +//! [`AptDecoder::finalize`] when the pass ends to obtain JPEG bytes. + +pub mod apt; +mod image_enc; + +use apt::{AptDemod, SyncTracker}; + +/// JPEG encoding quality (0–100). +const JPEG_QUALITY: u8 = 85; + +/// Completed APT image returned by [`AptDecoder::finalize`]. +pub struct AptImage { + /// JPEG-encoded image bytes. + pub jpeg: Vec, + /// Number of decoded image lines. + pub line_count: u32, + /// Millisecond timestamp when the first line was decoded. + pub first_line_ms: i64, +} + +/// Top-level NOAA APT decoder. +/// +/// Feed audio samples with [`process_samples`] and call [`finalize`] at +/// pass end to retrieve the assembled JPEG. +pub struct AptDecoder { + demod: AptDemod, + sync: SyncTracker, + first_line_ms: Option, +} + +impl AptDecoder { + pub fn new(sample_rate: u32) -> Self { + Self { + demod: AptDemod::new(sample_rate), + sync: SyncTracker::new(), + first_line_ms: None, + } + } + + /// Process a batch of PCM samples (float32, mono or will be treated as-is). + /// + /// Returns the number of new lines decoded in this batch. + pub fn process_samples(&mut self, samples: &[f32]) -> u32 { + self.demod.push(samples); + + let before = self.sync.lines.len() as u32; + + // Move accumulated envelope output into the sync tracker + if !self.demod.out.is_empty() { + let envelope = std::mem::take(&mut self.demod.out); + self.sync.push(&envelope); + } + + let after = self.sync.lines.len() as u32; + let new_lines = after - before; + + if new_lines > 0 && self.first_line_ms.is_none() { + self.first_line_ms = Some(now_ms()); + } + + new_lines + } + + /// Total number of lines decoded so far. + pub fn line_count(&self) -> u32 { + self.sync.lines.len() as u32 + } + + /// Encode all accumulated lines as a JPEG image and return the result. + /// + /// Returns `None` if no lines have been decoded yet. + /// Does **not** reset the decoder; call [`reset`] afterwards if needed. + pub fn finalize(&self) -> Option { + let jpeg = image_enc::encode_jpeg(&self.sync.lines, JPEG_QUALITY)?; + Some(AptImage { + jpeg, + line_count: self.sync.lines.len() as u32, + first_line_ms: self.first_line_ms.unwrap_or_else(now_ms), + }) + } + + /// Clear all state; ready to decode a fresh pass. + pub fn reset(&mut self) { + self.demod.reset(); + self.sync.reset(); + self.first_line_ms = None; + } +} + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} diff --git a/src/trx-core/src/audio.rs b/src/trx-core/src/audio.rs index 69a07cf..5c80de8 100644 --- a/src/trx-core/src/audio.rs +++ b/src/trx-core/src/audio.rs @@ -66,6 +66,8 @@ pub const AUDIO_MSG_VCHAN_BW: u8 = 0x13; pub const AUDIO_MSG_FT4_DECODE: u8 = 0x14; /// Server → client: FT2 decoded message (JSON `DecodedMessage::Ft2`). pub const AUDIO_MSG_FT2_DECODE: u8 = 0x15; +/// Server → client: NOAA APT image complete (JSON `DecodedMessage::NoaaImage`). +pub const AUDIO_MSG_NOAA_IMAGE: u8 = 0x16; /// Maximum payload size for normal messages (1 MB). const MAX_PAYLOAD_SIZE: u32 = 1_048_576; diff --git a/src/trx-core/src/decode.rs b/src/trx-core/src/decode.rs index 33b8024..4f34a86 100644 --- a/src/trx-core/src/decode.rs +++ b/src/trx-core/src/decode.rs @@ -28,6 +28,8 @@ pub enum DecodedMessage { Ft2(Ft8Message), #[serde(rename = "wspr")] Wspr(WsprMessage), + #[serde(rename = "noaa_image")] + NoaaImage(NoaaImage), } impl DecodedMessage { @@ -40,6 +42,7 @@ impl DecodedMessage { Self::Cw(m) => m.rig_id = Some(id), Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id = Some(id), Self::Wspr(m) => m.rig_id = Some(id), + Self::NoaaImage(m) => m.rig_id = Some(id), } } @@ -52,6 +55,7 @@ impl DecodedMessage { Self::Cw(m) => m.rig_id.as_deref(), Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id.as_deref(), Self::Wspr(m) => m.rig_id.as_deref(), + Self::NoaaImage(m) => m.rig_id.as_deref(), } } } @@ -203,6 +207,23 @@ pub struct Ft8Message { pub message: String, } +/// A completed NOAA APT satellite image, saved to disk as a JPEG. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NoaaImage { + #[serde(skip_serializing_if = "Option::is_none")] + pub rig_id: Option, + /// UTC timestamp (milliseconds since epoch) of pass start (first decoded line). + pub pass_start_ms: i64, + /// UTC timestamp (milliseconds since epoch) when the image was finalised. + pub pass_end_ms: i64, + /// Number of decoded image lines. + pub line_count: u32, + /// Absolute filesystem path to the saved JPEG file. + pub path: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub ts_ms: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WsprMessage { #[serde(skip_serializing_if = "Option::is_none")] diff --git a/src/trx-core/src/rig/state.rs b/src/trx-core/src/rig/state.rs index b2b0efc..605100e 100644 --- a/src/trx-core/src/rig/state.rs +++ b/src/trx-core/src/rig/state.rs @@ -46,6 +46,8 @@ pub struct RigState { #[serde(default)] pub wspr_decode_enabled: bool, #[serde(default)] + pub noaa_decode_enabled: bool, + #[serde(default)] pub cw_auto: bool, #[serde(default)] pub cw_wpm: u32, @@ -77,6 +79,8 @@ pub struct RigState { pub ft2_decode_reset_seq: u64, #[serde(default, skip_serializing)] pub wspr_decode_reset_seq: u64, + #[serde(default, skip_serializing)] + pub noaa_decode_reset_seq: u64, } /// Mode supported by the rig. @@ -159,6 +163,7 @@ impl RigState { ft4_decode_enabled: false, ft2_decode_enabled: false, wspr_decode_enabled: false, + noaa_decode_enabled: false, cw_auto: true, cw_wpm: 15, cw_tone_hz: 700, @@ -172,6 +177,7 @@ impl RigState { ft4_decode_reset_seq: 0, ft2_decode_reset_seq: 0, wspr_decode_reset_seq: 0, + noaa_decode_reset_seq: 0, } } @@ -231,6 +237,7 @@ impl RigState { ft4_decode_enabled: snapshot.ft4_decode_enabled, ft2_decode_enabled: snapshot.ft2_decode_enabled, wspr_decode_enabled: snapshot.wspr_decode_enabled, + noaa_decode_enabled: snapshot.noaa_decode_enabled, filter: snapshot.filter, spectrum: None, // spectrum flows through /api/spectrum, not persistent state vchan_rds: None, // vchan RDS flows through /api/spectrum, not persistent state @@ -241,6 +248,7 @@ impl RigState { ft4_decode_reset_seq: 0, ft2_decode_reset_seq: 0, wspr_decode_reset_seq: 0, + noaa_decode_reset_seq: 0, } } @@ -278,6 +286,7 @@ impl RigState { ft4_decode_enabled: self.ft4_decode_enabled, ft2_decode_enabled: self.ft2_decode_enabled, wspr_decode_enabled: self.wspr_decode_enabled, + noaa_decode_enabled: self.noaa_decode_enabled, filter: self.filter.clone(), spectrum: self.spectrum.clone(), vchan_rds: self.vchan_rds.clone(), @@ -489,6 +498,8 @@ pub struct RigSnapshot { #[serde(default)] pub wspr_decode_enabled: bool, #[serde(default)] + pub noaa_decode_enabled: bool, + #[serde(default)] pub cw_auto: bool, #[serde(default)] pub cw_wpm: u32, diff --git a/src/trx-server/Cargo.toml b/src/trx-server/Cargo.toml index 9d38e1b..ac7e52c 100644 --- a/src/trx-server/Cargo.toml +++ b/src/trx-server/Cargo.toml @@ -39,5 +39,6 @@ trx-cw = { path = "../decoders/trx-cw" } trx-decode-log = { path = "../decoders/trx-decode-log" } trx-ftx = { path = "../decoders/trx-ftx" } trx-wspr = { path = "../decoders/trx-wspr" } +trx-noaa = { path = "../decoders/trx-noaa" } trx-protocol = { path = "../trx-protocol" } trx-reporting = { path = "../trx-reporting" } \ No newline at end of file diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index c835c3d..10be70d 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -11,6 +11,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use bytes::Bytes; +use chrono::Local; use flate2::write::GzEncoder; use flate2::Compression; use num_complex::Complex; @@ -25,18 +26,21 @@ use trx_core::audio::{ parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame, write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE, - AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, - AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_BW, - AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, - AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_NOAA_IMAGE, + AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, + AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, + AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, + AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::{ - AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, + AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, NoaaImage, VdesMessage, + WsprMessage, }; use trx_core::rig::state::{RigMode, RigState}; use trx_core::vchan::SharedVChanManager; use trx_cw::CwDecoder; use trx_ftx::Ft8Decoder; +use trx_noaa::AptDecoder; use trx_vdes::VdesDecoder; use trx_wspr::WsprDecoder; use uuid::Uuid; @@ -51,6 +55,9 @@ const VDES_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); +const NOAA_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); +/// Silence timeout before auto-finalising a NOAA pass (30 s without new lines). +const NOAA_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30); const FT8_SAMPLE_RATE: u32 = 12_000; const FT2_ASYNC_BUFFER_SAMPLES: usize = 45_000; const FT2_ASYNC_TRIGGER_SAMPLES: usize = 9_000; @@ -207,6 +214,7 @@ pub struct DecoderHistories { pub ft4: Mutex>, pub ft2: Mutex>, pub wspr: Mutex>, + pub noaa: Mutex>, /// Approximate total entry count across all decoders, maintained /// atomically so `estimated_total_count()` avoids 9 lock acquisitions. total_count: AtomicUsize, @@ -224,6 +232,7 @@ impl DecoderHistories { ft4: Mutex::new(VecDeque::new()), ft2: Mutex::new(VecDeque::new()), wspr: Mutex::new(VecDeque::new()), + noaa: Mutex::new(VecDeque::new()), total_count: AtomicUsize::new(0), }) } @@ -583,6 +592,38 @@ impl DecoderHistories { self.adjust_total_count(before, 0); } + // --- NOAA --- + + fn prune_noaa(history: &mut VecDeque<(Instant, NoaaImage)>) { + let cutoff = Instant::now() - NOAA_HISTORY_RETENTION; + while let Some((ts, _)) = history.front() { + if *ts < cutoff { + history.pop_front(); + } else { + break; + } + } + } + + pub fn record_noaa_image(&self, mut img: NoaaImage) { + if img.ts_ms.is_none() { + img.ts_ms = Some(current_timestamp_ms()); + } + let mut h = self.noaa.lock().unwrap_or_else(|e| e.into_inner()); + let before = h.len(); + h.push_back((Instant::now(), img)); + Self::prune_noaa(&mut h); + self.adjust_total_count(before, h.len()); + } + + pub fn snapshot_noaa_history(&self) -> Vec { + let mut h = self.noaa.lock().unwrap_or_else(|e| e.into_inner()); + let before = h.len(); + Self::prune_noaa(&mut h); + self.adjust_total_count(before, h.len()); + h.iter().map(|(_, img)| img.clone()).collect() + } + /// Returns a quick (non-pruning) estimate of the total number of history /// entries across all decoders, used for pre-allocating the replay blob. /// @@ -2353,6 +2394,203 @@ pub async fn run_wspr_decoder( } } +// --------------------------------------------------------------------------- +// NOAA APT decoder task +// --------------------------------------------------------------------------- + +/// Decode NOAA APT satellite images from FM-demodulated audio. +/// +/// The task is idle until `state.noaa_decode_enabled` becomes `true`. +/// When the user disables the decoder (or 30 s of silence elapses with no +/// new decoded lines), the accumulated image is encoded as JPEG and saved to +/// `output_dir/.jpg`. +pub async fn run_noaa_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + mut state_rx: watch::Receiver, + decode_tx: broadcast::Sender, + histories: Arc, + output_dir: std::path::PathBuf, +) { + info!("NOAA decoder started ({}Hz, {} ch)", sample_rate, channels); + let mut decoder = AptDecoder::new(sample_rate); + let mut last_reset_seq: u64 = 0; + let mut active = state_rx.borrow().noaa_decode_enabled; + let mut pass_start_ms: i64 = 0; + // Instant of the last time new lines were decoded (for auto-finalise) + let mut last_line_at = tokio::time::Instant::now(); + + if active { + pass_start_ms = current_timestamp_ms(); + pcm_rx = pcm_rx.resubscribe(); + } + + loop { + if !active { + match state_rx.changed().await { + Ok(()) => { + let state = state_rx.borrow(); + active = state.noaa_decode_enabled; + if active { + decoder.reset(); + pass_start_ms = current_timestamp_ms(); + last_line_at = tokio::time::Instant::now(); + pcm_rx = pcm_rx.resubscribe(); + } + if state.noaa_decode_reset_seq != last_reset_seq { + last_reset_seq = state.noaa_decode_reset_seq; + decoder.reset(); + } + } + Err(_) => break, + } + continue; + } + + let silence_deadline = last_line_at + NOAA_PASS_SILENCE_TIMEOUT; + + tokio::select! { + recv = pcm_rx.recv() => { + match recv { + Ok(frame) => { + let reset_seq = state_rx.borrow().noaa_decode_reset_seq; + if reset_seq != last_reset_seq { + last_reset_seq = reset_seq; + decoder.reset(); + pcm_rx = pcm_rx.resubscribe(); + continue; + } + + let mono = downmix_mono(frame, channels); + let new_lines = tokio::task::block_in_place(|| decoder.process_samples(&mono)); + if new_lines > 0 { + last_line_at = tokio::time::Instant::now(); + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("NOAA decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + changed = state_rx.changed() => { + match changed { + Ok(()) => { + let state = state_rx.borrow(); + let was_active = active; + active = state.noaa_decode_enabled; + if state.noaa_decode_reset_seq != last_reset_seq { + last_reset_seq = state.noaa_decode_reset_seq; + decoder.reset(); + pass_start_ms = current_timestamp_ms(); + } + if was_active && !active { + // User disabled — finalise whatever we have + finalize_noaa_pass( + &mut decoder, + pass_start_ms, + &output_dir, + &decode_tx, + &histories, + ) + .await; + } else if !was_active && active { + decoder.reset(); + pass_start_ms = current_timestamp_ms(); + last_line_at = tokio::time::Instant::now(); + pcm_rx = pcm_rx.resubscribe(); + } + } + Err(_) => break, + } + } + // Auto-finalise after sustained silence (satellite pass ended) + _ = tokio::time::sleep_until(silence_deadline), if decoder.line_count() > 0 => { + info!( + "NOAA: no new lines for {}s — finalising pass ({} lines)", + NOAA_PASS_SILENCE_TIMEOUT.as_secs(), + decoder.line_count() + ); + finalize_noaa_pass( + &mut decoder, + pass_start_ms, + &output_dir, + &decode_tx, + &histories, + ) + .await; + // Remain active; ready for the next pass + pass_start_ms = current_timestamp_ms(); + last_line_at = tokio::time::Instant::now(); + } + } + } +} + +/// Encode all accumulated lines as JPEG, write to disk, and broadcast the +/// `DecodedMessage::NoaaImage` event. No-ops if fewer than 2 lines decoded. +async fn finalize_noaa_pass( + decoder: &mut AptDecoder, + pass_start_ms: i64, + output_dir: &std::path::Path, + decode_tx: &broadcast::Sender, + histories: &Arc, +) { + if decoder.line_count() < 2 { + decoder.reset(); + return; + } + + let pass_end_ms = current_timestamp_ms(); + + let Some(apt_image) = decoder.finalize() else { + decoder.reset(); + return; + }; + + // Build output path: /.jpg + let dt = chrono::Local::now(); + let filename = dt.format("%Y-%m-%d_%H-%M-%S.jpg").to_string(); + let path = output_dir.join(&filename); + + if let Err(e) = std::fs::create_dir_all(output_dir) { + warn!( + "NOAA: failed to create output directory {:?}: {}", + output_dir, e + ); + decoder.reset(); + return; + } + + match std::fs::write(&path, &apt_image.jpeg) { + Ok(()) => { + info!( + "NOAA: saved {} ({} lines, {} bytes) to {:?}", + filename, + apt_image.line_count, + apt_image.jpeg.len(), + path + ); + let img = NoaaImage { + rig_id: None, + pass_start_ms: apt_image.first_line_ms, + pass_end_ms, + line_count: apt_image.line_count, + path: path.to_string_lossy().into_owned(), + ts_ms: Some(pass_end_ms), + }; + histories.record_noaa_image(img.clone()); + let _ = decode_tx.send(DecodedMessage::NoaaImage(img)); + } + Err(e) => { + warn!("NOAA: failed to write {:?}: {}", path, e); + } + } + + decoder.reset(); +} + // --------------------------------------------------------------------------- // Virtual-channel audio support // --------------------------------------------------------------------------- @@ -2952,6 +3190,11 @@ async fn handle_audio_client( DecodedMessage::Cw, AUDIO_MSG_CW_DECODE ); + push_history!( + histories.snapshot_noaa_history(), + DecodedMessage::NoaaImage, + AUDIO_MSG_NOAA_IMAGE + ); (blob, count) }; @@ -3035,6 +3278,7 @@ async fn handle_audio_client( DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE, DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE, DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE, + DecodedMessage::NoaaImage(_) => AUDIO_MSG_NOAA_IMAGE, }; if let Ok(json) = serde_json::to_vec(&msg) { if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { @@ -3062,6 +3306,7 @@ async fn handle_audio_client( DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE, DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE, DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE, + DecodedMessage::NoaaImage(_) => AUDIO_MSG_NOAA_IMAGE, }; if let Ok(json) = serde_json::to_vec(&msg) { if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 08229bf..ffde310 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -793,6 +793,25 @@ fn spawn_rig_audio_stack( _ = wait_for_shutdown(wspr_shutdown_rx) => {} } })); + + // Spawn NOAA APT decoder task + let noaa_pcm_rx = pcm_tx.subscribe(); + let noaa_state_rx = state_rx.clone(); + let noaa_decode_tx = decode_tx.clone(); + let noaa_sr = rig_cfg.audio.sample_rate; + let noaa_ch = rig_cfg.audio.channels; + let noaa_shutdown_rx = shutdown_rx.clone(); + let noaa_histories = histories.clone(); + let noaa_output_dir = dirs::cache_dir() + .unwrap_or_else(|| std::path::PathBuf::from(".cache")) + .join("trx-rs") + .join("noaa"); + handles.push(tokio::spawn(async move { + tokio::select! { + _ = audio::run_noaa_decoder(noaa_sr, noaa_ch as u16, noaa_pcm_rx, noaa_state_rx, noaa_decode_tx, noaa_histories, noaa_output_dir) => {} + _ = wait_for_shutdown(noaa_shutdown_rx) => {} + } + })); } if rig_cfg.audio.tx_enabled {