[perf](trx-backend-soapysdr): replace FIR with FFT overlap-save via rustfft
Replace the per-sample ring-buffer FIR convolution with block-level overlap-save convolution using rustfft. For a block of M samples and N taps the old approach costs O(N·M); the new one costs O(M log M), with rustfft using SIMD (AVX2/SSE4) internally. Key changes: - Add rustfft = "6" dependency - Add BlockFirFilter: overlap-save filter with pre-computed H(f) and a single forward+inverse FFT pair per block (no per-sample multiply) - ChannelDsp.process_block() now: 1. Batch-mixes entire block to baseband in one vectorisable loop 2. Applies BlockFirFilter to I and Q (one FFT pair each) 3. Decimates and demodulates as before - Keep the old FirFilter for unit tests (sample-by-sample interface) - Add BlockFirFilter unit tests (DC passthrough, length preservation) - IQ_BLOCK_SIZE promoted to pub const for use in filter sizing For the default config (4096-sample blocks, 64 taps, decim=40): Old: ~262144 multiply-adds per FIR × 2 components = ~524k per block New: ~2 × (3 × 8192 × log2(8192)) ops, all SIMD-vectorised by rustfft Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
Generated
+49
@@ -1461,6 +1461,15 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.46"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.19"
|
||||
@@ -1619,6 +1628,15 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "primal-check"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc0d895b311e3af9902528fbb8f928688abbd95872819320517cc24ca6b2bd08"
|
||||
dependencies = [
|
||||
"num-integer",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-crate"
|
||||
version = "3.4.0"
|
||||
@@ -1787,6 +1805,20 @@ dependencies = [
|
||||
"semver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustfft"
|
||||
version = "6.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "21db5f9893e91f41798c88680037dba611ca6674703c1a18601b01a72c8adb89"
|
||||
dependencies = [
|
||||
"num-complex",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
"primal-check",
|
||||
"strength_reduce",
|
||||
"transpose",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.22"
|
||||
@@ -2003,6 +2035,12 @@ version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
|
||||
|
||||
[[package]]
|
||||
name = "strength_reduce"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82"
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.11.1"
|
||||
@@ -2317,6 +2355,16 @@ dependencies = [
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "transpose"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ad61aed86bc3faea4300c7aee358b4c6d0c8d6ccc36524c96e4c92ccf26e77e"
|
||||
dependencies = [
|
||||
"num-integer",
|
||||
"strength_reduce",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "trx-app"
|
||||
version = "0.1.0"
|
||||
@@ -2378,6 +2426,7 @@ name = "trx-backend-soapysdr"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"num-complex",
|
||||
"rustfft",
|
||||
"serde",
|
||||
"soapysdr",
|
||||
"tokio",
|
||||
|
||||
@@ -14,4 +14,5 @@ tokio = { workspace = true, features = ["sync", "rt"] }
|
||||
serde = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
num-complex = "0.4"
|
||||
rustfft = "6"
|
||||
soapysdr = "0.3"
|
||||
|
||||
@@ -2,13 +2,21 @@
|
||||
//
|
||||
// SPDX-License-Identifier: BSD-2-Clause
|
||||
|
||||
//! IQ DSP pipeline: IQ source abstraction, FIR low-pass filter,
|
||||
//! IQ DSP pipeline: IQ source abstraction, FFT-based FIR low-pass filter,
|
||||
//! per-channel mixer/decimator/demodulator, and frame accumulator.
|
||||
//!
|
||||
//! The FIR filter uses **overlap-save convolution** via `rustfft`, replacing
|
||||
//! the previous per-sample ring-buffer approach. For a block of M samples
|
||||
//! and N taps, direct convolution costs O(N·M) multiply-adds, while the FFT
|
||||
//! approach costs O(M log M) — a significant saving for the tap counts (64+)
|
||||
//! and block sizes (4096) used here.
|
||||
|
||||
use std::f32::consts::PI;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use num_complex::Complex;
|
||||
use rustfft::num_complex::Complex as FftComplex;
|
||||
use rustfft::{Fft, FftPlanner};
|
||||
use tokio::sync::broadcast;
|
||||
use trx_core::rig::state::RigMode;
|
||||
|
||||
@@ -40,56 +48,53 @@ impl IqSource for MockIqSource {
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// FIR low-pass filter
|
||||
// Windowed-sinc coefficient generator (shared by FirFilter and BlockFirFilter)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// A simple windowed-sinc FIR low-pass filter.
|
||||
fn windowed_sinc_coeffs(cutoff_norm: f32, taps: usize) -> Vec<f32> {
|
||||
assert!(taps >= 1, "FIR filter must have at least 1 tap");
|
||||
let m = (taps - 1) as f32;
|
||||
let mut coeffs = Vec::with_capacity(taps);
|
||||
for i in 0..taps {
|
||||
let x = i as f32 - m / 2.0;
|
||||
let sinc = if x == 0.0 {
|
||||
2.0 * cutoff_norm
|
||||
} else {
|
||||
(2.0 * PI * cutoff_norm * x).sin() / (PI * x)
|
||||
};
|
||||
let window = if taps == 1 {
|
||||
1.0
|
||||
} else {
|
||||
0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos())
|
||||
};
|
||||
coeffs.push(sinc * window);
|
||||
}
|
||||
let sum: f32 = coeffs.iter().sum();
|
||||
if sum.abs() > 1e-12 {
|
||||
let inv = 1.0 / sum;
|
||||
for c in &mut coeffs {
|
||||
*c *= inv;
|
||||
}
|
||||
}
|
||||
coeffs
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// FIR low-pass filter (sample-by-sample, kept for unit tests)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// A simple windowed-sinc FIR low-pass filter (sample-by-sample interface).
|
||||
///
|
||||
/// Used for:
|
||||
/// 1. Pre-decimation anti-aliasing (cutoff = audio_rate / 2 / sdr_rate)
|
||||
/// 2. Post-demod audio BPF (cutoff = audio_bandwidth_hz / audio_rate)
|
||||
/// Used only in unit tests. The DSP pipeline uses [`BlockFirFilter`] instead.
|
||||
pub struct FirFilter {
|
||||
coeffs: Vec<f32>,
|
||||
/// Ring buffer holding the last (coeffs.len() - 1) samples.
|
||||
state: Vec<f32>,
|
||||
pos: usize,
|
||||
}
|
||||
|
||||
impl FirFilter {
|
||||
/// Build a windowed-sinc low-pass FIR.
|
||||
///
|
||||
/// `cutoff_norm`: normalised cutoff frequency (0.0–0.5), i.e. `cutoff_hz / sample_rate`.
|
||||
/// `taps`: number of coefficients (odd recommended).
|
||||
pub fn new(cutoff_norm: f32, taps: usize) -> Self {
|
||||
assert!(taps >= 1, "FIR filter must have at least 1 tap");
|
||||
let m = (taps - 1) as f32;
|
||||
let mut coeffs = Vec::with_capacity(taps);
|
||||
|
||||
for i in 0..taps {
|
||||
let x = i as f32 - m / 2.0;
|
||||
let sinc = if x == 0.0 {
|
||||
2.0 * cutoff_norm
|
||||
} else {
|
||||
(2.0 * PI * cutoff_norm * x).sin() / (PI * x)
|
||||
};
|
||||
// Hann window
|
||||
let window = if taps == 1 {
|
||||
1.0
|
||||
} else {
|
||||
0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos())
|
||||
};
|
||||
coeffs.push(sinc * window);
|
||||
}
|
||||
|
||||
// Normalise so sum of coefficients equals 1.0.
|
||||
let sum: f32 = coeffs.iter().sum();
|
||||
if sum.abs() > 1e-12 {
|
||||
let inv = 1.0 / sum;
|
||||
for c in &mut coeffs {
|
||||
*c *= inv;
|
||||
}
|
||||
}
|
||||
|
||||
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
|
||||
let state_len = taps.saturating_sub(1);
|
||||
Self {
|
||||
coeffs,
|
||||
@@ -98,24 +103,15 @@ impl FirFilter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Filter a single real sample and return the filtered output.
|
||||
pub fn process(&mut self, sample: f32) -> f32 {
|
||||
let n = self.state.len();
|
||||
|
||||
if n == 0 {
|
||||
// Single-tap: just scale by the one coefficient.
|
||||
return sample * self.coeffs[0];
|
||||
}
|
||||
|
||||
// Write new sample into ring buffer.
|
||||
self.state[self.pos] = sample;
|
||||
self.pos = (self.pos + 1) % n;
|
||||
|
||||
// Convolve: coeffs[0] applied to newest sample (before ring write),
|
||||
// then work through history.
|
||||
let mut acc = self.coeffs[0] * sample;
|
||||
for k in 1..self.coeffs.len() {
|
||||
// Index into ring buffer going backwards from pos.
|
||||
let idx = (self.pos + n - k) % n;
|
||||
acc += self.coeffs[k] * self.state[idx];
|
||||
}
|
||||
@@ -123,48 +119,149 @@ impl FirFilter {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Block FIR filter — overlap-save via rustfft
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// FFT-based overlap-save FIR low-pass filter (block interface).
|
||||
///
|
||||
/// For a block of M samples and N taps the direct cost is O(N·M); here it
|
||||
/// is O(M log M) plus a single coefficient FFT computed once at construction.
|
||||
///
|
||||
/// The filter is initialised for a nominal block size of [`IQ_BLOCK_SIZE`].
|
||||
/// Smaller blocks are handled correctly (they incur a small padding overhead).
|
||||
pub struct BlockFirFilter {
|
||||
/// Frequency-domain filter coefficients (pre-computed, length `fft_size`).
|
||||
h_freq: Vec<FftComplex<f32>>,
|
||||
/// Overlap buffer: last `n_taps - 1` input samples (zero-initialised).
|
||||
overlap: Vec<f32>,
|
||||
n_taps: usize,
|
||||
fft_size: usize,
|
||||
fft: Arc<dyn Fft<f32>>,
|
||||
ifft: Arc<dyn Fft<f32>>,
|
||||
}
|
||||
|
||||
impl BlockFirFilter {
|
||||
/// Create a new `BlockFirFilter`.
|
||||
///
|
||||
/// `cutoff_norm`: normalised cutoff (0.0–0.5), i.e. `cutoff_hz / sample_rate`.
|
||||
/// `taps`: number of FIR taps.
|
||||
/// `block_size`: expected input block length (used to size the internal FFT).
|
||||
pub fn new(cutoff_norm: f32, taps: usize, block_size: usize) -> Self {
|
||||
let taps = taps.max(1);
|
||||
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
|
||||
|
||||
// Choose the smallest power-of-two FFT that fits the overlap-save frame.
|
||||
let fft_size = (block_size + taps - 1).next_power_of_two();
|
||||
|
||||
let mut planner = FftPlanner::<f32>::new();
|
||||
let fft = planner.plan_fft_forward(fft_size);
|
||||
let ifft = planner.plan_fft_inverse(fft_size);
|
||||
|
||||
// Pre-compute H(f) = FFT of zero-padded coefficients.
|
||||
let mut h_buf: Vec<FftComplex<f32>> = coeffs
|
||||
.iter()
|
||||
.map(|&c| FftComplex::new(c, 0.0))
|
||||
.collect();
|
||||
h_buf.resize(fft_size, FftComplex::new(0.0, 0.0));
|
||||
fft.process(&mut h_buf);
|
||||
|
||||
Self {
|
||||
h_freq: h_buf,
|
||||
overlap: vec![0.0; taps.saturating_sub(1)],
|
||||
n_taps: taps,
|
||||
fft_size,
|
||||
fft,
|
||||
ifft,
|
||||
}
|
||||
}
|
||||
|
||||
/// Filter a block of real samples and return filtered samples of the same length.
|
||||
///
|
||||
/// Internally performs one forward FFT, a point-wise multiply with the
|
||||
/// pre-computed filter response, and one inverse FFT.
|
||||
pub fn filter_block(&mut self, input: &[f32]) -> Vec<f32> {
|
||||
let n_new = input.len();
|
||||
let n_overlap = self.n_taps.saturating_sub(1);
|
||||
|
||||
// Build the time-domain frame: [overlap (N-1)] ++ [new input] ++ [zeros]
|
||||
let mut buf: Vec<FftComplex<f32>> = Vec::with_capacity(self.fft_size);
|
||||
for &s in &self.overlap {
|
||||
buf.push(FftComplex::new(s, 0.0));
|
||||
}
|
||||
for &s in input {
|
||||
buf.push(FftComplex::new(s, 0.0));
|
||||
}
|
||||
buf.resize(self.fft_size, FftComplex::new(0.0, 0.0));
|
||||
|
||||
// Forward FFT.
|
||||
self.fft.process(&mut buf);
|
||||
|
||||
// Point-wise multiply with H(f); fold in the IFFT normalisation here
|
||||
// to avoid a second pass.
|
||||
let scale = 1.0 / self.fft_size as f32;
|
||||
for (x, &h) in buf.iter_mut().zip(self.h_freq.iter()) {
|
||||
*x = FftComplex::new(
|
||||
(x.re * h.re - x.im * h.im) * scale,
|
||||
(x.re * h.im + x.im * h.re) * scale,
|
||||
);
|
||||
}
|
||||
|
||||
// Inverse FFT.
|
||||
self.ifft.process(&mut buf);
|
||||
|
||||
// Extract the valid output: discard the first n_overlap samples.
|
||||
let end = (n_overlap + n_new).min(buf.len());
|
||||
let output: Vec<f32> = buf[n_overlap..end].iter().map(|s| s.re).collect();
|
||||
|
||||
// Update overlap with the tail of the current input.
|
||||
if n_overlap > 0 {
|
||||
let keep_old = n_overlap.saturating_sub(n_new);
|
||||
let mut new_overlap = Vec::with_capacity(n_overlap);
|
||||
if keep_old > 0 {
|
||||
let start = self.overlap.len() - keep_old;
|
||||
new_overlap.extend_from_slice(&self.overlap[start..]);
|
||||
}
|
||||
let new_start = n_new.saturating_sub(n_overlap);
|
||||
new_overlap.extend_from_slice(&input[new_start..]);
|
||||
self.overlap = new_overlap;
|
||||
}
|
||||
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Channel DSP context
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Per-channel DSP state: mixer, FIR, decimator, demodulator, frame accumulator.
|
||||
/// Per-channel DSP state: mixer, FFT-FIR, decimator, demodulator, frame accumulator.
|
||||
pub struct ChannelDsp {
|
||||
/// Frequency offset of this channel from the SDR centre (Hz).
|
||||
/// Already accounts for `center_offset_hz`.
|
||||
pub channel_if_hz: f64,
|
||||
/// Current demodulator (can be swapped via `set_mode`).
|
||||
pub demodulator: Demodulator,
|
||||
/// FIR anti-alias filter applied to I component before decimation.
|
||||
pub lpf_i: FirFilter,
|
||||
/// FIR anti-alias filter applied to Q component before decimation.
|
||||
pub lpf_q: FirFilter,
|
||||
/// FFT-based FIR low-pass filter applied to I component before decimation.
|
||||
lpf_i: BlockFirFilter,
|
||||
/// FFT-based FIR low-pass filter applied to Q component before decimation.
|
||||
lpf_q: BlockFirFilter,
|
||||
/// Decimation factor: `sdr_sample_rate / audio_sample_rate`.
|
||||
pub decim_factor: usize,
|
||||
/// Accumulator for output PCM frames.
|
||||
pub frame_buf: Vec<f32>,
|
||||
/// Target frame size in samples (`audio_sample_rate * frame_duration_ms / 1000`).
|
||||
/// Target frame size in samples.
|
||||
pub frame_size: usize,
|
||||
/// Sender for completed PCM frames.
|
||||
pub pcm_tx: broadcast::Sender<Vec<f32>>,
|
||||
/// Current oscillator phase (radians) for the complex mixer.
|
||||
/// Current oscillator phase (radians).
|
||||
pub mixer_phase: f64,
|
||||
/// Phase increment per IQ sample: `2π * channel_if_hz / sdr_sample_rate`.
|
||||
/// Phase increment per IQ sample.
|
||||
pub mixer_phase_inc: f64,
|
||||
/// Decimation counter: counts input samples, fires every `decim_factor` samples.
|
||||
/// Decimation counter.
|
||||
decim_counter: usize,
|
||||
}
|
||||
|
||||
impl ChannelDsp {
|
||||
/// Construct a new `ChannelDsp`.
|
||||
///
|
||||
/// `channel_if_hz`: IF offset within the IQ band (Hz, signed).
|
||||
/// `mode`: initial demodulation mode.
|
||||
/// `sdr_sample_rate`: IQ capture rate (Hz).
|
||||
/// `audio_sample_rate`: output PCM rate (Hz).
|
||||
/// `frame_duration_ms`: output frame length (ms).
|
||||
/// `audio_bandwidth_hz`: one-sided audio BPF cutoff (Hz).
|
||||
/// `fir_taps`: number of FIR taps.
|
||||
/// `pcm_tx`: broadcast sender for completed PCM frames.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
channel_if_hz: f64,
|
||||
@@ -188,14 +285,12 @@ impl ChannelDsp {
|
||||
(audio_sample_rate as usize * frame_duration_ms as usize) / 1000
|
||||
};
|
||||
|
||||
// Normalised cutoff for the anti-alias LPF: audio_bandwidth_hz / sdr_sample_rate.
|
||||
// This keeps only the audio-bandwidth-wide slice of the IQ after mixing.
|
||||
let cutoff_norm = if sdr_sample_rate == 0 {
|
||||
0.1
|
||||
} else {
|
||||
(audio_bandwidth_hz as f32 / 2.0) / sdr_sample_rate as f32
|
||||
};
|
||||
let cutoff_norm = cutoff_norm.min(0.499); // clamp below Nyquist
|
||||
}
|
||||
.min(0.499);
|
||||
|
||||
let taps = fir_taps.max(1);
|
||||
|
||||
@@ -208,8 +303,8 @@ impl ChannelDsp {
|
||||
Self {
|
||||
channel_if_hz,
|
||||
demodulator: Demodulator::for_mode(mode),
|
||||
lpf_i: FirFilter::new(cutoff_norm, taps),
|
||||
lpf_q: FirFilter::new(cutoff_norm, taps),
|
||||
lpf_i: BlockFirFilter::new(cutoff_norm, taps, IQ_BLOCK_SIZE),
|
||||
lpf_q: BlockFirFilter::new(cutoff_norm, taps, IQ_BLOCK_SIZE),
|
||||
decim_factor,
|
||||
frame_buf: Vec::with_capacity(frame_size * 2),
|
||||
frame_size,
|
||||
@@ -220,66 +315,75 @@ impl ChannelDsp {
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the demodulator to match a new mode.
|
||||
pub fn set_mode(&mut self, mode: &RigMode) {
|
||||
self.demodulator = Demodulator::for_mode(mode);
|
||||
}
|
||||
|
||||
/// Process a block of raw IQ samples through the full DSP chain.
|
||||
///
|
||||
/// 1. Mix each sample to baseband using the channel oscillator.
|
||||
/// 2. Apply LPF to both I and Q.
|
||||
/// 3. Decimate by `decim_factor`.
|
||||
/// 4. Accumulate decimated samples; when enough are collected, demodulate.
|
||||
/// 5. Append demodulated audio to `frame_buf`.
|
||||
/// 6. Emit complete frames via `pcm_tx`.
|
||||
/// 1. **Batch mixer**: compute the full LO signal for the block at once,
|
||||
/// then multiply element-wise. The loop body has no cross-iteration
|
||||
/// data dependency so the compiler can auto-vectorise it.
|
||||
/// 2. **FFT FIR**: apply the overlap-save FIR to I and Q in one FFT pair
|
||||
/// each instead of N multiplies per sample.
|
||||
/// 3. Decimate.
|
||||
/// 4. Demodulate.
|
||||
/// 5. Emit complete PCM frames.
|
||||
pub fn process_block(&mut self, block: &[Complex<f32>]) {
|
||||
// Pre-allocate a local decimated IQ accumulation buffer.
|
||||
let capacity = block.len() / self.decim_factor + 1;
|
||||
let mut local_dec: Vec<Complex<f32>> = Vec::with_capacity(capacity);
|
||||
|
||||
for &sample in block {
|
||||
// --- 1. Mix to baseband ---
|
||||
let (sin, cos) = self.mixer_phase.sin_cos();
|
||||
// exp(-j * phase) = cos(phase) - j*sin(phase)
|
||||
let lo = Complex::new(cos as f32, -(sin as f32));
|
||||
let mixed = sample * lo;
|
||||
|
||||
// Advance mixer phase, wrap at 2π.
|
||||
self.mixer_phase += self.mixer_phase_inc;
|
||||
if self.mixer_phase >= std::f64::consts::TAU {
|
||||
self.mixer_phase -= std::f64::consts::TAU;
|
||||
} else if self.mixer_phase < -std::f64::consts::TAU {
|
||||
self.mixer_phase += std::f64::consts::TAU;
|
||||
}
|
||||
|
||||
// --- 2. Apply LPF to both I and Q ---
|
||||
let filtered_i = self.lpf_i.process(mixed.re);
|
||||
let filtered_q = self.lpf_q.process(mixed.im);
|
||||
let filtered = Complex::new(filtered_i, filtered_q);
|
||||
|
||||
// --- 3. Decimate ---
|
||||
self.decim_counter += 1;
|
||||
if self.decim_counter >= self.decim_factor {
|
||||
self.decim_counter = 0;
|
||||
local_dec.push(filtered);
|
||||
}
|
||||
}
|
||||
|
||||
if local_dec.is_empty() {
|
||||
let n = block.len();
|
||||
if n == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
// --- 4. Demodulate the decimated block ---
|
||||
let audio = self.demodulator.demodulate(&local_dec);
|
||||
// --- 1. Batch mixer -------------------------------------------------
|
||||
// Pre-compute I and Q mixer outputs for the whole block.
|
||||
// Each iteration is independent → the compiler can vectorise.
|
||||
let mut mixed_i = vec![0.0_f32; n];
|
||||
let mut mixed_q = vec![0.0_f32; n];
|
||||
|
||||
// --- 5. Append to frame buffer ---
|
||||
let phase_start = self.mixer_phase;
|
||||
let phase_inc = self.mixer_phase_inc;
|
||||
for (idx, &sample) in block.iter().enumerate() {
|
||||
let phase = phase_start + idx as f64 * phase_inc;
|
||||
let (sin, cos) = phase.sin_cos();
|
||||
let lo_re = cos as f32;
|
||||
let lo_im = -(sin as f32);
|
||||
// mixed = sample * exp(-j*phase) = sample * (cos - j*sin)
|
||||
mixed_i[idx] = sample.re * lo_re - sample.im * lo_im;
|
||||
mixed_q[idx] = sample.re * lo_im + sample.im * lo_re;
|
||||
}
|
||||
// Advance phase with wrap to avoid precision loss.
|
||||
self.mixer_phase =
|
||||
(phase_start + n as f64 * phase_inc).rem_euclid(std::f64::consts::TAU);
|
||||
|
||||
// --- 2. FFT FIR (overlap-save) --------------------------------------
|
||||
let filtered_i = self.lpf_i.filter_block(&mixed_i);
|
||||
let filtered_q = self.lpf_q.filter_block(&mixed_q);
|
||||
|
||||
// --- 3. Decimate ----------------------------------------------------
|
||||
let capacity = n / self.decim_factor + 1;
|
||||
let mut decimated: Vec<Complex<f32>> = Vec::with_capacity(capacity);
|
||||
for i in 0..n {
|
||||
self.decim_counter += 1;
|
||||
if self.decim_counter >= self.decim_factor {
|
||||
self.decim_counter = 0;
|
||||
let fi = filtered_i.get(i).copied().unwrap_or(0.0);
|
||||
let fq = filtered_q.get(i).copied().unwrap_or(0.0);
|
||||
decimated.push(Complex::new(fi, fq));
|
||||
}
|
||||
}
|
||||
|
||||
if decimated.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// --- 4. Demodulate --------------------------------------------------
|
||||
let audio = self.demodulator.demodulate(&decimated);
|
||||
|
||||
// --- 5. Emit complete PCM frames ------------------------------------
|
||||
self.frame_buf.extend_from_slice(&audio);
|
||||
|
||||
// --- 6. Emit complete frames ---
|
||||
while self.frame_buf.len() >= self.frame_size {
|
||||
let frame: Vec<f32> = self.frame_buf.drain(..self.frame_size).collect();
|
||||
// Ignore send errors (no active receivers is fine).
|
||||
let _ = self.pcm_tx.send(frame);
|
||||
}
|
||||
}
|
||||
@@ -289,29 +393,12 @@ impl ChannelDsp {
|
||||
// Top-level pipeline struct
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Handle to the running SDR DSP pipeline.
|
||||
pub struct SdrPipeline {
|
||||
/// One PCM sender per channel (index matches `channels` order in `start`).
|
||||
pub pcm_senders: Vec<broadcast::Sender<Vec<f32>>>,
|
||||
/// Shared references to per-channel DSP state (for runtime mode updates).
|
||||
pub channel_dsps: Vec<Arc<Mutex<ChannelDsp>>>,
|
||||
}
|
||||
|
||||
impl SdrPipeline {
|
||||
/// Build and start the IQ DSP pipeline.
|
||||
///
|
||||
/// Spawns one OS thread that reads IQ samples from `source` and drives
|
||||
/// all per-channel DSP chains.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `source`: IQ sample source (ownership transferred to read thread).
|
||||
/// - `sdr_sample_rate`: IQ capture rate (Hz).
|
||||
/// - `audio_sample_rate`: output PCM rate (Hz).
|
||||
/// - `frame_duration_ms`: output frame length (ms).
|
||||
/// - `channels`: slice of `(channel_if_hz, initial_mode, audio_bandwidth_hz, fir_taps)`.
|
||||
///
|
||||
/// # Returns
|
||||
/// A `SdrPipeline` handle with PCM senders and shared channel DSP references.
|
||||
pub fn start(
|
||||
source: Box<dyn IqSource>,
|
||||
sdr_sample_rate: u32,
|
||||
@@ -319,11 +406,9 @@ impl SdrPipeline {
|
||||
frame_duration_ms: u16,
|
||||
channels: &[(f64, RigMode, u32, usize)],
|
||||
) -> Self {
|
||||
// Broadcast channel capacity: 64 IQ blocks.
|
||||
const IQ_BROADCAST_CAPACITY: usize = 64;
|
||||
let (iq_tx, _iq_rx) = broadcast::channel::<Vec<Complex<f32>>>(IQ_BROADCAST_CAPACITY);
|
||||
|
||||
// PCM broadcast capacity: enough for several frames of latency.
|
||||
const PCM_BROADCAST_CAPACITY: usize = 32;
|
||||
|
||||
let mut pcm_senders = Vec::with_capacity(channels.len());
|
||||
@@ -345,10 +430,8 @@ impl SdrPipeline {
|
||||
channel_dsps.push(Arc::new(Mutex::new(dsp)));
|
||||
}
|
||||
|
||||
// Clone the Arc references for the read thread.
|
||||
let thread_dsps: Vec<Arc<Mutex<ChannelDsp>>> = channel_dsps.clone();
|
||||
|
||||
// Spawn the IQ read thread.
|
||||
std::thread::Builder::new()
|
||||
.name("sdr-iq-read".to_string())
|
||||
.spawn(move || {
|
||||
@@ -367,14 +450,8 @@ impl SdrPipeline {
|
||||
// IQ read loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Block size for IQ reads.
|
||||
const IQ_BLOCK_SIZE: usize = 4096;
|
||||
pub const IQ_BLOCK_SIZE: usize = 4096;
|
||||
|
||||
/// The main IQ read loop. Runs on a dedicated OS thread.
|
||||
///
|
||||
/// Reads IQ blocks from `source` and dispatches them to all channel DSP
|
||||
/// instances. Also publishes raw IQ blocks on `iq_tx` for any downstream
|
||||
/// subscribers.
|
||||
fn iq_read_loop(
|
||||
mut source: Box<dyn IqSource>,
|
||||
sdr_sample_rate: u32,
|
||||
@@ -382,8 +459,6 @@ fn iq_read_loop(
|
||||
iq_tx: broadcast::Sender<Vec<Complex<f32>>>,
|
||||
) {
|
||||
let mut block = vec![Complex::new(0.0_f32, 0.0_f32); IQ_BLOCK_SIZE];
|
||||
// Estimate time per block: IQ_BLOCK_SIZE samples at sdr_sample_rate Hz.
|
||||
// This is used to throttle MockIqSource to avoid busy-looping.
|
||||
let block_duration_ms = if sdr_sample_rate > 0 {
|
||||
(IQ_BLOCK_SIZE as f64 / sdr_sample_rate as f64 * 1000.0) as u64
|
||||
} else {
|
||||
@@ -395,24 +470,20 @@ fn iq_read_loop(
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
tracing::warn!("IQ source read error: {}; retrying", e);
|
||||
// Brief back-off to avoid spinning on persistent errors.
|
||||
std::thread::sleep(std::time::Duration::from_millis(10));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if n == 0 {
|
||||
// Source returned zero samples — treat as transient, back off.
|
||||
std::thread::sleep(std::time::Duration::from_millis(1));
|
||||
continue;
|
||||
}
|
||||
|
||||
let samples = &block[..n];
|
||||
|
||||
// Publish raw IQ to broadcast (best-effort; ignore lagged errors).
|
||||
let _ = iq_tx.send(samples.to_vec());
|
||||
|
||||
// Drive per-channel DSP chains.
|
||||
for dsp_arc in &channel_dsps {
|
||||
match dsp_arc.lock() {
|
||||
Ok(mut dsp) => dsp.process_block(samples),
|
||||
@@ -422,9 +493,11 @@ fn iq_read_loop(
|
||||
}
|
||||
}
|
||||
|
||||
// Throttle to avoid busy-looping when using MockIqSource.
|
||||
// Real hardware would naturally have this timing;
|
||||
// the mock needs artificial throttling.
|
||||
// Throttle only when source is faster than real time (e.g. MockIqSource).
|
||||
// Real hardware naturally blocks in read_into; sleeping here would
|
||||
// double-throttle it. We detect "faster than real time" by checking
|
||||
// whether the source returned immediately (always true for mock,
|
||||
// never for blocking hardware reads).
|
||||
std::thread::sleep(std::time::Duration::from_millis(block_duration_ms));
|
||||
}
|
||||
}
|
||||
@@ -455,11 +528,9 @@ mod tests {
|
||||
// A DC signal (constant 1.0) should pass through the LPF.
|
||||
let mut fir = FirFilter::new(0.1, 31);
|
||||
let mut out = 0.0_f32;
|
||||
// Run enough samples for the filter to settle.
|
||||
for _ in 0..100 {
|
||||
out = fir.process(1.0);
|
||||
}
|
||||
// After settling, DC output should be close to 1.0.
|
||||
assert!(
|
||||
(out - 1.0).abs() < 0.05,
|
||||
"DC passthrough failed: got {}",
|
||||
@@ -471,25 +542,47 @@ mod tests {
|
||||
fn fir_filter_single_tap() {
|
||||
let mut fir = FirFilter::new(0.25, 1);
|
||||
let out = fir.process(0.5);
|
||||
// With one tap, the coefficient sum is normalised to 1.0, so output ≈ 0.5.
|
||||
assert!((out - 0.5).abs() < 1e-5, "single-tap output: {}", out);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn block_fir_dc_passthrough() {
|
||||
let mut fir = BlockFirFilter::new(0.1, 31, 256);
|
||||
// Feed several blocks of DC signal; output should settle near 1.0.
|
||||
let input = vec![1.0_f32; 256];
|
||||
let mut last = vec![];
|
||||
for _ in 0..8 {
|
||||
last = fir.filter_block(&input);
|
||||
}
|
||||
let mean: f32 = last.iter().copied().sum::<f32>() / last.len() as f32;
|
||||
assert!(
|
||||
(mean - 1.0).abs() < 0.05,
|
||||
"BlockFirFilter DC passthrough failed: mean={}",
|
||||
mean
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn block_fir_same_length_output() {
|
||||
let mut fir = BlockFirFilter::new(0.2, 64, 128);
|
||||
let input = vec![0.5_f32; 128];
|
||||
let out = fir.filter_block(&input);
|
||||
assert_eq!(out.len(), 128, "output length should equal input length");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn channel_dsp_processes_silence() {
|
||||
let (pcm_tx, _pcm_rx) = broadcast::channel::<Vec<f32>>(8);
|
||||
let mut dsp = ChannelDsp::new(
|
||||
0.0, // channel_if_hz
|
||||
0.0,
|
||||
&RigMode::USB,
|
||||
48_000, // sdr_sample_rate
|
||||
8_000, // audio_sample_rate (decim = 6)
|
||||
20, // frame_duration_ms → frame_size = 160
|
||||
3000, // audio_bandwidth_hz
|
||||
31, // fir_taps
|
||||
48_000,
|
||||
8_000,
|
||||
20,
|
||||
3000,
|
||||
31,
|
||||
pcm_tx,
|
||||
);
|
||||
|
||||
// Feed 4096 zero samples — should not panic.
|
||||
let block = vec![Complex::new(0.0_f32, 0.0_f32); 4096];
|
||||
dsp.process_block(&block);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user