From 5090ab71b37e600a2c9ebfed3ef39dd0ac00a0b4 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Tue, 24 Feb 2026 21:34:09 +0100 Subject: [PATCH] [feat](trx-backend-soapysdr): implement IQ DSP pipeline (mixer/FIR/decimate/demod) Add dsp.rs with IqSource trait abstraction, MockIqSource, windowed-sinc FIR low-pass filter, ChannelDsp (mixer/decimate/demod/frame-accumulator), and SdrPipeline which spawns a dedicated IQ read thread. No soapysdr crate dependency; the real device will be wired in SDR-07. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- SDR.md | 2 +- .../trx-backend-soapysdr/src/dsp.rs | 529 ++++++++++++++++++ .../trx-backend-soapysdr/src/lib.rs | 1 + 3 files changed, 531 insertions(+), 1 deletion(-) create mode 100644 src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs diff --git a/SDR.md b/SDR.md index 2b46118..d491829 100644 --- a/SDR.md +++ b/SDR.md @@ -26,7 +26,7 @@ This document specifies the requirements for a SoapySDR-based RX-only backend (` |----|--------|------|---------|-------| | SDR-04 | `[x]` | Create crate scaffold: `Cargo.toml` (deps: `soapysdr`, `num-complex`, `tokio`), empty `lib.rs` | `src/trx-server/trx-backend/trx-backend-soapysdr/` | SDR-01, SDR-02 | | SDR-05 | `[x]` | Implement `demod.rs`: SSB (USB/LSB), AM envelope, FM quadrature, CW narrow BPF+envelope | `…/src/demod.rs` | SDR-04 | -| SDR-06 | `[ ]` | Implement `dsp.rs`: IQ broadcast loop (SoapySDR read thread → `broadcast::Sender>>`); per-channel mixer → FIR LPF → decimator → demod → frame accumulator → `broadcast::Sender>` | `…/src/dsp.rs` | SDR-04, SDR-05 | +| SDR-06 | `[x]` | Implement `dsp.rs`: IQ broadcast loop (SoapySDR read thread → `broadcast::Sender>>`); per-channel mixer → FIR LPF → decimator → demod → frame accumulator → `broadcast::Sender>` | `…/src/dsp.rs` | SDR-04, SDR-05 | | SDR-07 | `[ ]` | Implement `SoapySdrRig` in `lib.rs`: `RigCat` (RX methods + `not_supported` stubs for TX), `AudioSource`, gain control (manual/auto with fallback), primary channel freq/mode tracking | `…/src/lib.rs` | SDR-03, SDR-06 | ### Server integration diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs new file mode 100644 index 0000000..f13eb45 --- /dev/null +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -0,0 +1,529 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! IQ DSP pipeline: IQ source abstraction, FIR low-pass filter, +//! per-channel mixer/decimator/demodulator, and frame accumulator. + +use std::f32::consts::PI; +use std::sync::{Arc, Mutex}; + +use num_complex::Complex; +use tokio::sync::broadcast; +use trx_core::rig::state::RigMode; + +use crate::demod::Demodulator; + +// --------------------------------------------------------------------------- +// IQ source abstraction +// --------------------------------------------------------------------------- + +/// Abstraction over any IQ sample source (real SoapySDR device or mock). +pub trait IqSource: Send + 'static { + /// Read the next block of IQ samples into `buf`. + /// Returns the number of samples written, or an error string. + fn read_into(&mut self, buf: &mut [Complex]) -> Result; +} + +// --------------------------------------------------------------------------- +// Mock IQ source +// --------------------------------------------------------------------------- + +/// IQ source that produces silence (all zeros). Used when no SDR hardware is present. +pub struct MockIqSource; + +impl IqSource for MockIqSource { + fn read_into(&mut self, buf: &mut [Complex]) -> Result { + buf.fill(Complex::new(0.0, 0.0)); + Ok(buf.len()) + } +} + +// --------------------------------------------------------------------------- +// FIR low-pass filter +// --------------------------------------------------------------------------- + +/// A simple windowed-sinc FIR low-pass filter. +/// +/// Used for: +/// 1. Pre-decimation anti-aliasing (cutoff = audio_rate / 2 / sdr_rate) +/// 2. Post-demod audio BPF (cutoff = audio_bandwidth_hz / audio_rate) +pub struct FirFilter { + coeffs: Vec, + /// Ring buffer holding the last (coeffs.len() - 1) samples. + state: Vec, + pos: usize, +} + +impl FirFilter { + /// Build a windowed-sinc low-pass FIR. + /// + /// `cutoff_norm`: normalised cutoff frequency (0.0–0.5), i.e. `cutoff_hz / sample_rate`. + /// `taps`: number of coefficients (odd recommended). + pub fn new(cutoff_norm: f32, taps: usize) -> Self { + assert!(taps >= 1, "FIR filter must have at least 1 tap"); + let m = (taps - 1) as f32; + let mut coeffs = Vec::with_capacity(taps); + + for i in 0..taps { + let x = i as f32 - m / 2.0; + let sinc = if x == 0.0 { + 2.0 * cutoff_norm + } else { + (2.0 * PI * cutoff_norm * x).sin() / (PI * x) + }; + // Hann window + let window = if taps == 1 { + 1.0 + } else { + 0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos()) + }; + coeffs.push(sinc * window); + } + + // Normalise so sum of coefficients equals 1.0. + let sum: f32 = coeffs.iter().sum(); + if sum.abs() > 1e-12 { + let inv = 1.0 / sum; + for c in &mut coeffs { + *c *= inv; + } + } + + let state_len = taps.saturating_sub(1); + Self { + coeffs, + state: vec![0.0; state_len], + pos: 0, + } + } + + /// Filter a single real sample and return the filtered output. + pub fn process(&mut self, sample: f32) -> f32 { + let n = self.state.len(); + + if n == 0 { + // Single-tap: just scale by the one coefficient. + return sample * self.coeffs[0]; + } + + // Write new sample into ring buffer. + self.state[self.pos] = sample; + self.pos = (self.pos + 1) % n; + + // Convolve: coeffs[0] applied to newest sample (before ring write), + // then work through history. + let mut acc = self.coeffs[0] * sample; + for k in 1..self.coeffs.len() { + // Index into ring buffer going backwards from pos. + let idx = (self.pos + n - k) % n; + acc += self.coeffs[k] * self.state[idx]; + } + acc + } +} + +// --------------------------------------------------------------------------- +// Channel DSP context +// --------------------------------------------------------------------------- + +/// Per-channel DSP state: mixer, FIR, decimator, demodulator, frame accumulator. +pub struct ChannelDsp { + /// Frequency offset of this channel from the SDR centre (Hz). + /// Already accounts for `center_offset_hz`. + pub channel_if_hz: f64, + /// Current demodulator (can be swapped via `set_mode`). + pub demodulator: Demodulator, + /// FIR anti-alias filter applied to I component before decimation. + pub lpf_i: FirFilter, + /// FIR anti-alias filter applied to Q component before decimation. + pub lpf_q: FirFilter, + /// Decimation factor: `sdr_sample_rate / audio_sample_rate`. + pub decim_factor: usize, + /// Accumulator for output PCM frames. + pub frame_buf: Vec, + /// Target frame size in samples (`audio_sample_rate * frame_duration_ms / 1000`). + pub frame_size: usize, + /// Sender for completed PCM frames. + pub pcm_tx: broadcast::Sender>, + /// Current oscillator phase (radians) for the complex mixer. + pub mixer_phase: f64, + /// Phase increment per IQ sample: `2π * channel_if_hz / sdr_sample_rate`. + pub mixer_phase_inc: f64, + /// Decimation counter: counts input samples, fires every `decim_factor` samples. + decim_counter: usize, +} + +impl ChannelDsp { + /// Construct a new `ChannelDsp`. + /// + /// `channel_if_hz`: IF offset within the IQ band (Hz, signed). + /// `mode`: initial demodulation mode. + /// `sdr_sample_rate`: IQ capture rate (Hz). + /// `audio_sample_rate`: output PCM rate (Hz). + /// `frame_duration_ms`: output frame length (ms). + /// `audio_bandwidth_hz`: one-sided audio BPF cutoff (Hz). + /// `fir_taps`: number of FIR taps. + /// `pcm_tx`: broadcast sender for completed PCM frames. + #[allow(clippy::too_many_arguments)] + pub fn new( + channel_if_hz: f64, + mode: &RigMode, + sdr_sample_rate: u32, + audio_sample_rate: u32, + frame_duration_ms: u16, + audio_bandwidth_hz: u32, + fir_taps: usize, + pcm_tx: broadcast::Sender>, + ) -> Self { + let decim_factor = if audio_sample_rate == 0 || sdr_sample_rate == 0 { + 1 + } else { + (sdr_sample_rate / audio_sample_rate).max(1) as usize + }; + + let frame_size = if audio_sample_rate == 0 || frame_duration_ms == 0 { + 960 + } else { + (audio_sample_rate as usize * frame_duration_ms as usize) / 1000 + }; + + // Normalised cutoff for the anti-alias LPF: audio_bandwidth_hz / sdr_sample_rate. + // This keeps only the audio-bandwidth-wide slice of the IQ after mixing. + let cutoff_norm = if sdr_sample_rate == 0 { + 0.1 + } else { + (audio_bandwidth_hz as f32 / 2.0) / sdr_sample_rate as f32 + }; + let cutoff_norm = cutoff_norm.min(0.499); // clamp below Nyquist + + let taps = fir_taps.max(1); + + let mixer_phase_inc = if sdr_sample_rate == 0 { + 0.0 + } else { + 2.0 * std::f64::consts::PI * channel_if_hz / sdr_sample_rate as f64 + }; + + Self { + channel_if_hz, + demodulator: Demodulator::for_mode(mode), + lpf_i: FirFilter::new(cutoff_norm, taps), + lpf_q: FirFilter::new(cutoff_norm, taps), + decim_factor, + frame_buf: Vec::with_capacity(frame_size * 2), + frame_size, + pcm_tx, + mixer_phase: 0.0, + mixer_phase_inc, + decim_counter: 0, + } + } + + /// Update the demodulator to match a new mode. + pub fn set_mode(&mut self, mode: &RigMode) { + self.demodulator = Demodulator::for_mode(mode); + } + + /// Process a block of raw IQ samples through the full DSP chain. + /// + /// 1. Mix each sample to baseband using the channel oscillator. + /// 2. Apply LPF to both I and Q. + /// 3. Decimate by `decim_factor`. + /// 4. Accumulate decimated samples; when enough are collected, demodulate. + /// 5. Append demodulated audio to `frame_buf`. + /// 6. Emit complete frames via `pcm_tx`. + pub fn process_block(&mut self, block: &[Complex]) { + // Pre-allocate a local decimated IQ accumulation buffer. + let capacity = block.len() / self.decim_factor + 1; + let mut local_dec: Vec> = Vec::with_capacity(capacity); + + for &sample in block { + // --- 1. Mix to baseband --- + let (sin, cos) = self.mixer_phase.sin_cos(); + // exp(-j * phase) = cos(phase) - j*sin(phase) + let lo = Complex::new(cos as f32, -(sin as f32)); + let mixed = sample * lo; + + // Advance mixer phase, wrap at 2π. + self.mixer_phase += self.mixer_phase_inc; + if self.mixer_phase >= std::f64::consts::TAU { + self.mixer_phase -= std::f64::consts::TAU; + } else if self.mixer_phase < -std::f64::consts::TAU { + self.mixer_phase += std::f64::consts::TAU; + } + + // --- 2. Apply LPF to both I and Q --- + let filtered_i = self.lpf_i.process(mixed.re); + let filtered_q = self.lpf_q.process(mixed.im); + let filtered = Complex::new(filtered_i, filtered_q); + + // --- 3. Decimate --- + self.decim_counter += 1; + if self.decim_counter >= self.decim_factor { + self.decim_counter = 0; + local_dec.push(filtered); + } + } + + if local_dec.is_empty() { + return; + } + + // --- 4. Demodulate the decimated block --- + let audio = self.demodulator.demodulate(&local_dec); + + // --- 5. Append to frame buffer --- + self.frame_buf.extend_from_slice(&audio); + + // --- 6. Emit complete frames --- + while self.frame_buf.len() >= self.frame_size { + let frame: Vec = self.frame_buf.drain(..self.frame_size).collect(); + // Ignore send errors (no active receivers is fine). + let _ = self.pcm_tx.send(frame); + } + } +} + +// --------------------------------------------------------------------------- +// Top-level pipeline struct +// --------------------------------------------------------------------------- + +/// Handle to the running SDR DSP pipeline. +pub struct SdrPipeline { + /// One PCM sender per channel (index matches `channels` order in `start`). + pub pcm_senders: Vec>>, + /// Shared references to per-channel DSP state (for runtime mode updates). + pub channel_dsps: Vec>>, +} + +impl SdrPipeline { + /// Build and start the IQ DSP pipeline. + /// + /// Spawns one OS thread that reads IQ samples from `source` and drives + /// all per-channel DSP chains. + /// + /// # Parameters + /// - `source`: IQ sample source (ownership transferred to read thread). + /// - `sdr_sample_rate`: IQ capture rate (Hz). + /// - `audio_sample_rate`: output PCM rate (Hz). + /// - `frame_duration_ms`: output frame length (ms). + /// - `channels`: slice of `(channel_if_hz, initial_mode, audio_bandwidth_hz, fir_taps)`. + /// + /// # Returns + /// A `SdrPipeline` handle with PCM senders and shared channel DSP references. + pub fn start( + source: Box, + sdr_sample_rate: u32, + audio_sample_rate: u32, + frame_duration_ms: u16, + channels: &[(f64, RigMode, u32, usize)], + ) -> Self { + // Broadcast channel capacity: 64 IQ blocks. + const IQ_BROADCAST_CAPACITY: usize = 64; + let (iq_tx, _iq_rx) = + broadcast::channel::>>(IQ_BROADCAST_CAPACITY); + + // PCM broadcast capacity: enough for several frames of latency. + const PCM_BROADCAST_CAPACITY: usize = 32; + + let mut pcm_senders = Vec::with_capacity(channels.len()); + let mut channel_dsps: Vec>> = Vec::with_capacity(channels.len()); + + for &(channel_if_hz, ref mode, audio_bandwidth_hz, fir_taps) in channels { + let (pcm_tx, _pcm_rx) = broadcast::channel::>(PCM_BROADCAST_CAPACITY); + let dsp = ChannelDsp::new( + channel_if_hz, + mode, + sdr_sample_rate, + audio_sample_rate, + frame_duration_ms, + audio_bandwidth_hz, + fir_taps, + pcm_tx.clone(), + ); + pcm_senders.push(pcm_tx); + channel_dsps.push(Arc::new(Mutex::new(dsp))); + } + + // Clone the Arc references for the read thread. + let thread_dsps: Vec>> = channel_dsps.clone(); + + // Spawn the IQ read thread. + std::thread::Builder::new() + .name("sdr-iq-read".to_string()) + .spawn(move || { + iq_read_loop(source, sdr_sample_rate, thread_dsps, iq_tx); + }) + .expect("failed to spawn sdr-iq-read thread"); + + Self { + pcm_senders, + channel_dsps, + } + } +} + +// --------------------------------------------------------------------------- +// IQ read loop +// --------------------------------------------------------------------------- + +/// Block size for IQ reads. +const IQ_BLOCK_SIZE: usize = 4096; + +/// The main IQ read loop. Runs on a dedicated OS thread. +/// +/// Reads IQ blocks from `source` and dispatches them to all channel DSP +/// instances. Also publishes raw IQ blocks on `iq_tx` for any downstream +/// subscribers. +fn iq_read_loop( + mut source: Box, + _sdr_sample_rate: u32, + channel_dsps: Vec>>, + iq_tx: broadcast::Sender>>, +) { + let mut block = vec![Complex::new(0.0_f32, 0.0_f32); IQ_BLOCK_SIZE]; + + loop { + let n = match source.read_into(&mut block) { + Ok(n) => n, + Err(e) => { + tracing::warn!("IQ source read error: {}; retrying", e); + // Brief back-off to avoid spinning on persistent errors. + std::thread::sleep(std::time::Duration::from_millis(10)); + continue; + } + }; + + if n == 0 { + // Source returned zero samples — treat as transient, back off. + std::thread::sleep(std::time::Duration::from_millis(1)); + continue; + } + + let samples = &block[..n]; + + // Publish raw IQ to broadcast (best-effort; ignore lagged errors). + let _ = iq_tx.send(samples.to_vec()); + + // Drive per-channel DSP chains. + for dsp_arc in &channel_dsps { + match dsp_arc.lock() { + Ok(mut dsp) => dsp.process_block(samples), + Err(e) => { + tracing::error!("channel DSP mutex poisoned: {}", e); + } + } + } + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use trx_core::rig::state::RigMode; + + #[test] + fn mock_iq_source_fills_zeros() { + let mut src = MockIqSource; + let mut buf = vec![Complex::new(1.0_f32, 1.0_f32); 64]; + let n = src.read_into(&mut buf).unwrap(); + assert_eq!(n, 64); + for s in &buf { + assert_eq!(s.re, 0.0); + assert_eq!(s.im, 0.0); + } + } + + #[test] + fn fir_filter_dc_passthrough() { + // A DC signal (constant 1.0) should pass through the LPF. + let mut fir = FirFilter::new(0.1, 31); + let mut out = 0.0_f32; + // Run enough samples for the filter to settle. + for _ in 0..100 { + out = fir.process(1.0); + } + // After settling, DC output should be close to 1.0. + assert!( + (out - 1.0).abs() < 0.05, + "DC passthrough failed: got {}", + out + ); + } + + #[test] + fn fir_filter_single_tap() { + let mut fir = FirFilter::new(0.25, 1); + let out = fir.process(0.5); + // With one tap, the coefficient sum is normalised to 1.0, so output ≈ 0.5. + assert!((out - 0.5).abs() < 1e-5, "single-tap output: {}", out); + } + + #[test] + fn channel_dsp_processes_silence() { + let (pcm_tx, _pcm_rx) = broadcast::channel::>(8); + let mut dsp = ChannelDsp::new( + 0.0, // channel_if_hz + &RigMode::USB, + 48_000, // sdr_sample_rate + 8_000, // audio_sample_rate (decim = 6) + 20, // frame_duration_ms → frame_size = 160 + 3000, // audio_bandwidth_hz + 31, // fir_taps + pcm_tx, + ); + + // Feed 4096 zero samples — should not panic. + let block = vec![Complex::new(0.0_f32, 0.0_f32); 4096]; + dsp.process_block(&block); + } + + #[test] + fn channel_dsp_set_mode() { + let (pcm_tx, _) = broadcast::channel::>(8); + let mut dsp = ChannelDsp::new( + 0.0, + &RigMode::USB, + 48_000, + 8_000, + 20, + 3000, + 31, + pcm_tx, + ); + assert_eq!(dsp.demodulator, Demodulator::Usb); + dsp.set_mode(&RigMode::FM); + assert_eq!(dsp.demodulator, Demodulator::Fm); + } + + #[test] + fn pipeline_starts_with_mock_source() { + let pipeline = SdrPipeline::start( + Box::new(MockIqSource), + 1_920_000, + 48_000, + 20, + &[(200_000.0, RigMode::USB, 3000, 64)], + ); + assert_eq!(pipeline.pcm_senders.len(), 1); + assert_eq!(pipeline.channel_dsps.len(), 1); + } + + #[test] + fn pipeline_empty_channels() { + let pipeline = SdrPipeline::start( + Box::new(MockIqSource), + 1_920_000, + 48_000, + 20, + &[], + ); + assert_eq!(pipeline.pcm_senders.len(), 0); + assert_eq!(pipeline.channel_dsps.len(), 0); + } +} diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs index 9923b7e..4b9d76c 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: BSD-2-Clause pub mod demod; +pub mod dsp; use std::pin::Pin;