[feat](trx-noaa): add NOAA APT satellite image decoder

New trx-noaa crate: FFT-based Hilbert transform (rustfft) for 2400 Hz
AM demodulation, sync A detection via cross-correlation, line assembly
at 4160 Hz, and JPEG output via the image crate.

- trx-core: NoaaImage type, DecodedMessage::NoaaImage variant,
  noaa_decode_enabled/noaa_decode_reset_seq on RigState/RigSnapshot,
  AUDIO_MSG_NOAA_IMAGE = 0x16
- trx-server: DecoderHistories::noaa, run_noaa_decoder task (activates
  on noaa_decode_enabled, auto-finalises after 30 s silence), saves
  JPEGs to ~/.cache/trx-rs/noaa/<YYYY-MM-DD_HH-MM-SS>.jpg, forwards
  events over TCP audio channel and history replay

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-28 07:00:24 +01:00
parent a8b19227d5
commit 4b40d44814
12 changed files with 852 additions and 5 deletions
Generated
+48
View File
@@ -435,6 +435,18 @@ version = "3.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
[[package]]
name = "bytemuck"
version = "1.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.11.0" version = "1.11.0"
@@ -560,6 +572,12 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "color_quant"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b"
[[package]] [[package]]
name = "colorchoice" name = "colorchoice"
version = "1.0.4" version = "1.0.4"
@@ -1225,6 +1243,19 @@ dependencies = [
"icu_properties", "icu_properties",
] ]
[[package]]
name = "image"
version = "0.24.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5690139d2f55868e080017335e4b94cb7414274c74f1669c84fb5feba2c9f69d"
dependencies = [
"bytemuck",
"byteorder",
"color_quant",
"jpeg-decoder",
"num-traits",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.12.1" version = "2.12.1"
@@ -1300,6 +1331,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "jpeg-decoder"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00810f1d8b74be64b13dbf3db89ac67740615d6c891f0e7b6179326533011a07"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.85" version = "0.3.85"
@@ -2747,6 +2784,16 @@ dependencies = [
"rustfft", "rustfft",
] ]
[[package]]
name = "trx-noaa"
version = "0.1.0"
dependencies = [
"image",
"num-complex",
"rustfft",
"trx-core",
]
[[package]] [[package]]
name = "trx-protocol" name = "trx-protocol"
version = "0.1.0" version = "0.1.0"
@@ -2801,6 +2848,7 @@ dependencies = [
"trx-cw", "trx-cw",
"trx-decode-log", "trx-decode-log",
"trx-ftx", "trx-ftx",
"trx-noaa",
"trx-protocol", "trx-protocol",
"trx-reporting", "trx-reporting",
"trx-vdes", "trx-vdes",
+1
View File
@@ -5,6 +5,7 @@
[workspace] [workspace]
members = [ members = [
"src/decoders/trx-ais", "src/decoders/trx-ais",
"src/decoders/trx-noaa",
"src/decoders/trx-aprs", "src/decoders/trx-aprs",
"src/decoders/trx-cw", "src/decoders/trx-cw",
"src/decoders/trx-decode-log", "src/decoders/trx-decode-log",
+14
View File
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
#
# SPDX-License-Identifier: BSD-2-Clause
[package]
name = "trx-noaa"
version.workspace = true
edition = "2021"
[dependencies]
trx-core = { path = "../../trx-core" }
rustfft = "6"
num-complex = "0.4"
image = { version = "0.24", default-features = false, features = ["jpeg"] }
+328
View File
@@ -0,0 +1,328 @@
// SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
//
// SPDX-License-Identifier: BSD-2-Clause
//! APT (Automatic Picture Transmission) demodulator and line decoder.
//!
//! NOAA APT signal chain:
//! FM-demodulated audio → 2400 Hz AM subcarrier → envelope → 4160 Hz image
//!
//! Frame layout at 4160 Hz (2080 samples = 0.5 s per line, 2 lines/sec):
//! [SyncA 39][SpaceA 47][ImageA 909][TelA 45][SyncB 39][SpaceB 47][ImageB 909][TelB 45]
use num_complex::Complex;
use rustfft::FftPlanner;
use std::sync::Arc;
pub const APT_RATE: u32 = 4160;
pub const LINE_SAMPLES: usize = 2080;
// Line layout offsets (samples into a line at APT_RATE Hz)
pub const SYNC_A_LEN: usize = 39;
const SPACE_A_LEN: usize = 47;
pub const IMAGE_A_LEN: usize = 909;
const TEL_A_LEN: usize = 45;
const SYNC_B_LEN: usize = 39;
const SPACE_B_LEN: usize = 47;
pub const IMAGE_B_LEN: usize = 909;
pub const IMAGE_A_OFFSET: usize = SYNC_A_LEN + SPACE_A_LEN; // 86
pub const IMAGE_B_OFFSET: usize =
IMAGE_A_OFFSET + IMAGE_A_LEN + TEL_A_LEN + SYNC_B_LEN + SPACE_B_LEN; // 1126
// FFT block size for Hilbert-based AM demodulation
const BLOCK_SIZE: usize = 4096;
// Sync detection parameters
const SYNC_THRESHOLD: f32 = 0.15;
const SYNC_SEARCH_LOCKED: usize = 12; // ±samples around expected sync position when locked
const MAX_BAD_SYNC_LINES: u32 = 8; // unlock after this many low-confidence lines
/// A decoded APT line: raw pixel arrays for both image channels.
#[derive(Clone)]
pub struct RawLine {
pub pixels_a: Box<[u8; IMAGE_A_LEN]>,
pub pixels_b: Box<[u8; IMAGE_B_LEN]>,
pub line_no: u32,
}
/// Sync A reference pattern at APT_RATE Hz.
///
/// 1040 Hz square wave (period = 4 samples): alternating pairs hi/lo.
fn sync_a_ref() -> [f32; SYNC_A_LEN] {
let mut p = [0.0f32; SYNC_A_LEN];
for (i, v) in p.iter_mut().enumerate() {
// 7 cycles of alternating pairs, rest is zero (end-of-sync blank)
*v = if i < 28 && (i % 4) < 2 { 1.0 } else { -1.0 };
}
p
}
/// Compute normalised cross-correlation of `buf[offset..]` with the sync A
/// reference pattern. Returns a value approximately in `[-1.0, 1.0]`.
fn sync_score(buf: &[f32], offset: usize) -> f32 {
if offset + SYNC_A_LEN > buf.len() {
return 0.0;
}
let ref_pat = sync_a_ref();
let window = &buf[offset..offset + SYNC_A_LEN];
let mean = window.iter().sum::<f32>() / SYNC_A_LEN as f32;
let rms =
(window.iter().map(|&x| (x - mean) * (x - mean)).sum::<f32>() / SYNC_A_LEN as f32).sqrt();
if rms < 1e-7 {
return 0.0;
}
window
.iter()
.zip(ref_pat.iter())
.map(|(&s, &r)| (s - mean) * r)
.sum::<f32>()
/ (SYNC_A_LEN as f32 * rms)
}
/// Find the offset in `buf[0..search_len]` with the highest sync A score.
/// Returns `(offset, score)`.
fn find_best_sync(buf: &[f32], search_len: usize) -> (usize, f32) {
let limit = search_len.min(buf.len().saturating_sub(SYNC_A_LEN));
let mut best_off = 0usize;
let mut best_score = f32::NEG_INFINITY;
for off in 0..=limit {
let s = sync_score(buf, off);
if s > best_score {
best_score = s;
best_off = off;
}
}
(best_off, best_score)
}
// ---------------------------------------------------------------------------
// AM demodulator (Hilbert-based via rustfft)
// ---------------------------------------------------------------------------
/// Converts PCM at `sample_rate` to APT envelope samples at `APT_RATE` Hz.
///
/// Uses an FFT-based Hilbert transform to obtain the analytic signal, then
/// extracts the AM envelope. Processes input in non-overlapping blocks of
/// `BLOCK_SIZE` samples.
pub struct AptDemod {
fft_fwd: Arc<dyn rustfft::Fft<f32>>,
fft_inv: Arc<dyn rustfft::Fft<f32>>,
/// Lower bin index of the 2400 Hz ± 1040 Hz bandpass filter.
k_lo: usize,
/// Upper bin index of the 2400 Hz ± 1040 Hz bandpass filter.
k_hi: usize,
/// Input sample accumulation buffer.
in_buf: Vec<f32>,
/// Fractional position into the next input block for the resampler.
resamp_phase: f64,
/// Input samples consumed per APT_RATE output sample.
resamp_step: f64,
/// Output envelope buffer at APT_RATE Hz.
pub out: Vec<f32>,
}
impl AptDemod {
pub fn new(sample_rate: u32) -> Self {
let mut planner = FftPlanner::new();
let fft_fwd = planner.plan_fft_forward(BLOCK_SIZE);
let fft_inv = planner.plan_fft_inverse(BLOCK_SIZE);
let fs = sample_rate as f64;
// Bandpass around 2400 Hz carrier, ±1040 Hz (APT image bandwidth)
let k_lo = ((1360.0 * BLOCK_SIZE as f64 / fs).floor() as usize).max(1);
let k_hi = ((3440.0 * BLOCK_SIZE as f64 / fs).ceil() as usize).min(BLOCK_SIZE / 2);
Self {
fft_fwd,
fft_inv,
k_lo,
k_hi,
in_buf: Vec::new(),
resamp_phase: 0.0,
resamp_step: sample_rate as f64 / APT_RATE as f64,
out: Vec::new(),
}
}
/// Push raw PCM samples; envelope output accumulates in `self.out`.
pub fn push(&mut self, samples: &[f32]) {
self.in_buf.extend_from_slice(samples);
while self.in_buf.len() >= BLOCK_SIZE {
// Drain exactly BLOCK_SIZE samples into a stack array
let block: Vec<f32> = self.in_buf.drain(..BLOCK_SIZE).collect();
self.process_block(&block);
}
}
fn process_block(&mut self, block: &[f32]) {
// Forward FFT
let mut spectrum: Vec<Complex<f32>> = block.iter().map(|&s| Complex::new(s, 0.0)).collect();
self.fft_fwd.process(&mut spectrum);
// Bandpass + analytic signal:
// - Keep positive-freq band [k_lo, k_hi], doubled (for single-sideband power).
// - Zero all other bins (negative freqs and out-of-band positives).
// IFFT of the resulting one-sided spectrum ≈ analytic signal of the
// bandpass-filtered input; its magnitude is the AM envelope.
for (k, bin) in spectrum.iter_mut().enumerate() {
if k >= self.k_lo && k <= self.k_hi {
*bin *= 2.0;
} else {
*bin = Complex::ZERO;
}
}
// Inverse FFT → complex analytic signal
self.fft_inv.process(&mut spectrum);
let scale = 1.0_f32 / BLOCK_SIZE as f32;
// Magnitude = AM envelope; resample to APT_RATE via linear interpolation
let n = BLOCK_SIZE as f64;
while self.resamp_phase + 1.0 < n {
let i = self.resamp_phase as usize;
let frac = (self.resamp_phase - i as f64) as f32;
let s0 = spectrum[i].norm() * scale;
let s1 = spectrum[i + 1].norm() * scale;
self.out.push(s0 + frac * (s1 - s0));
self.resamp_phase += self.resamp_step;
}
self.resamp_phase -= n;
if self.resamp_phase < 0.0 {
self.resamp_phase = 0.0;
}
}
pub fn reset(&mut self) {
self.in_buf.clear();
self.out.clear();
self.resamp_phase = 0.0;
}
}
// ---------------------------------------------------------------------------
// Sync tracker and line assembler
// ---------------------------------------------------------------------------
/// Consumes resampled APT envelope samples (at `APT_RATE` Hz), detects
/// sync A markers, and assembles decoded image lines.
pub struct SyncTracker {
buf: Vec<f32>,
locked: bool,
bad_sync_count: u32,
line_no: u32,
pub lines: Vec<RawLine>,
}
impl Default for SyncTracker {
fn default() -> Self {
Self::new()
}
}
impl SyncTracker {
pub fn new() -> Self {
Self {
buf: Vec::new(),
locked: false,
bad_sync_count: 0,
line_no: 0,
lines: Vec::new(),
}
}
/// Push envelope samples; fully assembled lines accumulate in `self.lines`.
pub fn push(&mut self, samples: &[f32]) {
self.buf.extend_from_slice(samples);
self.drain();
}
fn drain(&mut self) {
loop {
if !self.locked {
// Need 2 × LINE_SAMPLES to have room for a full line after scan
if self.buf.len() < 2 * LINE_SAMPLES {
break;
}
let (pos, score) = find_best_sync(&self.buf, LINE_SAMPLES);
if score >= SYNC_THRESHOLD {
self.buf.drain(0..pos);
self.locked = true;
// Fall through to locked extraction below
} else {
// No convincing sync yet; discard half a line and retry
self.buf.drain(0..LINE_SAMPLES / 2);
continue;
}
}
// Locked mode: need LINE_SAMPLES + search window to be available
if self.buf.len() < LINE_SAMPLES + SYNC_SEARCH_LOCKED {
break;
}
// Refine within a small window around the expected line start
let (drift, score) = find_best_sync(&self.buf, SYNC_SEARCH_LOCKED);
if score >= SYNC_THRESHOLD * 0.5 {
// Accept refined position
if drift > 0 {
self.buf.drain(0..drift);
}
self.bad_sync_count = 0;
} else {
self.bad_sync_count += 1;
if self.bad_sync_count >= MAX_BAD_SYNC_LINES {
self.locked = false;
self.bad_sync_count = 0;
// Discard the current suspect region and restart search
self.buf.drain(0..LINE_SAMPLES / 2);
continue;
}
// Keep going at the expected position (don't drift on bad sync)
}
if self.buf.len() < LINE_SAMPLES {
break;
}
self.extract_line();
}
}
fn extract_line(&mut self) {
let samples: Vec<f32> = self.buf.drain(0..LINE_SAMPLES).collect();
// Per-line normalisation: scale to [0, 255] using the 2nd98th percentile
// of this line's values to clip noise and hot pixels.
let mut sorted: Vec<f32> = samples.clone();
sorted.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p_lo = sorted[(sorted.len() * 2 / 100).max(0)];
let p_hi = sorted[(sorted.len() * 98 / 100).min(sorted.len() - 1)];
let range = (p_hi - p_lo).max(1e-6);
let norm = |v: f32| -> u8 { ((v - p_lo) / range * 255.0).round().clamp(0.0, 255.0) as u8 };
let mut pixels_a = Box::new([0u8; IMAGE_A_LEN]);
for (i, p) in pixels_a.iter_mut().enumerate() {
*p = norm(samples[IMAGE_A_OFFSET + i]);
}
let mut pixels_b = Box::new([0u8; IMAGE_B_LEN]);
for (i, p) in pixels_b.iter_mut().enumerate() {
*p = norm(samples[IMAGE_B_OFFSET + i]);
}
self.lines.push(RawLine {
pixels_a,
pixels_b,
line_no: self.line_no,
});
self.line_no += 1;
}
pub fn reset(&mut self) {
self.buf.clear();
self.locked = false;
self.bad_sync_count = 0;
self.line_no = 0;
self.lines.clear();
}
}
+43
View File
@@ -0,0 +1,43 @@
// SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
//
// SPDX-License-Identifier: BSD-2-Clause
//! APT image assembly and JPEG encoding.
//!
//! Standard output layout: channel A (visible / IR-A) on the left half and
//! channel B (IR-B / IR) on the right half, stacked vertically by line number.
use std::io::Cursor;
use image::{DynamicImage, GrayImage};
use crate::apt::{RawLine, IMAGE_A_LEN, IMAGE_B_LEN};
/// Assemble decoded lines into a JPEG image.
///
/// Returns the JPEG bytes, or `None` if `lines` is empty or encoding fails.
/// Width = `IMAGE_A_LEN + IMAGE_B_LEN` (1818 px), height = number of lines.
pub fn encode_jpeg(lines: &[RawLine], quality: u8) -> Option<Vec<u8>> {
if lines.is_empty() {
return None;
}
let width = (IMAGE_A_LEN + IMAGE_B_LEN) as u32;
let height = lines.len() as u32;
let mut pixels: Vec<u8> = Vec::with_capacity((width * height) as usize);
for line in lines {
pixels.extend_from_slice(line.pixels_a.as_ref());
pixels.extend_from_slice(line.pixels_b.as_ref());
}
let gray = GrayImage::from_raw(width, height, pixels)?;
let dynamic = DynamicImage::ImageLuma8(gray);
let mut cursor = Cursor::new(Vec::new());
dynamic
.write_to(&mut cursor, image::ImageOutputFormat::Jpeg(quality))
.ok()?;
Some(cursor.into_inner())
}
+114
View File
@@ -0,0 +1,114 @@
// SPDX-FileCopyrightText: 2026 Stan Grams <sjg@haxx.space>
//
// SPDX-License-Identifier: BSD-2-Clause
//! NOAA APT satellite image decoder.
//!
//! Decodes the Automatic Picture Transmission (APT) format broadcast by
//! NOAA-15 (137.620 MHz), NOAA-18 (137.9125 MHz) and NOAA-19 (137.100 MHz).
//!
//! # Signal chain
//!
//! The input is FM-demodulated audio containing a 2400 Hz AM subcarrier.
//! The decoder:
//! 1. Extracts the AM envelope via a FFT-based Hilbert transform (rustfft).
//! 2. Resamples to 4160 Hz (the APT image sample rate).
//! 3. Detects line sync markers (1040 Hz alternating pattern).
//! 4. Assembles image lines (2080 samples each) and extracts both channels.
//!
//! Call [`AptDecoder::process_samples`] with each audio batch, then
//! [`AptDecoder::finalize`] when the pass ends to obtain JPEG bytes.
pub mod apt;
mod image_enc;
use apt::{AptDemod, SyncTracker};
/// JPEG encoding quality (0100).
const JPEG_QUALITY: u8 = 85;
/// Completed APT image returned by [`AptDecoder::finalize`].
pub struct AptImage {
/// JPEG-encoded image bytes.
pub jpeg: Vec<u8>,
/// Number of decoded image lines.
pub line_count: u32,
/// Millisecond timestamp when the first line was decoded.
pub first_line_ms: i64,
}
/// Top-level NOAA APT decoder.
///
/// Feed audio samples with [`process_samples`] and call [`finalize`] at
/// pass end to retrieve the assembled JPEG.
pub struct AptDecoder {
demod: AptDemod,
sync: SyncTracker,
first_line_ms: Option<i64>,
}
impl AptDecoder {
pub fn new(sample_rate: u32) -> Self {
Self {
demod: AptDemod::new(sample_rate),
sync: SyncTracker::new(),
first_line_ms: None,
}
}
/// Process a batch of PCM samples (float32, mono or will be treated as-is).
///
/// Returns the number of new lines decoded in this batch.
pub fn process_samples(&mut self, samples: &[f32]) -> u32 {
self.demod.push(samples);
let before = self.sync.lines.len() as u32;
// Move accumulated envelope output into the sync tracker
if !self.demod.out.is_empty() {
let envelope = std::mem::take(&mut self.demod.out);
self.sync.push(&envelope);
}
let after = self.sync.lines.len() as u32;
let new_lines = after - before;
if new_lines > 0 && self.first_line_ms.is_none() {
self.first_line_ms = Some(now_ms());
}
new_lines
}
/// Total number of lines decoded so far.
pub fn line_count(&self) -> u32 {
self.sync.lines.len() as u32
}
/// Encode all accumulated lines as a JPEG image and return the result.
///
/// Returns `None` if no lines have been decoded yet.
/// Does **not** reset the decoder; call [`reset`] afterwards if needed.
pub fn finalize(&self) -> Option<AptImage> {
let jpeg = image_enc::encode_jpeg(&self.sync.lines, JPEG_QUALITY)?;
Some(AptImage {
jpeg,
line_count: self.sync.lines.len() as u32,
first_line_ms: self.first_line_ms.unwrap_or_else(now_ms),
})
}
/// Clear all state; ready to decode a fresh pass.
pub fn reset(&mut self) {
self.demod.reset();
self.sync.reset();
self.first_line_ms = None;
}
}
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}
+2
View File
@@ -66,6 +66,8 @@ pub const AUDIO_MSG_VCHAN_BW: u8 = 0x13;
pub const AUDIO_MSG_FT4_DECODE: u8 = 0x14; pub const AUDIO_MSG_FT4_DECODE: u8 = 0x14;
/// Server → client: FT2 decoded message (JSON `DecodedMessage::Ft2`). /// Server → client: FT2 decoded message (JSON `DecodedMessage::Ft2`).
pub const AUDIO_MSG_FT2_DECODE: u8 = 0x15; pub const AUDIO_MSG_FT2_DECODE: u8 = 0x15;
/// Server → client: NOAA APT image complete (JSON `DecodedMessage::NoaaImage`).
pub const AUDIO_MSG_NOAA_IMAGE: u8 = 0x16;
/// Maximum payload size for normal messages (1 MB). /// Maximum payload size for normal messages (1 MB).
const MAX_PAYLOAD_SIZE: u32 = 1_048_576; const MAX_PAYLOAD_SIZE: u32 = 1_048_576;
+21
View File
@@ -28,6 +28,8 @@ pub enum DecodedMessage {
Ft2(Ft8Message), Ft2(Ft8Message),
#[serde(rename = "wspr")] #[serde(rename = "wspr")]
Wspr(WsprMessage), Wspr(WsprMessage),
#[serde(rename = "noaa_image")]
NoaaImage(NoaaImage),
} }
impl DecodedMessage { impl DecodedMessage {
@@ -40,6 +42,7 @@ impl DecodedMessage {
Self::Cw(m) => m.rig_id = Some(id), Self::Cw(m) => m.rig_id = Some(id),
Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id = Some(id), Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id = Some(id),
Self::Wspr(m) => m.rig_id = Some(id), Self::Wspr(m) => m.rig_id = Some(id),
Self::NoaaImage(m) => m.rig_id = Some(id),
} }
} }
@@ -52,6 +55,7 @@ impl DecodedMessage {
Self::Cw(m) => m.rig_id.as_deref(), Self::Cw(m) => m.rig_id.as_deref(),
Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id.as_deref(), Self::Ft8(m) | Self::Ft4(m) | Self::Ft2(m) => m.rig_id.as_deref(),
Self::Wspr(m) => m.rig_id.as_deref(), Self::Wspr(m) => m.rig_id.as_deref(),
Self::NoaaImage(m) => m.rig_id.as_deref(),
} }
} }
} }
@@ -203,6 +207,23 @@ pub struct Ft8Message {
pub message: String, pub message: String,
} }
/// A completed NOAA APT satellite image, saved to disk as a JPEG.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NoaaImage {
#[serde(skip_serializing_if = "Option::is_none")]
pub rig_id: Option<String>,
/// UTC timestamp (milliseconds since epoch) of pass start (first decoded line).
pub pass_start_ms: i64,
/// UTC timestamp (milliseconds since epoch) when the image was finalised.
pub pass_end_ms: i64,
/// Number of decoded image lines.
pub line_count: u32,
/// Absolute filesystem path to the saved JPEG file.
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub ts_ms: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsprMessage { pub struct WsprMessage {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
+11
View File
@@ -46,6 +46,8 @@ pub struct RigState {
#[serde(default)] #[serde(default)]
pub wspr_decode_enabled: bool, pub wspr_decode_enabled: bool,
#[serde(default)] #[serde(default)]
pub noaa_decode_enabled: bool,
#[serde(default)]
pub cw_auto: bool, pub cw_auto: bool,
#[serde(default)] #[serde(default)]
pub cw_wpm: u32, pub cw_wpm: u32,
@@ -77,6 +79,8 @@ pub struct RigState {
pub ft2_decode_reset_seq: u64, pub ft2_decode_reset_seq: u64,
#[serde(default, skip_serializing)] #[serde(default, skip_serializing)]
pub wspr_decode_reset_seq: u64, pub wspr_decode_reset_seq: u64,
#[serde(default, skip_serializing)]
pub noaa_decode_reset_seq: u64,
} }
/// Mode supported by the rig. /// Mode supported by the rig.
@@ -159,6 +163,7 @@ impl RigState {
ft4_decode_enabled: false, ft4_decode_enabled: false,
ft2_decode_enabled: false, ft2_decode_enabled: false,
wspr_decode_enabled: false, wspr_decode_enabled: false,
noaa_decode_enabled: false,
cw_auto: true, cw_auto: true,
cw_wpm: 15, cw_wpm: 15,
cw_tone_hz: 700, cw_tone_hz: 700,
@@ -172,6 +177,7 @@ impl RigState {
ft4_decode_reset_seq: 0, ft4_decode_reset_seq: 0,
ft2_decode_reset_seq: 0, ft2_decode_reset_seq: 0,
wspr_decode_reset_seq: 0, wspr_decode_reset_seq: 0,
noaa_decode_reset_seq: 0,
} }
} }
@@ -231,6 +237,7 @@ impl RigState {
ft4_decode_enabled: snapshot.ft4_decode_enabled, ft4_decode_enabled: snapshot.ft4_decode_enabled,
ft2_decode_enabled: snapshot.ft2_decode_enabled, ft2_decode_enabled: snapshot.ft2_decode_enabled,
wspr_decode_enabled: snapshot.wspr_decode_enabled, wspr_decode_enabled: snapshot.wspr_decode_enabled,
noaa_decode_enabled: snapshot.noaa_decode_enabled,
filter: snapshot.filter, filter: snapshot.filter,
spectrum: None, // spectrum flows through /api/spectrum, not persistent state spectrum: None, // spectrum flows through /api/spectrum, not persistent state
vchan_rds: None, // vchan RDS flows through /api/spectrum, not persistent state vchan_rds: None, // vchan RDS flows through /api/spectrum, not persistent state
@@ -241,6 +248,7 @@ impl RigState {
ft4_decode_reset_seq: 0, ft4_decode_reset_seq: 0,
ft2_decode_reset_seq: 0, ft2_decode_reset_seq: 0,
wspr_decode_reset_seq: 0, wspr_decode_reset_seq: 0,
noaa_decode_reset_seq: 0,
} }
} }
@@ -278,6 +286,7 @@ impl RigState {
ft4_decode_enabled: self.ft4_decode_enabled, ft4_decode_enabled: self.ft4_decode_enabled,
ft2_decode_enabled: self.ft2_decode_enabled, ft2_decode_enabled: self.ft2_decode_enabled,
wspr_decode_enabled: self.wspr_decode_enabled, wspr_decode_enabled: self.wspr_decode_enabled,
noaa_decode_enabled: self.noaa_decode_enabled,
filter: self.filter.clone(), filter: self.filter.clone(),
spectrum: self.spectrum.clone(), spectrum: self.spectrum.clone(),
vchan_rds: self.vchan_rds.clone(), vchan_rds: self.vchan_rds.clone(),
@@ -489,6 +498,8 @@ pub struct RigSnapshot {
#[serde(default)] #[serde(default)]
pub wspr_decode_enabled: bool, pub wspr_decode_enabled: bool,
#[serde(default)] #[serde(default)]
pub noaa_decode_enabled: bool,
#[serde(default)]
pub cw_auto: bool, pub cw_auto: bool,
#[serde(default)] #[serde(default)]
pub cw_wpm: u32, pub cw_wpm: u32,
+1
View File
@@ -39,5 +39,6 @@ trx-cw = { path = "../decoders/trx-cw" }
trx-decode-log = { path = "../decoders/trx-decode-log" } trx-decode-log = { path = "../decoders/trx-decode-log" }
trx-ftx = { path = "../decoders/trx-ftx" } trx-ftx = { path = "../decoders/trx-ftx" }
trx-wspr = { path = "../decoders/trx-wspr" } trx-wspr = { path = "../decoders/trx-wspr" }
trx-noaa = { path = "../decoders/trx-noaa" }
trx-protocol = { path = "../trx-protocol" } trx-protocol = { path = "../trx-protocol" }
trx-reporting = { path = "../trx-reporting" } trx-reporting = { path = "../trx-reporting" }
+250 -5
View File
@@ -11,6 +11,7 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use bytes::Bytes; use bytes::Bytes;
use chrono::Local;
use flate2::write::GzEncoder; use flate2::write::GzEncoder;
use flate2::Compression; use flate2::Compression;
use num_complex::Complex; use num_complex::Complex;
@@ -25,18 +26,21 @@ use trx_core::audio::{
parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame, parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame,
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE,
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_NOAA_IMAGE,
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_BW, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED,
AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE,
AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE,
AUDIO_MSG_WSPR_DECODE,
}; };
use trx_core::decode::{ use trx_core::decode::{
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, NoaaImage, VdesMessage,
WsprMessage,
}; };
use trx_core::rig::state::{RigMode, RigState}; use trx_core::rig::state::{RigMode, RigState};
use trx_core::vchan::SharedVChanManager; use trx_core::vchan::SharedVChanManager;
use trx_cw::CwDecoder; use trx_cw::CwDecoder;
use trx_ftx::Ft8Decoder; use trx_ftx::Ft8Decoder;
use trx_noaa::AptDecoder;
use trx_vdes::VdesDecoder; use trx_vdes::VdesDecoder;
use trx_wspr::WsprDecoder; use trx_wspr::WsprDecoder;
use uuid::Uuid; use uuid::Uuid;
@@ -51,6 +55,9 @@ const VDES_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
const NOAA_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
/// Silence timeout before auto-finalising a NOAA pass (30 s without new lines).
const NOAA_PASS_SILENCE_TIMEOUT: Duration = Duration::from_secs(30);
const FT8_SAMPLE_RATE: u32 = 12_000; const FT8_SAMPLE_RATE: u32 = 12_000;
const FT2_ASYNC_BUFFER_SAMPLES: usize = 45_000; const FT2_ASYNC_BUFFER_SAMPLES: usize = 45_000;
const FT2_ASYNC_TRIGGER_SAMPLES: usize = 9_000; const FT2_ASYNC_TRIGGER_SAMPLES: usize = 9_000;
@@ -207,6 +214,7 @@ pub struct DecoderHistories {
pub ft4: Mutex<VecDeque<(Instant, Ft8Message)>>, pub ft4: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub ft2: Mutex<VecDeque<(Instant, Ft8Message)>>, pub ft2: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>, pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
pub noaa: Mutex<VecDeque<(Instant, NoaaImage)>>,
/// Approximate total entry count across all decoders, maintained /// Approximate total entry count across all decoders, maintained
/// atomically so `estimated_total_count()` avoids 9 lock acquisitions. /// atomically so `estimated_total_count()` avoids 9 lock acquisitions.
total_count: AtomicUsize, total_count: AtomicUsize,
@@ -224,6 +232,7 @@ impl DecoderHistories {
ft4: Mutex::new(VecDeque::new()), ft4: Mutex::new(VecDeque::new()),
ft2: Mutex::new(VecDeque::new()), ft2: Mutex::new(VecDeque::new()),
wspr: Mutex::new(VecDeque::new()), wspr: Mutex::new(VecDeque::new()),
noaa: Mutex::new(VecDeque::new()),
total_count: AtomicUsize::new(0), total_count: AtomicUsize::new(0),
}) })
} }
@@ -583,6 +592,38 @@ impl DecoderHistories {
self.adjust_total_count(before, 0); self.adjust_total_count(before, 0);
} }
// --- NOAA ---
fn prune_noaa(history: &mut VecDeque<(Instant, NoaaImage)>) {
let cutoff = Instant::now() - NOAA_HISTORY_RETENTION;
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
}
pub fn record_noaa_image(&self, mut img: NoaaImage) {
if img.ts_ms.is_none() {
img.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.noaa.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
h.push_back((Instant::now(), img));
Self::prune_noaa(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_noaa_history(&self) -> Vec<NoaaImage> {
let mut h = self.noaa.lock().unwrap_or_else(|e| e.into_inner());
let before = h.len();
Self::prune_noaa(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, img)| img.clone()).collect()
}
/// Returns a quick (non-pruning) estimate of the total number of history /// Returns a quick (non-pruning) estimate of the total number of history
/// entries across all decoders, used for pre-allocating the replay blob. /// entries across all decoders, used for pre-allocating the replay blob.
/// ///
@@ -2353,6 +2394,203 @@ pub async fn run_wspr_decoder(
} }
} }
// ---------------------------------------------------------------------------
// NOAA APT decoder task
// ---------------------------------------------------------------------------
/// Decode NOAA APT satellite images from FM-demodulated audio.
///
/// The task is idle until `state.noaa_decode_enabled` becomes `true`.
/// When the user disables the decoder (or 30 s of silence elapses with no
/// new decoded lines), the accumulated image is encoded as JPEG and saved to
/// `output_dir/<YYYY-MM-DD_HH-MM-SS>.jpg`.
pub async fn run_noaa_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
histories: Arc<DecoderHistories>,
output_dir: std::path::PathBuf,
) {
info!("NOAA decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = AptDecoder::new(sample_rate);
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().noaa_decode_enabled;
let mut pass_start_ms: i64 = 0;
// Instant of the last time new lines were decoded (for auto-finalise)
let mut last_line_at = tokio::time::Instant::now();
if active {
pass_start_ms = current_timestamp_ms();
pcm_rx = pcm_rx.resubscribe();
}
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.noaa_decode_enabled;
if active {
decoder.reset();
pass_start_ms = current_timestamp_ms();
last_line_at = tokio::time::Instant::now();
pcm_rx = pcm_rx.resubscribe();
}
if state.noaa_decode_reset_seq != last_reset_seq {
last_reset_seq = state.noaa_decode_reset_seq;
decoder.reset();
}
}
Err(_) => break,
}
continue;
}
let silence_deadline = last_line_at + NOAA_PASS_SILENCE_TIMEOUT;
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let reset_seq = state_rx.borrow().noaa_decode_reset_seq;
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
pcm_rx = pcm_rx.resubscribe();
continue;
}
let mono = downmix_mono(frame, channels);
let new_lines = tokio::task::block_in_place(|| decoder.process_samples(&mono));
if new_lines > 0 {
last_line_at = tokio::time::Instant::now();
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("NOAA decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
match changed {
Ok(()) => {
let state = state_rx.borrow();
let was_active = active;
active = state.noaa_decode_enabled;
if state.noaa_decode_reset_seq != last_reset_seq {
last_reset_seq = state.noaa_decode_reset_seq;
decoder.reset();
pass_start_ms = current_timestamp_ms();
}
if was_active && !active {
// User disabled — finalise whatever we have
finalize_noaa_pass(
&mut decoder,
pass_start_ms,
&output_dir,
&decode_tx,
&histories,
)
.await;
} else if !was_active && active {
decoder.reset();
pass_start_ms = current_timestamp_ms();
last_line_at = tokio::time::Instant::now();
pcm_rx = pcm_rx.resubscribe();
}
}
Err(_) => break,
}
}
// Auto-finalise after sustained silence (satellite pass ended)
_ = tokio::time::sleep_until(silence_deadline), if decoder.line_count() > 0 => {
info!(
"NOAA: no new lines for {}s — finalising pass ({} lines)",
NOAA_PASS_SILENCE_TIMEOUT.as_secs(),
decoder.line_count()
);
finalize_noaa_pass(
&mut decoder,
pass_start_ms,
&output_dir,
&decode_tx,
&histories,
)
.await;
// Remain active; ready for the next pass
pass_start_ms = current_timestamp_ms();
last_line_at = tokio::time::Instant::now();
}
}
}
}
/// Encode all accumulated lines as JPEG, write to disk, and broadcast the
/// `DecodedMessage::NoaaImage` event. No-ops if fewer than 2 lines decoded.
async fn finalize_noaa_pass(
decoder: &mut AptDecoder,
pass_start_ms: i64,
output_dir: &std::path::Path,
decode_tx: &broadcast::Sender<DecodedMessage>,
histories: &Arc<DecoderHistories>,
) {
if decoder.line_count() < 2 {
decoder.reset();
return;
}
let pass_end_ms = current_timestamp_ms();
let Some(apt_image) = decoder.finalize() else {
decoder.reset();
return;
};
// Build output path: <output_dir>/<YYYY-MM-DD_HH-MM-SS>.jpg
let dt = chrono::Local::now();
let filename = dt.format("%Y-%m-%d_%H-%M-%S.jpg").to_string();
let path = output_dir.join(&filename);
if let Err(e) = std::fs::create_dir_all(output_dir) {
warn!(
"NOAA: failed to create output directory {:?}: {}",
output_dir, e
);
decoder.reset();
return;
}
match std::fs::write(&path, &apt_image.jpeg) {
Ok(()) => {
info!(
"NOAA: saved {} ({} lines, {} bytes) to {:?}",
filename,
apt_image.line_count,
apt_image.jpeg.len(),
path
);
let img = NoaaImage {
rig_id: None,
pass_start_ms: apt_image.first_line_ms,
pass_end_ms,
line_count: apt_image.line_count,
path: path.to_string_lossy().into_owned(),
ts_ms: Some(pass_end_ms),
};
histories.record_noaa_image(img.clone());
let _ = decode_tx.send(DecodedMessage::NoaaImage(img));
}
Err(e) => {
warn!("NOAA: failed to write {:?}: {}", path, e);
}
}
decoder.reset();
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Virtual-channel audio support // Virtual-channel audio support
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -2952,6 +3190,11 @@ async fn handle_audio_client(
DecodedMessage::Cw, DecodedMessage::Cw,
AUDIO_MSG_CW_DECODE AUDIO_MSG_CW_DECODE
); );
push_history!(
histories.snapshot_noaa_history(),
DecodedMessage::NoaaImage,
AUDIO_MSG_NOAA_IMAGE
);
(blob, count) (blob, count)
}; };
@@ -3035,6 +3278,7 @@ async fn handle_audio_client(
DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE, DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE,
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE, DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE, DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
DecodedMessage::NoaaImage(_) => AUDIO_MSG_NOAA_IMAGE,
}; };
if let Ok(json) = serde_json::to_vec(&msg) { if let Ok(json) = serde_json::to_vec(&msg) {
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
@@ -3062,6 +3306,7 @@ async fn handle_audio_client(
DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE, DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE,
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE, DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE, DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
DecodedMessage::NoaaImage(_) => AUDIO_MSG_NOAA_IMAGE,
}; };
if let Ok(json) = serde_json::to_vec(&msg) { if let Ok(json) = serde_json::to_vec(&msg) {
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
+19
View File
@@ -793,6 +793,25 @@ fn spawn_rig_audio_stack(
_ = wait_for_shutdown(wspr_shutdown_rx) => {} _ = wait_for_shutdown(wspr_shutdown_rx) => {}
} }
})); }));
// Spawn NOAA APT decoder task
let noaa_pcm_rx = pcm_tx.subscribe();
let noaa_state_rx = state_rx.clone();
let noaa_decode_tx = decode_tx.clone();
let noaa_sr = rig_cfg.audio.sample_rate;
let noaa_ch = rig_cfg.audio.channels;
let noaa_shutdown_rx = shutdown_rx.clone();
let noaa_histories = histories.clone();
let noaa_output_dir = dirs::cache_dir()
.unwrap_or_else(|| std::path::PathBuf::from(".cache"))
.join("trx-rs")
.join("noaa");
handles.push(tokio::spawn(async move {
tokio::select! {
_ = audio::run_noaa_decoder(noaa_sr, noaa_ch as u16, noaa_pcm_rx, noaa_state_rx, noaa_decode_tx, noaa_histories, noaa_output_dir) => {}
_ = wait_for_shutdown(noaa_shutdown_rx) => {}
}
}));
} }
if rig_cfg.audio.tx_enabled { if rig_cfg.audio.tx_enabled {