feat(trx-core,trx-protocol,trx-backend-soapysdr): add spectrum data pipeline

Add SpectrumData struct (bins, center_hz, sample_rate) to RigState and
RigSnapshot. Add GetSpectrum RigCommand and ClientCommand plumbed through
the protocol layer. SoapySDR DSP pipeline now computes a 1024-bin FFT
(Hann window, FFT-shifted, dBFS) every 4 IQ blocks (~10 Hz update rate)
and exposes it via RigCat::get_spectrum(). The rig_task handles
GetSpectrum without persisting spectrum data in ongoing state.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-02-27 21:35:53 +01:00
parent df79f06ff0
commit 76969b5499
10 changed files with 111 additions and 6 deletions
+1
View File
@@ -32,4 +32,5 @@ pub enum RigCommand {
ResetWsprDecoder,
SetBandwidth(u32),
SetFirTaps(u32),
GetSpectrum,
}
+2 -1
View File
@@ -514,7 +514,8 @@ pub fn command_from_rig_command(cmd: RigCommand) -> Box<dyn RigCommandHandler> {
| RigCommand::ResetFt8Decoder
| RigCommand::ResetWsprDecoder
| RigCommand::SetBandwidth(_)
| RigCommand::SetFirTaps(_) => Box::new(GetSnapshotCommand),
| RigCommand::SetFirTaps(_)
| RigCommand::GetSpectrum => Box::new(GetSnapshotCommand),
}
}
+5
View File
@@ -149,6 +149,11 @@ pub trait RigCat: Rig + Send {
fn filter_state(&self) -> Option<state::RigFilterState> {
None
}
/// Return the latest spectrum frame if this backend supports spectrum output.
fn get_spectrum(&self) -> Option<state::SpectrumData> {
None
}
}
/// Snapshot of a rig's status that every backend can expose.
+20
View File
@@ -46,6 +46,10 @@ pub struct RigState {
/// Skipped in serde; flows into RigSnapshot via snapshot().
#[serde(skip)]
pub filter: Option<RigFilterState>,
/// Latest spectrum frame from SDR backends.
/// Skipped in serde (not part of persistent state); flows into RigSnapshot on demand.
#[serde(skip)]
pub spectrum: Option<SpectrumData>,
#[serde(default, skip_serializing)]
pub aprs_decode_reset_seq: u64,
#[serde(default, skip_serializing)]
@@ -132,6 +136,7 @@ impl RigState {
cw_wpm: 15,
cw_tone_hz: 700,
filter: None,
spectrum: None,
aprs_decode_reset_seq: 0,
cw_decode_reset_seq: 0,
ft8_decode_reset_seq: 0,
@@ -192,6 +197,7 @@ impl RigState {
ft8_decode_enabled: snapshot.ft8_decode_enabled,
wspr_decode_enabled: snapshot.wspr_decode_enabled,
filter: snapshot.filter,
spectrum: None, // spectrum flows through /api/spectrum, not persistent state
aprs_decode_reset_seq: 0,
cw_decode_reset_seq: 0,
ft8_decode_reset_seq: 0,
@@ -230,6 +236,7 @@ impl RigState {
ft8_decode_enabled: self.ft8_decode_enabled,
wspr_decode_enabled: self.wspr_decode_enabled,
filter: self.filter.clone(),
spectrum: self.spectrum.clone(),
})
}
@@ -264,6 +271,17 @@ pub struct RigFilterState {
pub cw_center_hz: u32,
}
/// Spectrum data from SDR backends (FFT magnitude over the full capture bandwidth).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpectrumData {
/// FFT magnitude bins in dBFS, FFT-shifted so DC (centre frequency) is at index N/2.
pub bins: Vec<f32>,
/// Centre frequency of the SDR capture in Hz.
pub center_hz: u64,
/// SDR capture sample rate in Hz; the displayed span is ±sample_rate/2.
pub sample_rate: u32,
}
/// Read-only projection of state shared with clients.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RigSnapshot {
@@ -300,4 +318,6 @@ pub struct RigSnapshot {
pub cw_tone_hz: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filter: Option<RigFilterState>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub spectrum: Option<SpectrumData>,
}
+1
View File
@@ -403,6 +403,7 @@ mod tests {
cw_wpm: 0,
cw_tone_hz: 0,
filter: None,
spectrum: None,
}
}
}
+2
View File
@@ -47,6 +47,7 @@ pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand {
ClientCommand::ResetWsprDecoder => RigCommand::ResetWsprDecoder,
ClientCommand::SetBandwidth { bandwidth_hz } => RigCommand::SetBandwidth(bandwidth_hz),
ClientCommand::SetFirTaps { taps } => RigCommand::SetFirTaps(taps),
ClientCommand::GetSpectrum => RigCommand::GetSpectrum,
}
}
@@ -86,6 +87,7 @@ pub fn rig_command_to_client(cmd: RigCommand) -> ClientCommand {
RigCommand::ResetWsprDecoder => ClientCommand::ResetWsprDecoder,
RigCommand::SetBandwidth(bandwidth_hz) => ClientCommand::SetBandwidth { bandwidth_hz },
RigCommand::SetFirTaps(taps) => ClientCommand::SetFirTaps { taps },
RigCommand::GetSpectrum => ClientCommand::GetSpectrum,
}
}
+1
View File
@@ -37,6 +37,7 @@ pub enum ClientCommand {
ResetWsprDecoder,
SetBandwidth { bandwidth_hz: u32 },
SetFirTaps { taps: u32 },
GetSpectrum,
}
/// Envelope for client commands with optional authentication token and rig routing.
+7
View File
@@ -449,6 +449,13 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::GetSpectrum => {
// Fetch current spectrum and embed it in a one-shot snapshot.
ctx.state.spectrum = ctx.rig.get_spectrum();
let result = snapshot_from(ctx.state);
ctx.state.spectrum = None; // don't persist in ongoing state
return result;
}
_ => {} // fall through to normal rig handler
}
@@ -403,6 +403,10 @@ impl ChannelDsp {
pub struct SdrPipeline {
pub pcm_senders: Vec<broadcast::Sender<Vec<f32>>>,
pub channel_dsps: Vec<Arc<Mutex<ChannelDsp>>>,
/// Latest FFT magnitude bins (dBFS, FFT-shifted), updated ~10 Hz.
pub spectrum_buf: Arc<Mutex<Option<Vec<f32>>>>,
/// SDR capture sample rate, needed by `SoapySdrRig::get_spectrum`.
pub sdr_sample_rate: u32,
}
impl SdrPipeline {
@@ -438,17 +442,21 @@ impl SdrPipeline {
}
let thread_dsps: Vec<Arc<Mutex<ChannelDsp>>> = channel_dsps.clone();
let spectrum_buf: Arc<Mutex<Option<Vec<f32>>>> = Arc::new(Mutex::new(None));
let thread_spectrum_buf = spectrum_buf.clone();
std::thread::Builder::new()
.name("sdr-iq-read".to_string())
.spawn(move || {
iq_read_loop(source, sdr_sample_rate, thread_dsps, iq_tx);
iq_read_loop(source, sdr_sample_rate, thread_dsps, iq_tx, thread_spectrum_buf);
})
.expect("failed to spawn sdr-iq-read thread");
Self {
pcm_senders,
channel_dsps,
spectrum_buf,
sdr_sample_rate,
}
}
}
@@ -459,11 +467,18 @@ impl SdrPipeline {
pub const IQ_BLOCK_SIZE: usize = 4096;
/// Number of FFT bins for the spectrum display.
const SPECTRUM_FFT_SIZE: usize = 1024;
/// Update the spectrum buffer every this many IQ blocks (~10 Hz at 1.92 MHz / 4096 block).
const SPECTRUM_UPDATE_BLOCKS: usize = 4;
fn iq_read_loop(
mut source: Box<dyn IqSource>,
sdr_sample_rate: u32,
channel_dsps: Vec<Arc<Mutex<ChannelDsp>>>,
iq_tx: broadcast::Sender<Vec<Complex<f32>>>,
spectrum_buf: Arc<Mutex<Option<Vec<f32>>>>,
) {
let mut block = vec![Complex::new(0.0_f32, 0.0_f32); IQ_BLOCK_SIZE];
let block_duration_ms = if sdr_sample_rate > 0 {
@@ -471,11 +486,19 @@ fn iq_read_loop(
} else {
1
};
// Blocking sources (real hardware) already pace the loop inside read_into.
// Non-blocking sources (MockIqSource) need an explicit sleep to avoid
// busy-spinning at 100 % CPU.
let throttle = !source.is_blocking();
// Pre-compute Hann window coefficients.
let hann_window: Vec<f32> = (0..SPECTRUM_FFT_SIZE)
.map(|i| {
0.5 * (1.0 - (2.0 * PI * i as f32 / (SPECTRUM_FFT_SIZE - 1) as f32).cos())
})
.collect();
let mut planner = FftPlanner::<f32>::new();
let fft = planner.plan_fft_forward(SPECTRUM_FFT_SIZE);
let mut spectrum_counter: usize = 0;
loop {
let n = match source.read_into(&mut block) {
Ok(n) => n,
@@ -504,6 +527,35 @@ fn iq_read_loop(
}
}
// Periodically compute and store a spectrum snapshot.
spectrum_counter += 1;
if spectrum_counter >= SPECTRUM_UPDATE_BLOCKS {
spectrum_counter = 0;
let take = n.min(SPECTRUM_FFT_SIZE);
let mut buf: Vec<FftComplex<f32>> = samples[..take]
.iter()
.enumerate()
.map(|(i, s)| FftComplex::new(s.re * hann_window[i], s.im * hann_window[i]))
.collect();
buf.resize(SPECTRUM_FFT_SIZE, FftComplex::new(0.0, 0.0));
fft.process(&mut buf);
// FFT-shift: rearrange so negative frequencies come first (DC in centre).
let half = SPECTRUM_FFT_SIZE / 2;
let bins: Vec<f32> = buf[half..]
.iter()
.chain(buf[..half].iter())
.map(|c| {
let mag = (c.re * c.re + c.im * c.im).sqrt() / SPECTRUM_FFT_SIZE as f32;
20.0 * (mag.max(1e-10_f32)).log10()
})
.collect();
if let Ok(mut guard) = spectrum_buf.lock() {
*guard = Some(bins);
}
}
if throttle {
std::thread::sleep(std::time::Duration::from_millis(block_duration_ms));
}
@@ -7,10 +7,11 @@ pub mod dsp;
pub mod real_iq_source;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use trx_core::radio::freq::{Band, Freq};
use trx_core::rig::response::RigError;
use trx_core::rig::state::RigFilterState;
use trx_core::rig::state::{RigFilterState, SpectrumData};
use trx_core::rig::{
AudioSource, Rig, RigAccessMethod, RigCapabilities, RigCat, RigInfo, RigStatusFuture,
};
@@ -27,6 +28,8 @@ pub struct SoapySdrRig {
/// Current filter state of the primary channel (for filter_controls support).
bandwidth_hz: u32,
fir_taps: u32,
/// Shared spectrum magnitude buffer populated by the IQ read loop.
spectrum_buf: Arc<Mutex<Option<Vec<f32>>>>,
}
impl SoapySdrRig {
@@ -141,6 +144,8 @@ impl SoapySdrRig {
.map(|&(_, _, bw, taps)| (bw, taps as u32))
.unwrap_or((3000, 64));
let spectrum_buf = pipeline.spectrum_buf.clone();
Ok(Self {
info,
freq: initial_freq,
@@ -149,6 +154,7 @@ impl SoapySdrRig {
primary_channel_idx: 0,
bandwidth_hz,
fir_taps,
spectrum_buf,
})
}
@@ -357,6 +363,15 @@ impl RigCat for SoapySdrRig {
})
}
fn get_spectrum(&self) -> Option<SpectrumData> {
let bins = self.spectrum_buf.lock().ok()?.clone()?;
Some(SpectrumData {
bins,
center_hz: self.freq.hz,
sample_rate: self.pipeline.sdr_sample_rate,
})
}
/// Override: this backend provides demodulated PCM audio.
fn as_audio_source(&self) -> Option<&dyn AudioSource> {
Some(self)