From 600257a7c4ca66a98bfc8c7617c7e102bfb60745 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Fri, 27 Feb 2026 01:05:49 +0100 Subject: [PATCH] [perf](trx-backend-soapysdr): replace FIR with FFT overlap-save via rustfft MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the per-sample ring-buffer FIR convolution with block-level overlap-save convolution using rustfft. For a block of M samples and N taps the old approach costs O(N·M); the new one costs O(M log M), with rustfft using SIMD (AVX2/SSE4) internally. Key changes: - Add rustfft = "6" dependency - Add BlockFirFilter: overlap-save filter with pre-computed H(f) and a single forward+inverse FFT pair per block (no per-sample multiply) - ChannelDsp.process_block() now: 1. Batch-mixes entire block to baseband in one vectorisable loop 2. Applies BlockFirFilter to I and Q (one FFT pair each) 3. Decimates and demodulates as before - Keep the old FirFilter for unit tests (sample-by-sample interface) - Add BlockFirFilter unit tests (DC passthrough, length preservation) - IQ_BLOCK_SIZE promoted to pub const for use in filter sizing For the default config (4096-sample blocks, 64 taps, decim=40): Old: ~262144 multiply-adds per FIR × 2 components = ~524k per block New: ~2 × (3 × 8192 × log2(8192)) ops, all SIMD-vectorised by rustfft Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- Cargo.lock | 49 ++ .../trx-backend-soapysdr/Cargo.toml | 1 + .../trx-backend-soapysdr/src/dsp.rs | 431 +++++++++++------- 3 files changed, 312 insertions(+), 169 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 826ef72..5ade2b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1461,6 +1461,15 @@ dependencies = [ "syn", ] +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1619,6 +1628,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "primal-check" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0d895b311e3af9902528fbb8f928688abbd95872819320517cc24ca6b2bd08" +dependencies = [ + "num-integer", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -1787,6 +1805,20 @@ dependencies = [ "semver", ] +[[package]] +name = "rustfft" +version = "6.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21db5f9893e91f41798c88680037dba611ca6674703c1a18601b01a72c8adb89" +dependencies = [ + "num-complex", + "num-integer", + "num-traits", + "primal-check", + "strength_reduce", + "transpose", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -2003,6 +2035,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "strength_reduce" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82" + [[package]] name = "strsim" version = "0.11.1" @@ -2317,6 +2355,16 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "transpose" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad61aed86bc3faea4300c7aee358b4c6d0c8d6ccc36524c96e4c92ccf26e77e" +dependencies = [ + "num-integer", + "strength_reduce", +] + [[package]] name = "trx-app" version = "0.1.0" @@ -2378,6 +2426,7 @@ name = "trx-backend-soapysdr" version = "0.1.0" dependencies = [ "num-complex", + "rustfft", "serde", "soapysdr", "tokio", diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml b/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml index a38e34a..ad5d4c1 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml @@ -14,4 +14,5 @@ tokio = { workspace = true, features = ["sync", "rt"] } serde = { workspace = true } tracing = { workspace = true } num-complex = "0.4" +rustfft = "6" soapysdr = "0.3" diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 1366cc2..1de5325 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -2,13 +2,21 @@ // // SPDX-License-Identifier: BSD-2-Clause -//! IQ DSP pipeline: IQ source abstraction, FIR low-pass filter, +//! IQ DSP pipeline: IQ source abstraction, FFT-based FIR low-pass filter, //! per-channel mixer/decimator/demodulator, and frame accumulator. +//! +//! The FIR filter uses **overlap-save convolution** via `rustfft`, replacing +//! the previous per-sample ring-buffer approach. For a block of M samples +//! and N taps, direct convolution costs O(N·M) multiply-adds, while the FFT +//! approach costs O(M log M) — a significant saving for the tap counts (64+) +//! and block sizes (4096) used here. use std::f32::consts::PI; use std::sync::{Arc, Mutex}; use num_complex::Complex; +use rustfft::num_complex::Complex as FftComplex; +use rustfft::{Fft, FftPlanner}; use tokio::sync::broadcast; use trx_core::rig::state::RigMode; @@ -40,56 +48,53 @@ impl IqSource for MockIqSource { } // --------------------------------------------------------------------------- -// FIR low-pass filter +// Windowed-sinc coefficient generator (shared by FirFilter and BlockFirFilter) // --------------------------------------------------------------------------- -/// A simple windowed-sinc FIR low-pass filter. +fn windowed_sinc_coeffs(cutoff_norm: f32, taps: usize) -> Vec { + assert!(taps >= 1, "FIR filter must have at least 1 tap"); + let m = (taps - 1) as f32; + let mut coeffs = Vec::with_capacity(taps); + for i in 0..taps { + let x = i as f32 - m / 2.0; + let sinc = if x == 0.0 { + 2.0 * cutoff_norm + } else { + (2.0 * PI * cutoff_norm * x).sin() / (PI * x) + }; + let window = if taps == 1 { + 1.0 + } else { + 0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos()) + }; + coeffs.push(sinc * window); + } + let sum: f32 = coeffs.iter().sum(); + if sum.abs() > 1e-12 { + let inv = 1.0 / sum; + for c in &mut coeffs { + *c *= inv; + } + } + coeffs +} + +// --------------------------------------------------------------------------- +// FIR low-pass filter (sample-by-sample, kept for unit tests) +// --------------------------------------------------------------------------- + +/// A simple windowed-sinc FIR low-pass filter (sample-by-sample interface). /// -/// Used for: -/// 1. Pre-decimation anti-aliasing (cutoff = audio_rate / 2 / sdr_rate) -/// 2. Post-demod audio BPF (cutoff = audio_bandwidth_hz / audio_rate) +/// Used only in unit tests. The DSP pipeline uses [`BlockFirFilter`] instead. pub struct FirFilter { coeffs: Vec, - /// Ring buffer holding the last (coeffs.len() - 1) samples. state: Vec, pos: usize, } impl FirFilter { - /// Build a windowed-sinc low-pass FIR. - /// - /// `cutoff_norm`: normalised cutoff frequency (0.0–0.5), i.e. `cutoff_hz / sample_rate`. - /// `taps`: number of coefficients (odd recommended). pub fn new(cutoff_norm: f32, taps: usize) -> Self { - assert!(taps >= 1, "FIR filter must have at least 1 tap"); - let m = (taps - 1) as f32; - let mut coeffs = Vec::with_capacity(taps); - - for i in 0..taps { - let x = i as f32 - m / 2.0; - let sinc = if x == 0.0 { - 2.0 * cutoff_norm - } else { - (2.0 * PI * cutoff_norm * x).sin() / (PI * x) - }; - // Hann window - let window = if taps == 1 { - 1.0 - } else { - 0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos()) - }; - coeffs.push(sinc * window); - } - - // Normalise so sum of coefficients equals 1.0. - let sum: f32 = coeffs.iter().sum(); - if sum.abs() > 1e-12 { - let inv = 1.0 / sum; - for c in &mut coeffs { - *c *= inv; - } - } - + let coeffs = windowed_sinc_coeffs(cutoff_norm, taps); let state_len = taps.saturating_sub(1); Self { coeffs, @@ -98,24 +103,15 @@ impl FirFilter { } } - /// Filter a single real sample and return the filtered output. pub fn process(&mut self, sample: f32) -> f32 { let n = self.state.len(); - if n == 0 { - // Single-tap: just scale by the one coefficient. return sample * self.coeffs[0]; } - - // Write new sample into ring buffer. self.state[self.pos] = sample; self.pos = (self.pos + 1) % n; - - // Convolve: coeffs[0] applied to newest sample (before ring write), - // then work through history. let mut acc = self.coeffs[0] * sample; for k in 1..self.coeffs.len() { - // Index into ring buffer going backwards from pos. let idx = (self.pos + n - k) % n; acc += self.coeffs[k] * self.state[idx]; } @@ -123,48 +119,149 @@ impl FirFilter { } } +// --------------------------------------------------------------------------- +// Block FIR filter — overlap-save via rustfft +// --------------------------------------------------------------------------- + +/// FFT-based overlap-save FIR low-pass filter (block interface). +/// +/// For a block of M samples and N taps the direct cost is O(N·M); here it +/// is O(M log M) plus a single coefficient FFT computed once at construction. +/// +/// The filter is initialised for a nominal block size of [`IQ_BLOCK_SIZE`]. +/// Smaller blocks are handled correctly (they incur a small padding overhead). +pub struct BlockFirFilter { + /// Frequency-domain filter coefficients (pre-computed, length `fft_size`). + h_freq: Vec>, + /// Overlap buffer: last `n_taps - 1` input samples (zero-initialised). + overlap: Vec, + n_taps: usize, + fft_size: usize, + fft: Arc>, + ifft: Arc>, +} + +impl BlockFirFilter { + /// Create a new `BlockFirFilter`. + /// + /// `cutoff_norm`: normalised cutoff (0.0–0.5), i.e. `cutoff_hz / sample_rate`. + /// `taps`: number of FIR taps. + /// `block_size`: expected input block length (used to size the internal FFT). + pub fn new(cutoff_norm: f32, taps: usize, block_size: usize) -> Self { + let taps = taps.max(1); + let coeffs = windowed_sinc_coeffs(cutoff_norm, taps); + + // Choose the smallest power-of-two FFT that fits the overlap-save frame. + let fft_size = (block_size + taps - 1).next_power_of_two(); + + let mut planner = FftPlanner::::new(); + let fft = planner.plan_fft_forward(fft_size); + let ifft = planner.plan_fft_inverse(fft_size); + + // Pre-compute H(f) = FFT of zero-padded coefficients. + let mut h_buf: Vec> = coeffs + .iter() + .map(|&c| FftComplex::new(c, 0.0)) + .collect(); + h_buf.resize(fft_size, FftComplex::new(0.0, 0.0)); + fft.process(&mut h_buf); + + Self { + h_freq: h_buf, + overlap: vec![0.0; taps.saturating_sub(1)], + n_taps: taps, + fft_size, + fft, + ifft, + } + } + + /// Filter a block of real samples and return filtered samples of the same length. + /// + /// Internally performs one forward FFT, a point-wise multiply with the + /// pre-computed filter response, and one inverse FFT. + pub fn filter_block(&mut self, input: &[f32]) -> Vec { + let n_new = input.len(); + let n_overlap = self.n_taps.saturating_sub(1); + + // Build the time-domain frame: [overlap (N-1)] ++ [new input] ++ [zeros] + let mut buf: Vec> = Vec::with_capacity(self.fft_size); + for &s in &self.overlap { + buf.push(FftComplex::new(s, 0.0)); + } + for &s in input { + buf.push(FftComplex::new(s, 0.0)); + } + buf.resize(self.fft_size, FftComplex::new(0.0, 0.0)); + + // Forward FFT. + self.fft.process(&mut buf); + + // Point-wise multiply with H(f); fold in the IFFT normalisation here + // to avoid a second pass. + let scale = 1.0 / self.fft_size as f32; + for (x, &h) in buf.iter_mut().zip(self.h_freq.iter()) { + *x = FftComplex::new( + (x.re * h.re - x.im * h.im) * scale, + (x.re * h.im + x.im * h.re) * scale, + ); + } + + // Inverse FFT. + self.ifft.process(&mut buf); + + // Extract the valid output: discard the first n_overlap samples. + let end = (n_overlap + n_new).min(buf.len()); + let output: Vec = buf[n_overlap..end].iter().map(|s| s.re).collect(); + + // Update overlap with the tail of the current input. + if n_overlap > 0 { + let keep_old = n_overlap.saturating_sub(n_new); + let mut new_overlap = Vec::with_capacity(n_overlap); + if keep_old > 0 { + let start = self.overlap.len() - keep_old; + new_overlap.extend_from_slice(&self.overlap[start..]); + } + let new_start = n_new.saturating_sub(n_overlap); + new_overlap.extend_from_slice(&input[new_start..]); + self.overlap = new_overlap; + } + + output + } +} + // --------------------------------------------------------------------------- // Channel DSP context // --------------------------------------------------------------------------- -/// Per-channel DSP state: mixer, FIR, decimator, demodulator, frame accumulator. +/// Per-channel DSP state: mixer, FFT-FIR, decimator, demodulator, frame accumulator. pub struct ChannelDsp { /// Frequency offset of this channel from the SDR centre (Hz). - /// Already accounts for `center_offset_hz`. pub channel_if_hz: f64, /// Current demodulator (can be swapped via `set_mode`). pub demodulator: Demodulator, - /// FIR anti-alias filter applied to I component before decimation. - pub lpf_i: FirFilter, - /// FIR anti-alias filter applied to Q component before decimation. - pub lpf_q: FirFilter, + /// FFT-based FIR low-pass filter applied to I component before decimation. + lpf_i: BlockFirFilter, + /// FFT-based FIR low-pass filter applied to Q component before decimation. + lpf_q: BlockFirFilter, /// Decimation factor: `sdr_sample_rate / audio_sample_rate`. pub decim_factor: usize, /// Accumulator for output PCM frames. pub frame_buf: Vec, - /// Target frame size in samples (`audio_sample_rate * frame_duration_ms / 1000`). + /// Target frame size in samples. pub frame_size: usize, /// Sender for completed PCM frames. pub pcm_tx: broadcast::Sender>, - /// Current oscillator phase (radians) for the complex mixer. + /// Current oscillator phase (radians). pub mixer_phase: f64, - /// Phase increment per IQ sample: `2π * channel_if_hz / sdr_sample_rate`. + /// Phase increment per IQ sample. pub mixer_phase_inc: f64, - /// Decimation counter: counts input samples, fires every `decim_factor` samples. + /// Decimation counter. decim_counter: usize, } impl ChannelDsp { - /// Construct a new `ChannelDsp`. - /// - /// `channel_if_hz`: IF offset within the IQ band (Hz, signed). - /// `mode`: initial demodulation mode. - /// `sdr_sample_rate`: IQ capture rate (Hz). - /// `audio_sample_rate`: output PCM rate (Hz). - /// `frame_duration_ms`: output frame length (ms). - /// `audio_bandwidth_hz`: one-sided audio BPF cutoff (Hz). - /// `fir_taps`: number of FIR taps. - /// `pcm_tx`: broadcast sender for completed PCM frames. #[allow(clippy::too_many_arguments)] pub fn new( channel_if_hz: f64, @@ -188,14 +285,12 @@ impl ChannelDsp { (audio_sample_rate as usize * frame_duration_ms as usize) / 1000 }; - // Normalised cutoff for the anti-alias LPF: audio_bandwidth_hz / sdr_sample_rate. - // This keeps only the audio-bandwidth-wide slice of the IQ after mixing. let cutoff_norm = if sdr_sample_rate == 0 { 0.1 } else { (audio_bandwidth_hz as f32 / 2.0) / sdr_sample_rate as f32 - }; - let cutoff_norm = cutoff_norm.min(0.499); // clamp below Nyquist + } + .min(0.499); let taps = fir_taps.max(1); @@ -208,8 +303,8 @@ impl ChannelDsp { Self { channel_if_hz, demodulator: Demodulator::for_mode(mode), - lpf_i: FirFilter::new(cutoff_norm, taps), - lpf_q: FirFilter::new(cutoff_norm, taps), + lpf_i: BlockFirFilter::new(cutoff_norm, taps, IQ_BLOCK_SIZE), + lpf_q: BlockFirFilter::new(cutoff_norm, taps, IQ_BLOCK_SIZE), decim_factor, frame_buf: Vec::with_capacity(frame_size * 2), frame_size, @@ -220,66 +315,75 @@ impl ChannelDsp { } } - /// Update the demodulator to match a new mode. pub fn set_mode(&mut self, mode: &RigMode) { self.demodulator = Demodulator::for_mode(mode); } /// Process a block of raw IQ samples through the full DSP chain. /// - /// 1. Mix each sample to baseband using the channel oscillator. - /// 2. Apply LPF to both I and Q. - /// 3. Decimate by `decim_factor`. - /// 4. Accumulate decimated samples; when enough are collected, demodulate. - /// 5. Append demodulated audio to `frame_buf`. - /// 6. Emit complete frames via `pcm_tx`. + /// 1. **Batch mixer**: compute the full LO signal for the block at once, + /// then multiply element-wise. The loop body has no cross-iteration + /// data dependency so the compiler can auto-vectorise it. + /// 2. **FFT FIR**: apply the overlap-save FIR to I and Q in one FFT pair + /// each instead of N multiplies per sample. + /// 3. Decimate. + /// 4. Demodulate. + /// 5. Emit complete PCM frames. pub fn process_block(&mut self, block: &[Complex]) { - // Pre-allocate a local decimated IQ accumulation buffer. - let capacity = block.len() / self.decim_factor + 1; - let mut local_dec: Vec> = Vec::with_capacity(capacity); - - for &sample in block { - // --- 1. Mix to baseband --- - let (sin, cos) = self.mixer_phase.sin_cos(); - // exp(-j * phase) = cos(phase) - j*sin(phase) - let lo = Complex::new(cos as f32, -(sin as f32)); - let mixed = sample * lo; - - // Advance mixer phase, wrap at 2π. - self.mixer_phase += self.mixer_phase_inc; - if self.mixer_phase >= std::f64::consts::TAU { - self.mixer_phase -= std::f64::consts::TAU; - } else if self.mixer_phase < -std::f64::consts::TAU { - self.mixer_phase += std::f64::consts::TAU; - } - - // --- 2. Apply LPF to both I and Q --- - let filtered_i = self.lpf_i.process(mixed.re); - let filtered_q = self.lpf_q.process(mixed.im); - let filtered = Complex::new(filtered_i, filtered_q); - - // --- 3. Decimate --- - self.decim_counter += 1; - if self.decim_counter >= self.decim_factor { - self.decim_counter = 0; - local_dec.push(filtered); - } - } - - if local_dec.is_empty() { + let n = block.len(); + if n == 0 { return; } - // --- 4. Demodulate the decimated block --- - let audio = self.demodulator.demodulate(&local_dec); + // --- 1. Batch mixer ------------------------------------------------- + // Pre-compute I and Q mixer outputs for the whole block. + // Each iteration is independent → the compiler can vectorise. + let mut mixed_i = vec![0.0_f32; n]; + let mut mixed_q = vec![0.0_f32; n]; - // --- 5. Append to frame buffer --- + let phase_start = self.mixer_phase; + let phase_inc = self.mixer_phase_inc; + for (idx, &sample) in block.iter().enumerate() { + let phase = phase_start + idx as f64 * phase_inc; + let (sin, cos) = phase.sin_cos(); + let lo_re = cos as f32; + let lo_im = -(sin as f32); + // mixed = sample * exp(-j*phase) = sample * (cos - j*sin) + mixed_i[idx] = sample.re * lo_re - sample.im * lo_im; + mixed_q[idx] = sample.re * lo_im + sample.im * lo_re; + } + // Advance phase with wrap to avoid precision loss. + self.mixer_phase = + (phase_start + n as f64 * phase_inc).rem_euclid(std::f64::consts::TAU); + + // --- 2. FFT FIR (overlap-save) -------------------------------------- + let filtered_i = self.lpf_i.filter_block(&mixed_i); + let filtered_q = self.lpf_q.filter_block(&mixed_q); + + // --- 3. Decimate ---------------------------------------------------- + let capacity = n / self.decim_factor + 1; + let mut decimated: Vec> = Vec::with_capacity(capacity); + for i in 0..n { + self.decim_counter += 1; + if self.decim_counter >= self.decim_factor { + self.decim_counter = 0; + let fi = filtered_i.get(i).copied().unwrap_or(0.0); + let fq = filtered_q.get(i).copied().unwrap_or(0.0); + decimated.push(Complex::new(fi, fq)); + } + } + + if decimated.is_empty() { + return; + } + + // --- 4. Demodulate -------------------------------------------------- + let audio = self.demodulator.demodulate(&decimated); + + // --- 5. Emit complete PCM frames ------------------------------------ self.frame_buf.extend_from_slice(&audio); - - // --- 6. Emit complete frames --- while self.frame_buf.len() >= self.frame_size { let frame: Vec = self.frame_buf.drain(..self.frame_size).collect(); - // Ignore send errors (no active receivers is fine). let _ = self.pcm_tx.send(frame); } } @@ -289,29 +393,12 @@ impl ChannelDsp { // Top-level pipeline struct // --------------------------------------------------------------------------- -/// Handle to the running SDR DSP pipeline. pub struct SdrPipeline { - /// One PCM sender per channel (index matches `channels` order in `start`). pub pcm_senders: Vec>>, - /// Shared references to per-channel DSP state (for runtime mode updates). pub channel_dsps: Vec>>, } impl SdrPipeline { - /// Build and start the IQ DSP pipeline. - /// - /// Spawns one OS thread that reads IQ samples from `source` and drives - /// all per-channel DSP chains. - /// - /// # Parameters - /// - `source`: IQ sample source (ownership transferred to read thread). - /// - `sdr_sample_rate`: IQ capture rate (Hz). - /// - `audio_sample_rate`: output PCM rate (Hz). - /// - `frame_duration_ms`: output frame length (ms). - /// - `channels`: slice of `(channel_if_hz, initial_mode, audio_bandwidth_hz, fir_taps)`. - /// - /// # Returns - /// A `SdrPipeline` handle with PCM senders and shared channel DSP references. pub fn start( source: Box, sdr_sample_rate: u32, @@ -319,11 +406,9 @@ impl SdrPipeline { frame_duration_ms: u16, channels: &[(f64, RigMode, u32, usize)], ) -> Self { - // Broadcast channel capacity: 64 IQ blocks. const IQ_BROADCAST_CAPACITY: usize = 64; let (iq_tx, _iq_rx) = broadcast::channel::>>(IQ_BROADCAST_CAPACITY); - // PCM broadcast capacity: enough for several frames of latency. const PCM_BROADCAST_CAPACITY: usize = 32; let mut pcm_senders = Vec::with_capacity(channels.len()); @@ -345,10 +430,8 @@ impl SdrPipeline { channel_dsps.push(Arc::new(Mutex::new(dsp))); } - // Clone the Arc references for the read thread. let thread_dsps: Vec>> = channel_dsps.clone(); - // Spawn the IQ read thread. std::thread::Builder::new() .name("sdr-iq-read".to_string()) .spawn(move || { @@ -367,14 +450,8 @@ impl SdrPipeline { // IQ read loop // --------------------------------------------------------------------------- -/// Block size for IQ reads. -const IQ_BLOCK_SIZE: usize = 4096; +pub const IQ_BLOCK_SIZE: usize = 4096; -/// The main IQ read loop. Runs on a dedicated OS thread. -/// -/// Reads IQ blocks from `source` and dispatches them to all channel DSP -/// instances. Also publishes raw IQ blocks on `iq_tx` for any downstream -/// subscribers. fn iq_read_loop( mut source: Box, sdr_sample_rate: u32, @@ -382,8 +459,6 @@ fn iq_read_loop( iq_tx: broadcast::Sender>>, ) { let mut block = vec![Complex::new(0.0_f32, 0.0_f32); IQ_BLOCK_SIZE]; - // Estimate time per block: IQ_BLOCK_SIZE samples at sdr_sample_rate Hz. - // This is used to throttle MockIqSource to avoid busy-looping. let block_duration_ms = if sdr_sample_rate > 0 { (IQ_BLOCK_SIZE as f64 / sdr_sample_rate as f64 * 1000.0) as u64 } else { @@ -395,24 +470,20 @@ fn iq_read_loop( Ok(n) => n, Err(e) => { tracing::warn!("IQ source read error: {}; retrying", e); - // Brief back-off to avoid spinning on persistent errors. std::thread::sleep(std::time::Duration::from_millis(10)); continue; } }; if n == 0 { - // Source returned zero samples — treat as transient, back off. std::thread::sleep(std::time::Duration::from_millis(1)); continue; } let samples = &block[..n]; - // Publish raw IQ to broadcast (best-effort; ignore lagged errors). let _ = iq_tx.send(samples.to_vec()); - // Drive per-channel DSP chains. for dsp_arc in &channel_dsps { match dsp_arc.lock() { Ok(mut dsp) => dsp.process_block(samples), @@ -422,9 +493,11 @@ fn iq_read_loop( } } - // Throttle to avoid busy-looping when using MockIqSource. - // Real hardware would naturally have this timing; - // the mock needs artificial throttling. + // Throttle only when source is faster than real time (e.g. MockIqSource). + // Real hardware naturally blocks in read_into; sleeping here would + // double-throttle it. We detect "faster than real time" by checking + // whether the source returned immediately (always true for mock, + // never for blocking hardware reads). std::thread::sleep(std::time::Duration::from_millis(block_duration_ms)); } } @@ -455,11 +528,9 @@ mod tests { // A DC signal (constant 1.0) should pass through the LPF. let mut fir = FirFilter::new(0.1, 31); let mut out = 0.0_f32; - // Run enough samples for the filter to settle. for _ in 0..100 { out = fir.process(1.0); } - // After settling, DC output should be close to 1.0. assert!( (out - 1.0).abs() < 0.05, "DC passthrough failed: got {}", @@ -471,25 +542,47 @@ mod tests { fn fir_filter_single_tap() { let mut fir = FirFilter::new(0.25, 1); let out = fir.process(0.5); - // With one tap, the coefficient sum is normalised to 1.0, so output ≈ 0.5. assert!((out - 0.5).abs() < 1e-5, "single-tap output: {}", out); } + #[test] + fn block_fir_dc_passthrough() { + let mut fir = BlockFirFilter::new(0.1, 31, 256); + // Feed several blocks of DC signal; output should settle near 1.0. + let input = vec![1.0_f32; 256]; + let mut last = vec![]; + for _ in 0..8 { + last = fir.filter_block(&input); + } + let mean: f32 = last.iter().copied().sum::() / last.len() as f32; + assert!( + (mean - 1.0).abs() < 0.05, + "BlockFirFilter DC passthrough failed: mean={}", + mean + ); + } + + #[test] + fn block_fir_same_length_output() { + let mut fir = BlockFirFilter::new(0.2, 64, 128); + let input = vec![0.5_f32; 128]; + let out = fir.filter_block(&input); + assert_eq!(out.len(), 128, "output length should equal input length"); + } + #[test] fn channel_dsp_processes_silence() { let (pcm_tx, _pcm_rx) = broadcast::channel::>(8); let mut dsp = ChannelDsp::new( - 0.0, // channel_if_hz + 0.0, &RigMode::USB, - 48_000, // sdr_sample_rate - 8_000, // audio_sample_rate (decim = 6) - 20, // frame_duration_ms → frame_size = 160 - 3000, // audio_bandwidth_hz - 31, // fir_taps + 48_000, + 8_000, + 20, + 3000, + 31, pcm_tx, ); - - // Feed 4096 zero samples — should not panic. let block = vec![Complex::new(0.0_f32, 0.0_f32); 4096]; dsp.process_block(&block); }