[refactor](workspace): split soapysdr demod and dsp modules

Split the SoapySDR backend demod and dsp code into focused modules while keeping behavior stable, and include the resulting formatting updates.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-01 13:55:48 +01:00
parent 441cdd3adb
commit 08b8c80cc3
17 changed files with 2438 additions and 2697 deletions
+20 -5
View File
@@ -239,7 +239,13 @@ impl Candidate {
self.locked = false;
self.search_bits = 0;
self.search_reg = 0;
self.process_group(self.block_a, self.block_b, self.block_c, self.block_c_kind, data)
self.process_group(
self.block_a,
self.block_b,
self.block_c,
self.block_c_kind,
data,
)
}
(_, BlockKind::A) => {
self.locked = true;
@@ -355,7 +361,9 @@ impl Candidate {
self.ps_bytes[segment * 2 + 1] = sanitize_text_byte(b1);
self.ps_seen[segment] = true;
if self.ps_seen.iter().all(|seen| *seen) {
let ps = String::from_utf8_lossy(&self.ps_bytes).trim_end().to_string();
let ps = String::from_utf8_lossy(&self.ps_bytes)
.trim_end()
.to_string();
if !ps.is_empty() && self.state.program_service.as_deref() != Some(ps.as_str()) {
self.state.program_service = Some(ps);
changed = true;
@@ -395,7 +403,9 @@ impl Candidate {
} else {
(last_seen + 1) * 4
};
let rt = String::from_utf8_lossy(&self.rt_bytes[..rt_len]).trim_end().to_string();
let rt = String::from_utf8_lossy(&self.rt_bytes[..rt_len])
.trim_end()
.to_string();
if !rt.is_empty() && self.state.radio_text.as_deref() != Some(rt.as_str()) {
self.state.radio_text = Some(rt);
changed = true;
@@ -414,7 +424,9 @@ impl Candidate {
self.ptyn_seen[segment] = true;
}
if self.ptyn_seen.iter().all(|seen| *seen) {
let ptyn = String::from_utf8_lossy(&self.ptyn_bytes).trim_end().to_string();
let ptyn = String::from_utf8_lossy(&self.ptyn_bytes)
.trim_end()
.to_string();
if !ptyn.is_empty()
&& self.state.program_type_name_long.as_deref() != Some(ptyn.as_str())
{
@@ -514,7 +526,10 @@ impl RdsDecoder {
if candidate.score >= self.best_score {
self.best_score = candidate.score;
let same_pi = self.best_state.as_ref().and_then(|state| state.pi) == update.pi;
if publish_quality >= MIN_PUBLISH_QUALITY || same_pi || self.best_state.is_none() {
if publish_quality >= MIN_PUBLISH_QUALITY
|| same_pi
|| self.best_state.is_none()
{
self.best_state = Some(update);
}
}
+3 -1
View File
@@ -337,7 +337,9 @@ impl ClientConfig {
}
if let Some(rig_id) = &self.frontends.http.default_rig_id {
if rig_id.trim().is_empty() {
return Err("[frontends.http].default_rig_id must not be empty when set".to_string());
return Err(
"[frontends.http].default_rig_id must not be empty when set".to_string()
);
}
}
if self.frontends.rigctl.enabled && self.frontends.rigctl.rig_ports.is_empty() {
+1 -2
View File
@@ -353,8 +353,7 @@ async fn async_init() -> DynResult<AppState> {
first = false;
}
// Proxy channel: inject rig_id_override before forwarding to main tx.
let (proxy_tx, mut proxy_rx) =
mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
let (proxy_tx, mut proxy_rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
let main_tx = tx.clone();
let rig_id_owned = rig_id.clone();
tokio::spawn(async move {
+21 -7
View File
@@ -250,16 +250,25 @@ async fn send_command_no_state_update(
writer.write_all(format!("{}\n", payload).as_bytes()),
)
.await
.map_err(|_| RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)))?
.map_err(|_| {
RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("write failed: {e}")))?;
time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush())
.await
.map_err(|_| RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT)))?
.map_err(|_| {
RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("flush failed: {e}")))?;
let line = time::timeout(SPECTRUM_IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES))
.await
.map_err(|_| RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT)))?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = time::timeout(
SPECTRUM_IO_TIMEOUT,
read_limited_line(reader, MAX_JSON_LINE_BYTES),
)
.await
.map_err(|_| {
RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT))
})?
.map_err(|e| RigError::communication(format!("read failed: {e}")))?;
let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?;
let resp: ClientResponse = serde_json::from_str(line.trim_end())
.map_err(|e| RigError::communication(format!("invalid response: {e}")))?;
@@ -386,7 +395,12 @@ fn should_poll_spectrum(config: &RemoteClientConfig) -> bool {
.known_rigs
.lock()
.ok()
.and_then(|entries| entries.iter().find(|entry| entry.rig_id == selected).cloned())
.and_then(|entries| {
entries
.iter()
.find(|entry| entry.rig_id == selected)
.cloned()
})
.map(|entry| entry.state.initialized)
.unwrap_or(true)
}
@@ -89,7 +89,10 @@ fn inject_frontend_meta(json: &str, meta: FrontendMeta) -> String {
serde_json::to_string(&value).unwrap_or_else(|_| json.to_string())
}
fn frontend_meta_from_context(http_clients: usize, context: &FrontendRuntimeContext) -> FrontendMeta {
fn frontend_meta_from_context(
http_clients: usize,
context: &FrontendRuntimeContext,
) -> FrontendMeta {
FrontendMeta {
http_clients,
rigctl_clients: context.rigctl_clients.load(Ordering::Relaxed),
@@ -314,11 +317,7 @@ pub async fn spectrum(
IntervalStream::new(time::interval(Duration::from_millis(40))).filter_map(move |_| {
let context = context_updates.clone();
std::future::ready({
let next = context
.spectrum
.lock()
.ok()
.map(|g| g.snapshot());
let next = context.spectrum.lock().ok().map(|g| g.snapshot());
let payload = match next {
Some((revision, _frame)) if last_revision == Some(revision) => None,
+5 -5
View File
@@ -820,11 +820,11 @@ async fn main() -> DynResult<()> {
#[cfg(feature = "soapysdr")]
let (sdr_prebuilt_rig, sdr_pcm_rx): (OptionalSdrRig, OptionalSdrPcmRx) =
if rig_cfg.rig.access.access_type.as_deref() == Some("sdr") {
let (rig, pcm_rx) = build_sdr_rig_from_instance(rig_cfg)?;
(Some(rig), Some(pcm_rx))
} else {
(None, None)
};
let (rig, pcm_rx) = build_sdr_rig_from_instance(rig_cfg)?;
(Some(rig), Some(pcm_rx))
} else {
(None, None)
};
#[cfg(not(feature = "soapysdr"))]
let (sdr_prebuilt_rig, sdr_pcm_rx): (OptionalSdrRig, OptionalSdrPcmRx) = (None, None);
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,56 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use num_complex::Complex;
/// AM envelope detector: magnitude of IQ.
pub(super) fn demod_am(samples: &[Complex<f32>]) -> Vec<f32> {
samples
.iter()
.map(|sample| (sample.re * sample.re + sample.im * sample.im).sqrt())
.collect()
}
#[cfg(test)]
mod tests {
use super::demod_am;
use num_complex::Complex;
fn assert_approx_eq(a: f32, b: f32, tol: f32, label: &str) {
assert!(
(a - b).abs() <= tol,
"{}: expected {} ≈ {} (tol {})",
label,
a,
b,
tol
);
}
#[test]
fn test_am_raw_magnitude_constant() {
let input: Vec<Complex<f32>> = (0..8).map(|_| Complex::new(1.0, 0.0)).collect();
let out = demod_am(&input);
assert_eq!(out.len(), 8);
for (idx, &value) in out.iter().enumerate() {
assert_approx_eq(value, 1.0, 1e-6, &format!("AM sample {idx}"));
}
}
#[test]
fn test_am_raw_magnitude_varying() {
let input = vec![
Complex::new(0.0_f32, 0.0),
Complex::new(1.0, 0.0),
Complex::new(0.0, 0.0),
Complex::new(1.0, 0.0),
];
let expected = [0.0_f32, 1.0, 0.0, 1.0];
let out = demod_am(&input);
assert_eq!(out.len(), 4);
for (idx, (&got, &exp)) in out.iter().zip(expected.iter()).enumerate() {
assert_approx_eq(got, exp, 1e-6, &format!("AM sample {idx}"));
}
}
}
@@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use num_complex::Complex;
use super::math::demod_fm_with_prev;
/// FM quadrature discriminator: instantaneous frequency via `arg(s[n] * conj(s[n-1]))`.
pub(super) fn demod_fm(samples: &[Complex<f32>]) -> Vec<f32> {
let mut prev = None;
demod_fm_with_prev(samples, &mut prev)
}
#[cfg(test)]
mod tests {
use super::demod_fm;
use num_complex::Complex;
fn complex_tone(freq_norm: f32, len: usize) -> Vec<Complex<f32>> {
use std::f32::consts::TAU;
(0..len)
.map(|n| Complex::from_polar(1.0, TAU * freq_norm * n as f32))
.collect()
}
fn assert_approx_eq(a: f32, b: f32, tol: f32, label: &str) {
assert!(
(a - b).abs() <= tol,
"{}: expected {} ≈ {} (tol {})",
label,
a,
b,
tol
);
}
#[test]
fn test_fm_tone_frequency() {
let input = complex_tone(0.25, 16);
let out = demod_fm(&input);
assert_eq!(out.len(), 16);
assert_approx_eq(out[0], 0.0, 1e-6, "FM tone sample 0");
for (idx, &sample) in out.iter().enumerate().skip(1) {
assert_approx_eq(sample, 0.5, 0.01, &format!("FM tone sample {idx}"));
}
}
#[test]
fn test_fm_silence_is_zero() {
let input: Vec<Complex<f32>> = (0..8).map(|_| Complex::new(1.0, 0.0)).collect();
let out = demod_fm(&input);
assert_eq!(out.len(), 8);
for (idx, &value) in out.iter().enumerate() {
assert_approx_eq(value, 0.0, 1e-6, &format!("FM silence sample {idx}"));
}
}
}
@@ -0,0 +1,87 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use num_complex::Complex;
#[inline]
pub(super) fn fast_atan2(y: f32, x: f32) -> f32 {
if x == 0.0 {
if y > 0.0 {
return std::f32::consts::FRAC_PI_2;
}
if y < 0.0 {
return -std::f32::consts::FRAC_PI_2;
}
return 0.0;
}
#[inline]
fn fast_atan(z: f32) -> f32 {
let abs_z = z.abs();
if abs_z <= 1.0 {
z * (std::f32::consts::FRAC_PI_4 + 0.273 * (1.0 - abs_z))
} else {
let inv = 1.0 / z;
let base = inv * (std::f32::consts::FRAC_PI_4 + 0.273 * (1.0 - inv.abs()));
if z > 0.0 {
std::f32::consts::FRAC_PI_2 - base
} else {
-std::f32::consts::FRAC_PI_2 - base
}
}
}
if x > 0.0 {
fast_atan(y / x)
} else if x < 0.0 {
if y >= 0.0 {
fast_atan(y / x) + std::f32::consts::PI
} else {
fast_atan(y / x) - std::f32::consts::PI
}
} else {
0.0
}
}
/// FM quadrature discriminator: instantaneous frequency via `arg(s[n] * conj(s[n-1]))`.
/// Output is in radians/sample, scaled by `1/π` to normalize to `[-1, 1]`.
pub(super) fn demod_fm_with_prev(
samples: &[Complex<f32>],
prev: &mut Option<Complex<f32>>,
) -> Vec<f32> {
if samples.is_empty() {
return Vec::new();
}
let inv_pi = std::f32::consts::FRAC_1_PI;
let mut output = Vec::with_capacity(samples.len());
if let Some(prev_sample) = prev.as_ref().copied() {
let product = samples[0] * prev_sample.conj();
output.push(fast_atan2(product.im, product.re) * inv_pi);
} else {
output.push(0.0);
}
let mut idx = 1usize;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
if std::arch::is_x86_feature_detected!("avx2") {
idx = super::math_x86::demod_fm_body_avx2(samples, idx, inv_pi, &mut output);
}
#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
{
idx = super::math_arm::demod_fm_body_neon(samples, idx, inv_pi, &mut output);
}
for sample_idx in idx..samples.len() {
let product = samples[sample_idx] * samples[sample_idx - 1].conj();
output.push(fast_atan2(product.im, product.re) * inv_pi);
}
*prev = samples.last().copied();
output
}
@@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
use num_complex::Complex;
/// Placeholder hook for future ARM/NEON FM discriminator vectorization.
#[cfg(any(target_arch = "arm", target_arch = "aarch64"))]
pub(super) fn demod_fm_body_neon(
_samples: &[Complex<f32>],
start: usize,
_inv_pi: f32,
_output: &mut Vec<f32>,
) -> usize {
start
}
@@ -0,0 +1,136 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
use num_complex::Complex;
/// 7th-order minimax atan approximation for |z| <= 1.
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[target_feature(enable = "avx2")]
unsafe fn atan_poly_avx2(z: std::arch::x86_64::__m256) -> std::arch::x86_64::__m256 {
#[cfg(target_arch = "x86")]
use std::arch::x86::*;
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;
let c0 = _mm256_set1_ps(0.999_999_5_f32);
let c1 = _mm256_set1_ps(-0.333_326_1_f32);
let c2 = _mm256_set1_ps(0.199_777_1_f32);
let c3 = _mm256_set1_ps(-0.138_776_8_f32);
let z2 = _mm256_mul_ps(z, z);
let p = _mm256_add_ps(c2, _mm256_mul_ps(z2, c3));
let p = _mm256_add_ps(c1, _mm256_mul_ps(z2, p));
let p = _mm256_add_ps(c0, _mm256_mul_ps(z2, p));
_mm256_mul_ps(z, p)
}
/// Branchless AVX2 atan2 using argument reduction and polynomial evaluation.
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[target_feature(enable = "avx2")]
unsafe fn fast_atan2_8_avx2(
y: std::arch::x86_64::__m256,
x: std::arch::x86_64::__m256,
) -> std::arch::x86_64::__m256 {
#[cfg(target_arch = "x86")]
use std::arch::x86::*;
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;
let abs_mask = _mm256_castsi256_ps(_mm256_set1_epi32(0x7FFF_FFFF_u32 as i32));
let sign_mask = _mm256_castsi256_ps(_mm256_set1_epi32(0x8000_0000_u32 as i32));
let pi = _mm256_set1_ps(std::f32::consts::PI);
let pi_2 = _mm256_set1_ps(std::f32::consts::FRAC_PI_2);
let abs_y = _mm256_and_ps(y, abs_mask);
let abs_x = _mm256_and_ps(x, abs_mask);
let swap_mask = _mm256_cmp_ps(abs_y, abs_x, _CMP_GT_OS);
let num = _mm256_blendv_ps(y, x, swap_mask);
let den = _mm256_blendv_ps(x, y, swap_mask);
let eps = _mm256_set1_ps(1.0e-30);
let safe_den = _mm256_or_ps(
den,
_mm256_and_ps(_mm256_cmp_ps(den, _mm256_setzero_ps(), _CMP_EQ_OQ), eps),
);
let atan_input = _mm256_div_ps(num, safe_den);
let mut result = atan_poly_avx2(atan_input);
let adj = _mm256_sub_ps(
_mm256_or_ps(pi_2, _mm256_and_ps(atan_input, sign_mask)),
result,
);
result = _mm256_blendv_ps(result, adj, swap_mask);
let x_sign_mask = _mm256_castsi256_ps(_mm256_srai_epi32(_mm256_castps_si256(x), 31));
let correction = _mm256_and_ps(_mm256_xor_ps(pi, _mm256_and_ps(sign_mask, y)), x_sign_mask);
_mm256_add_ps(result, correction)
}
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
pub(super) fn demod_fm_body_avx2(
samples: &[Complex<f32>],
start: usize,
inv_pi: f32,
output: &mut Vec<f32>,
) -> usize {
unsafe { demod_fm_body_avx2_impl(samples, start, inv_pi, output) }
}
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[target_feature(enable = "avx2")]
unsafe fn demod_fm_body_avx2_impl(
samples: &[Complex<f32>],
start: usize,
inv_pi: f32,
output: &mut Vec<f32>,
) -> usize {
#[cfg(target_arch = "x86")]
use std::arch::x86::*;
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;
let len = samples.len();
let mut idx = start;
let mut cur_re = [0.0_f32; 8];
let mut cur_im = [0.0_f32; 8];
let mut prev_re = [0.0_f32; 8];
let mut prev_im = [0.0_f32; 8];
let mut angles = [0.0_f32; 8];
let inv_pi_v = _mm256_set1_ps(inv_pi);
while idx + 8 <= len {
for lane in 0..8 {
let cur = samples[idx + lane];
let prev = samples[idx + lane - 1];
cur_re[lane] = cur.re;
cur_im[lane] = cur.im;
prev_re[lane] = prev.re;
prev_im[lane] = prev.im;
}
let cur_re_v = _mm256_loadu_ps(cur_re.as_ptr());
let cur_im_v = _mm256_loadu_ps(cur_im.as_ptr());
let prev_re_v = _mm256_loadu_ps(prev_re.as_ptr());
let prev_im_v = _mm256_loadu_ps(prev_im.as_ptr());
let re_v = _mm256_add_ps(
_mm256_mul_ps(cur_re_v, prev_re_v),
_mm256_mul_ps(cur_im_v, prev_im_v),
);
let im_v = _mm256_sub_ps(
_mm256_mul_ps(cur_im_v, prev_re_v),
_mm256_mul_ps(cur_re_v, prev_im_v),
);
let angle_v = _mm256_mul_ps(fast_atan2_8_avx2(im_v, re_v), inv_pi_v);
_mm256_storeu_ps(angles.as_mut_ptr(), angle_v);
output.extend_from_slice(&angles);
idx += 8;
}
idx
}
@@ -0,0 +1,78 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use num_complex::Complex;
/// USB demodulator: take the real part of each IQ sample.
pub(super) fn demod_usb(samples: &[Complex<f32>]) -> Vec<f32> {
samples.iter().map(|sample| sample.re).collect()
}
/// LSB demodulator: mixing is handled upstream by negating `channel_if_hz`.
pub(super) fn demod_lsb(samples: &[Complex<f32>]) -> Vec<f32> {
samples.iter().map(|sample| sample.re).collect()
}
/// CW demodulator: take the real part of each baseband IQ sample.
pub(super) fn demod_cw(samples: &[Complex<f32>]) -> Vec<f32> {
samples.iter().map(|sample| sample.re).collect()
}
#[cfg(test)]
mod tests {
use super::{demod_cw, demod_lsb, demod_usb};
use num_complex::Complex;
fn assert_approx_eq(a: f32, b: f32, tol: f32, label: &str) {
assert!(
(a - b).abs() <= tol,
"{}: expected {} ≈ {} (tol {})",
label,
a,
b,
tol
);
}
#[test]
fn test_usb_passthrough_takes_real_part() {
let input = vec![
Complex::new(1.0_f32, 2.0),
Complex::new(3.0, 4.0),
Complex::new(-1.0, 0.0),
Complex::new(0.0, -1.0),
];
let expected = vec![1.0_f32, 3.0, -1.0, 0.0];
assert_eq!(demod_usb(&input), expected);
assert_eq!(demod_usb(&input), expected);
}
#[test]
fn test_lsb_takes_real_part() {
let input = vec![
Complex::new(1.0_f32, 2.0),
Complex::new(3.0, 4.0),
Complex::new(-1.0, 0.0),
Complex::new(0.0, -1.0),
];
let expected = vec![1.0_f32, 3.0, -1.0, 0.0];
assert_eq!(demod_lsb(&input), expected);
}
#[test]
fn test_cw_takes_real_part() {
let input = vec![
Complex::new(3.0_f32, 4.0),
Complex::new(0.0, 0.0),
Complex::new(1.0, 0.0),
];
let out = demod_cw(&input);
assert_eq!(out.len(), 3);
assert_approx_eq(out[0], 3.0, 1e-6, "CW sample 0");
assert_approx_eq(out[1], 0.0, 1e-6, "CW sample 1");
assert_approx_eq(out[2], 1.0, 1e-6, "CW sample 2");
}
}
File diff suppressed because it is too large Load Diff
@@ -11,16 +11,20 @@
//! approach costs O(M log M) — a significant saving for the tap counts (64+)
//! and block sizes (4096) used here.
mod channel;
mod filter;
use std::f32::consts::PI;
use std::sync::{Arc, Mutex};
use num_complex::Complex;
use rustfft::num_complex::Complex as FftComplex;
use rustfft::{Fft, FftPlanner};
use rustfft::FftPlanner;
use tokio::sync::broadcast;
use trx_core::rig::state::{RdsData, RigMode};
use trx_core::rig::state::RigMode;
use crate::demod::{DcBlocker, Demodulator, SoftAgc, WfmStereoDecoder};
pub use self::channel::ChannelDsp;
pub use self::filter::{BlockFirFilter, BlockFirFilterPair, FirFilter};
// ---------------------------------------------------------------------------
// IQ source abstraction
@@ -73,858 +77,6 @@ impl IqSource for MockIqSource {
}
}
// ---------------------------------------------------------------------------
// Windowed-sinc coefficient generator (shared by FirFilter and BlockFirFilter)
// ---------------------------------------------------------------------------
fn windowed_sinc_coeffs(cutoff_norm: f32, taps: usize) -> Vec<f32> {
assert!(taps >= 1, "FIR filter must have at least 1 tap");
let m = (taps - 1) as f32;
let mut coeffs = Vec::with_capacity(taps);
for i in 0..taps {
let x = i as f32 - m / 2.0;
let sinc = if x == 0.0 {
2.0 * cutoff_norm
} else {
(2.0 * PI * cutoff_norm * x).sin() / (PI * x)
};
let window = if taps == 1 {
1.0
} else {
0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos())
};
coeffs.push(sinc * window);
}
let sum: f32 = coeffs.iter().sum();
if sum.abs() > 1e-12 {
let inv = 1.0 / sum;
for c in &mut coeffs {
*c *= inv;
}
}
coeffs
}
// ---------------------------------------------------------------------------
// FIR low-pass filter (sample-by-sample, kept for unit tests)
// ---------------------------------------------------------------------------
/// A simple windowed-sinc FIR low-pass filter (sample-by-sample interface).
///
/// Used only in unit tests. The DSP pipeline uses [`BlockFirFilter`] instead.
pub struct FirFilter {
coeffs: Vec<f32>,
state: Vec<f32>,
pos: usize,
}
impl FirFilter {
pub fn new(cutoff_norm: f32, taps: usize) -> Self {
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
let state_len = taps.saturating_sub(1);
Self {
coeffs,
state: vec![0.0; state_len],
pos: 0,
}
}
pub fn process(&mut self, sample: f32) -> f32 {
let n = self.state.len();
if n == 0 {
return sample * self.coeffs[0];
}
self.state[self.pos] = sample;
self.pos = (self.pos + 1) % n;
let mut acc = self.coeffs[0] * sample;
for k in 1..self.coeffs.len() {
let idx = (self.pos + n - k) % n;
acc += self.coeffs[k] * self.state[idx];
}
acc
}
}
// ---------------------------------------------------------------------------
// Block FIR filter — overlap-save via rustfft
// ---------------------------------------------------------------------------
/// FFT-based overlap-save FIR low-pass filter (block interface).
///
/// For a block of M samples and N taps the direct cost is O(N·M); here it
/// is O(M log M) plus a single coefficient FFT computed once at construction.
///
/// The filter is initialised for a nominal block size of [`IQ_BLOCK_SIZE`].
/// Smaller blocks are handled correctly (they incur a small padding overhead).
pub struct BlockFirFilter {
/// Frequency-domain filter coefficients (pre-computed, length `fft_size`).
h_freq: Vec<FftComplex<f32>>,
/// Overlap buffer: last `n_taps - 1` input samples (zero-initialised).
overlap: Vec<f32>,
n_taps: usize,
fft_size: usize,
fft: Arc<dyn Fft<f32>>,
ifft: Arc<dyn Fft<f32>>,
scratch_freq: Vec<FftComplex<f32>>,
}
pub struct BlockFirFilterPair {
h_freq: Vec<FftComplex<f32>>,
overlap: Vec<FftComplex<f32>>,
n_taps: usize,
fft_size: usize,
fft: Arc<dyn Fft<f32>>,
ifft: Arc<dyn Fft<f32>>,
scratch_freq: Vec<FftComplex<f32>>,
}
type FirKernel = (
Vec<FftComplex<f32>>,
usize,
Arc<dyn Fft<f32>>,
Arc<dyn Fft<f32>>,
);
fn build_fir_kernel(
cutoff_norm: f32,
taps: usize,
block_size: usize,
) -> FirKernel {
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
let fft_size = (block_size + taps - 1).next_power_of_two();
let mut planner = FftPlanner::<f32>::new();
let fft = planner.plan_fft_forward(fft_size);
let ifft = planner.plan_fft_inverse(fft_size);
let mut h_buf: Vec<FftComplex<f32>> =
coeffs.iter().map(|&c| FftComplex::new(c, 0.0)).collect();
h_buf.resize(fft_size, FftComplex::new(0.0, 0.0));
fft.process(&mut h_buf);
(h_buf, fft_size, fft, ifft)
}
fn mul_freq_domain_scalar(buf: &mut [FftComplex<f32>], h_freq: &[FftComplex<f32>], scale: f32) {
for (x, &h) in buf.iter_mut().zip(h_freq.iter()) {
*x = FftComplex::new(
(x.re * h.re - x.im * h.im) * scale,
(x.re * h.im + x.im * h.re) * scale,
);
}
}
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[target_feature(enable = "avx2")]
unsafe fn mul_freq_domain_avx2(buf: &mut [FftComplex<f32>], h_freq: &[FftComplex<f32>], scale: f32) {
#[cfg(target_arch = "x86")]
use std::arch::x86::*;
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;
let len = buf.len().min(h_freq.len());
let mut i = 0usize;
let scale_v = _mm256_set1_ps(scale);
while i + 4 <= len {
let x_ptr = unsafe { buf.as_mut_ptr().add(i) as *mut f32 };
let h_ptr = unsafe { h_freq.as_ptr().add(i) as *const f32 };
let x_v = unsafe { _mm256_loadu_ps(x_ptr) };
let h_v = unsafe { _mm256_loadu_ps(h_ptr) };
let h_re = _mm256_moveldup_ps(h_v);
let h_im = _mm256_movehdup_ps(h_v);
let x_swapped = _mm256_permute_ps(x_v, 0xB1);
let prod = _mm256_addsub_ps(_mm256_mul_ps(x_v, h_re), _mm256_mul_ps(x_swapped, h_im));
let out = _mm256_mul_ps(prod, scale_v);
unsafe { _mm256_storeu_ps(x_ptr, out) };
i += 4;
}
mul_freq_domain_scalar(&mut buf[i..len], &h_freq[i..len], scale);
}
fn mul_freq_domain(buf: &mut [FftComplex<f32>], h_freq: &[FftComplex<f32>], scale: f32) {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
if std::arch::is_x86_feature_detected!("avx2") {
unsafe {
mul_freq_domain_avx2(buf, h_freq, scale);
}
return;
}
}
mul_freq_domain_scalar(buf, h_freq, scale);
}
impl BlockFirFilter {
/// Create a new `BlockFirFilter`.
///
/// `cutoff_norm`: normalised cutoff (0.00.5), i.e. `cutoff_hz / sample_rate`.
/// `taps`: number of FIR taps.
/// `block_size`: expected input block length (used to size the internal FFT).
pub fn new(cutoff_norm: f32, taps: usize, block_size: usize) -> Self {
let taps = taps.max(1);
let (h_buf, fft_size, fft, ifft) = build_fir_kernel(cutoff_norm, taps, block_size);
Self {
h_freq: h_buf,
overlap: vec![0.0; taps.saturating_sub(1)],
n_taps: taps,
fft_size,
fft,
ifft,
scratch_freq: vec![FftComplex::new(0.0, 0.0); fft_size],
}
}
/// Filter a block of real samples and return filtered samples of the same length.
///
/// Internally performs one forward FFT, a point-wise multiply with the
/// pre-computed filter response, and one inverse FFT.
pub fn filter_block_into(&mut self, input: &[f32], output: &mut Vec<f32>) {
let n_new = input.len();
let n_overlap = self.n_taps.saturating_sub(1);
// Build the time-domain frame: [overlap (N-1)] ++ [new input] ++ [zeros]
let buf = &mut self.scratch_freq;
buf.clear();
buf.reserve(self.fft_size.saturating_sub(buf.capacity()));
for &s in &self.overlap {
buf.push(FftComplex::new(s, 0.0));
}
for &s in input {
buf.push(FftComplex::new(s, 0.0));
}
buf.resize(self.fft_size, FftComplex::new(0.0, 0.0));
// Forward FFT.
self.fft.process(buf);
// Point-wise multiply with H(f); fold in the IFFT normalisation here
// to avoid a second pass.
let scale = 1.0 / self.fft_size as f32;
mul_freq_domain(buf, &self.h_freq, scale);
// Inverse FFT.
self.ifft.process(buf);
// Extract the valid output: discard the first n_overlap samples.
let end = (n_overlap + n_new).min(buf.len());
output.clear();
output.reserve(n_new.saturating_sub(output.capacity()));
output.extend(buf[n_overlap..end].iter().map(|s| s.re));
// Update overlap with the tail of the current input.
if n_overlap > 0 {
if n_new >= n_overlap {
let new_start = n_new - n_overlap;
self.overlap.copy_from_slice(&input[new_start..]);
} else {
let keep_old = n_overlap - n_new;
self.overlap.copy_within(n_new..n_overlap, 0);
self.overlap[keep_old..].copy_from_slice(input);
}
}
}
pub fn filter_block(&mut self, input: &[f32]) -> Vec<f32> {
let mut output = Vec::with_capacity(input.len());
self.filter_block_into(input, &mut output);
output
}
}
impl BlockFirFilterPair {
pub fn new(cutoff_norm: f32, taps: usize, block_size: usize) -> Self {
let taps = taps.max(1);
let (h_buf, fft_size, fft, ifft) = build_fir_kernel(cutoff_norm, taps, block_size);
Self {
h_freq: h_buf,
overlap: vec![FftComplex::new(0.0, 0.0); taps.saturating_sub(1)],
n_taps: taps,
fft_size,
fft,
ifft,
scratch_freq: vec![FftComplex::new(0.0, 0.0); fft_size],
}
}
pub fn filter_block_into(
&mut self,
input_i: &[f32],
input_q: &[f32],
output_i: &mut Vec<f32>,
output_q: &mut Vec<f32>,
) {
let n_new = input_i.len().min(input_q.len());
let n_overlap = self.n_taps.saturating_sub(1);
let buf = &mut self.scratch_freq;
buf.clear();
buf.reserve(self.fft_size.saturating_sub(buf.capacity()));
buf.extend(self.overlap.iter().copied());
for idx in 0..n_new {
buf.push(FftComplex::new(input_i[idx], input_q[idx]));
}
buf.resize(self.fft_size, FftComplex::new(0.0, 0.0));
self.fft.process(buf);
let scale = 1.0 / self.fft_size as f32;
mul_freq_domain(buf, &self.h_freq, scale);
self.ifft.process(buf);
let end = (n_overlap + n_new).min(buf.len());
output_i.clear();
output_q.clear();
output_i.reserve(n_new.saturating_sub(output_i.capacity()));
output_q.reserve(n_new.saturating_sub(output_q.capacity()));
for sample in &buf[n_overlap..end] {
output_i.push(sample.re);
output_q.push(sample.im);
}
if n_overlap > 0 {
if n_new >= n_overlap {
let new_start = n_new - n_overlap;
for (dst, idx) in self.overlap.iter_mut().zip(new_start..n_new) {
*dst = FftComplex::new(input_i[idx], input_q[idx]);
}
} else {
let keep_old = n_overlap - n_new;
self.overlap.copy_within(n_new..n_overlap, 0);
for (dst, idx) in self.overlap[keep_old..].iter_mut().zip(0..n_new) {
*dst = FftComplex::new(input_i[idx], input_q[idx]);
}
}
}
}
}
// ---------------------------------------------------------------------------
// Per-mode post-processing helpers (DC block + AGC)
// ---------------------------------------------------------------------------
/// Build the AGC for a given mode.
///
/// AM AGC must be far slower than audio modulation. With a 50 Hz bass
/// component the modulation period is 20 ms; an attack faster than that
/// causes the AGC to follow the audio envelope and distort (pumping).
/// 500 ms / 5 s only reacts to slow carrier-amplitude fading, not audio.
///
/// CW uses a fast attack/release to follow individual dots and dashes.
/// All other modes use 5 ms / 500 ms, suitable for SSB voice and FM.
fn agc_for_mode(mode: &RigMode, audio_sample_rate: u32) -> SoftAgc {
let sr = audio_sample_rate.max(1) as f32;
match mode {
RigMode::CW | RigMode::CWR => SoftAgc::new(sr, 1.0, 50.0, 0.5, 30.0),
RigMode::AM => SoftAgc::new(sr, 500.0, 5_000.0, 0.5, 30.0),
_ => SoftAgc::new(sr, 5.0, 500.0, 0.5, 30.0),
}
}
/// Build a pre-demod complex AGC for FM-family modes.
///
/// This stabilizes I/Q magnitude before the discriminator while preserving
/// phase, which helps the FM/WFM demod path behave better when strong signals
/// drive large amplitude swings inside the channel filter.
fn iq_agc_for_mode(mode: &RigMode, sample_rate: u32) -> Option<SoftAgc> {
let sr = sample_rate.max(1) as f32;
match mode {
RigMode::FM | RigMode::PKT => Some(SoftAgc::new(sr, 0.5, 150.0, 0.8, 12.0)),
// WFM uses a hard IQ limiter instead of AGC to preserve phase accuracy.
RigMode::WFM => None,
_ => None,
}
}
/// Build the DC blocker for a given mode, or `None` if not applicable.
///
/// WFM is excluded because it has its own internal DC blockers per channel.
/// All other modes use r = 0.9999 (corner ≈ 0.76 Hz @ 48 kHz), which strips
/// only true carrier DC without affecting any audible bass content.
fn dc_for_mode(mode: &RigMode) -> Option<DcBlocker> {
match mode {
RigMode::WFM => None,
_ => Some(DcBlocker::new(0.9999)),
}
}
// ---------------------------------------------------------------------------
// Channel DSP context
// ---------------------------------------------------------------------------
/// Per-channel DSP state: mixer, FFT-FIR, decimator, demodulator, frame accumulator.
pub struct ChannelDsp {
/// Frequency offset of this channel from the SDR centre (Hz).
pub channel_if_hz: f64,
/// Current demodulator (can be swapped via `set_mode`).
pub demodulator: Demodulator,
/// Current rig mode so the decimation pipeline can be rebuilt.
mode: RigMode,
/// FFT-based FIR low-pass filter applied to packed I/Q before decimation.
lpf_iq: BlockFirFilterPair,
/// SDR capture sample rate — kept for filter rebuilds.
sdr_sample_rate: u32,
/// Output audio sample rate.
audio_sample_rate: u32,
/// Requested audio bandwidth.
audio_bandwidth_hz: u32,
/// FIR tap count used when rebuilding filters.
fir_taps: usize,
/// WFM deemphasis time constant in microseconds.
wfm_deemphasis_us: u32,
/// Whether WFM stereo decoding is enabled.
wfm_stereo: bool,
/// Whether WFM stereo denoise is enabled.
wfm_denoise: bool,
/// Decimation factor: `sdr_sample_rate / audio_sample_rate`.
pub decim_factor: usize,
/// Number of PCM channels emitted in each frame.
output_channels: usize,
/// Accumulator for output PCM frames.
pub frame_buf: Vec<f32>,
/// Read cursor into `frame_buf` for completed frame emission.
frame_buf_offset: usize,
/// Target frame size in samples.
pub frame_size: usize,
/// Sender for completed PCM frames.
pub pcm_tx: broadcast::Sender<Vec<f32>>,
/// Reusable scratch buffer for mixed I samples.
scratch_mixed_i: Vec<f32>,
/// Reusable scratch buffer for mixed Q samples.
scratch_mixed_q: Vec<f32>,
/// Reusable scratch buffer for filtered I samples.
scratch_filtered_i: Vec<f32>,
/// Reusable scratch buffer for filtered Q samples.
scratch_filtered_q: Vec<f32>,
/// Reusable scratch buffer for decimated IQ samples.
scratch_decimated: Vec<Complex<f32>>,
/// Current oscillator phase (radians).
pub mixer_phase: f64,
/// Phase increment per IQ sample.
pub mixer_phase_inc: f64,
/// Decimation counter (used only for the WFM path).
decim_counter: usize,
/// Fractional-phase resampler for non-WFM paths.
/// Advances by `resample_phase_inc` per IQ sample and emits an output
/// sample whenever it crosses 1.0, giving a long-term output rate of
/// exactly `audio_sample_rate` regardless of SDR rate rounding.
resample_phase: f64,
/// `audio_sample_rate / sdr_sample_rate` (updated in `rebuild_filters`).
resample_phase_inc: f64,
/// Dedicated WFM decoder that preserves the FM composite baseband.
wfm_decoder: Option<WfmStereoDecoder>,
/// Complex-domain AGC/limiter applied before FM-family demodulation.
iq_agc: Option<SoftAgc>,
/// Soft AGC applied to all demodulated audio for consistent cross-mode levels.
audio_agc: SoftAgc,
/// DC blocker for modes whose demodulator output can carry a DC offset
/// (USB/LSB/AM/FM/DIG). None for CW and WFM.
audio_dc: Option<DcBlocker>,
}
impl ChannelDsp {
fn clamp_bandwidth_for_mode(mode: &RigMode, bandwidth_hz: u32) -> u32 {
let _ = mode;
bandwidth_hz
}
pub fn set_channel_if_hz(&mut self, channel_if_hz: f64) {
self.channel_if_hz = channel_if_hz;
self.mixer_phase_inc = if self.sdr_sample_rate == 0 {
0.0
} else {
2.0 * std::f64::consts::PI * channel_if_hz / self.sdr_sample_rate as f64
};
}
fn pipeline_rates(
mode: &RigMode,
sdr_sample_rate: u32,
audio_sample_rate: u32,
audio_bandwidth_hz: u32,
) -> (usize, u32) {
if sdr_sample_rate == 0 {
return (1, audio_sample_rate.max(1));
}
let target_rate = if *mode == RigMode::WFM {
audio_bandwidth_hz.max(audio_sample_rate.saturating_mul(4))
} else {
audio_sample_rate.max(1)
};
let decim_factor = (sdr_sample_rate / target_rate.max(1)).max(1) as usize;
let channel_sample_rate = (sdr_sample_rate / decim_factor as u32).max(1);
(decim_factor, channel_sample_rate)
}
fn rebuild_filters(&mut self, reset_wfm_decoder: bool) {
self.audio_bandwidth_hz = Self::clamp_bandwidth_for_mode(&self.mode, self.audio_bandwidth_hz);
let (next_decim_factor, channel_sample_rate) = Self::pipeline_rates(
&self.mode,
self.sdr_sample_rate,
self.audio_sample_rate,
self.audio_bandwidth_hz,
);
let iq_filter_bw = self.audio_bandwidth_hz;
let cutoff_hz = iq_filter_bw.min(channel_sample_rate.saturating_sub(1)) as f32 / 2.0;
let cutoff_norm = if self.sdr_sample_rate == 0 {
0.1
} else {
(cutoff_hz / self.sdr_sample_rate as f32).min(0.499)
};
self.lpf_iq = BlockFirFilterPair::new(cutoff_norm, self.fir_taps, IQ_BLOCK_SIZE);
let rate_changed = self.decim_factor != next_decim_factor;
self.decim_factor = next_decim_factor;
self.decim_counter = 0;
self.resample_phase = 0.0;
self.resample_phase_inc = if self.sdr_sample_rate == 0 {
1.0
} else {
self.audio_sample_rate as f64 / self.sdr_sample_rate as f64
};
if self.mode == RigMode::WFM {
if reset_wfm_decoder || rate_changed || self.wfm_decoder.is_none() {
self.wfm_decoder = Some(WfmStereoDecoder::new(
channel_sample_rate,
self.audio_sample_rate,
self.output_channels,
self.wfm_stereo,
self.wfm_deemphasis_us,
));
}
} else {
self.wfm_decoder = None;
}
self.iq_agc = iq_agc_for_mode(&self.mode, channel_sample_rate);
self.audio_agc = agc_for_mode(&self.mode, self.audio_sample_rate);
self.audio_dc = dc_for_mode(&self.mode);
self.frame_buf.clear();
self.frame_buf_offset = 0;
}
#[allow(clippy::too_many_arguments)]
pub fn new(
channel_if_hz: f64,
mode: &RigMode,
sdr_sample_rate: u32,
audio_sample_rate: u32,
output_channels: usize,
frame_duration_ms: u16,
audio_bandwidth_hz: u32,
wfm_deemphasis_us: u32,
wfm_stereo: bool,
fir_taps: usize,
pcm_tx: broadcast::Sender<Vec<f32>>,
) -> Self {
let output_channels = output_channels.max(1);
let audio_bandwidth_hz = Self::clamp_bandwidth_for_mode(mode, audio_bandwidth_hz);
let frame_size = if audio_sample_rate == 0 || frame_duration_ms == 0 {
960 * output_channels
} else {
(audio_sample_rate as usize * frame_duration_ms as usize * output_channels) / 1000
};
let taps = fir_taps.max(1);
let (decim_factor, channel_sample_rate) =
Self::pipeline_rates(mode, sdr_sample_rate, audio_sample_rate, audio_bandwidth_hz);
// For WFM, widen the IQ filter enough to pass the RDS subcarrier at
// 57 kHz (requires cutoff ≥ 65 kHz → bandwidth ≥ 130 kHz).
let iq_filter_bw = audio_bandwidth_hz;
let cutoff_hz = iq_filter_bw.min(channel_sample_rate.saturating_sub(1)) as f32 / 2.0;
let cutoff_norm = if sdr_sample_rate == 0 {
0.1
} else {
(cutoff_hz / sdr_sample_rate as f32).min(0.499)
};
let mixer_phase_inc = if sdr_sample_rate == 0 {
0.0
} else {
2.0 * std::f64::consts::PI * channel_if_hz / sdr_sample_rate as f64
};
Self {
channel_if_hz,
demodulator: Demodulator::for_mode(mode),
mode: mode.clone(),
lpf_iq: BlockFirFilterPair::new(cutoff_norm, taps, IQ_BLOCK_SIZE),
sdr_sample_rate,
audio_sample_rate,
audio_bandwidth_hz,
fir_taps: taps,
wfm_deemphasis_us,
wfm_stereo,
wfm_denoise: true,
decim_factor,
output_channels,
frame_buf: Vec::with_capacity(frame_size + output_channels),
frame_buf_offset: 0,
frame_size,
pcm_tx,
scratch_mixed_i: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_mixed_q: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_filtered_i: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_filtered_q: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_decimated: Vec::with_capacity(IQ_BLOCK_SIZE / decim_factor.max(1) + 1),
mixer_phase: 0.0,
mixer_phase_inc,
decim_counter: 0,
resample_phase: 0.0,
resample_phase_inc: if sdr_sample_rate == 0 {
1.0
} else {
audio_sample_rate as f64 / sdr_sample_rate as f64
},
wfm_decoder: if *mode == RigMode::WFM {
Some(WfmStereoDecoder::new(
channel_sample_rate,
audio_sample_rate,
output_channels,
wfm_stereo,
wfm_deemphasis_us,
))
} else {
None
},
iq_agc: iq_agc_for_mode(mode, channel_sample_rate),
audio_agc: agc_for_mode(mode, audio_sample_rate),
audio_dc: dc_for_mode(mode),
}
}
pub fn set_mode(&mut self, mode: &RigMode) {
self.mode = mode.clone();
if *mode != RigMode::WFM {
self.audio_bandwidth_hz = default_bandwidth_for_mode(mode);
}
self.demodulator = Demodulator::for_mode(mode);
self.rebuild_filters(true);
}
}
/// Returns the appropriate channel filter bandwidth for a given mode.
fn default_bandwidth_for_mode(mode: &RigMode) -> u32 {
match mode {
RigMode::LSB | RigMode::USB | RigMode::PKT | RigMode::DIG => 3_000,
RigMode::CW | RigMode::CWR => 500,
RigMode::AM => 12_000,
RigMode::FM => 12_500,
RigMode::WFM => 180_000,
RigMode::Other(_) => 3_000,
}
}
impl ChannelDsp {
/// Rebuild the FIR low-pass filters with new bandwidth and tap count.
///
/// Changes take effect on the next call to `process_block`.
pub fn set_filter(&mut self, bandwidth_hz: u32, taps: usize) {
self.audio_bandwidth_hz = Self::clamp_bandwidth_for_mode(&self.mode, bandwidth_hz);
self.fir_taps = taps.max(1);
self.rebuild_filters(false);
}
pub fn set_wfm_deemphasis(&mut self, deemphasis_us: u32) {
self.wfm_deemphasis_us = deemphasis_us;
self.rebuild_filters(true);
}
pub fn set_wfm_stereo(&mut self, enabled: bool) {
self.wfm_stereo = enabled;
if let Some(decoder) = &mut self.wfm_decoder {
decoder.set_stereo_enabled(enabled);
}
}
pub fn set_wfm_denoise(&mut self, enabled: bool) {
self.wfm_denoise = enabled;
if let Some(decoder) = &mut self.wfm_decoder {
decoder.set_denoise_enabled(enabled);
}
}
pub fn rds_data(&self) -> Option<RdsData> {
self.wfm_decoder.as_ref().and_then(WfmStereoDecoder::rds_data)
}
pub fn wfm_stereo_detected(&self) -> bool {
self.wfm_decoder
.as_ref()
.map(WfmStereoDecoder::stereo_detected)
.unwrap_or(false)
}
pub fn reset_rds(&mut self) {
if let Some(decoder) = &mut self.wfm_decoder {
decoder.reset_rds();
}
}
pub fn reset_wfm_state(&mut self) {
if let Some(decoder) = &mut self.wfm_decoder {
decoder.reset_state();
}
}
/// Process a block of raw IQ samples through the full DSP chain.
///
/// 1. **Batch mixer**: compute the full LO signal for the block at once,
/// then multiply element-wise. The loop body has no cross-iteration
/// data dependency so the compiler can auto-vectorise it.
/// 2. **FFT FIR**: apply the overlap-save FIR to I and Q in one FFT pair
/// each instead of N multiplies per sample.
/// 3. Decimate.
/// 4. Demodulate.
/// 5. Emit complete PCM frames.
pub fn process_block(&mut self, block: &[Complex<f32>]) {
let n = block.len();
if n == 0 {
return;
}
// --- 1. Batch mixer -------------------------------------------------
// Pre-compute I and Q mixer outputs for the whole block.
// Use an oscillator recurrence so we only call `sin_cos` once for the
// block instead of once per sample.
self.scratch_mixed_i.resize(n, 0.0);
self.scratch_mixed_q.resize(n, 0.0);
let mixed_i = &mut self.scratch_mixed_i;
let mixed_q = &mut self.scratch_mixed_q;
let phase_start = self.mixer_phase;
let phase_inc = self.mixer_phase_inc;
let (mut sin_phase, mut cos_phase) = phase_start.sin_cos();
let (sin_inc, cos_inc) = phase_inc.sin_cos();
for (idx, &sample) in block.iter().enumerate() {
let lo_re = cos_phase as f32;
let lo_im = -(sin_phase as f32);
// mixed = sample * exp(-j*phase) = sample * (cos - j*sin)
mixed_i[idx] = sample.re * lo_re - sample.im * lo_im;
mixed_q[idx] = sample.re * lo_im + sample.im * lo_re;
let next_sin = sin_phase * cos_inc + cos_phase * sin_inc;
let next_cos = cos_phase * cos_inc - sin_phase * sin_inc;
sin_phase = next_sin;
cos_phase = next_cos;
}
// Advance phase with wrap to avoid precision loss.
self.mixer_phase = (phase_start + n as f64 * phase_inc).rem_euclid(std::f64::consts::TAU);
// --- 2. FFT FIR (overlap-save) --------------------------------------
self.lpf_iq.filter_block_into(
mixed_i,
mixed_q,
&mut self.scratch_filtered_i,
&mut self.scratch_filtered_q,
);
let filtered_i = &self.scratch_filtered_i;
let filtered_q = &self.scratch_filtered_q;
// --- 3. Decimate / resample -----------------------------------------
let capacity = n / self.decim_factor + 1;
self.scratch_decimated.clear();
if self.scratch_decimated.capacity() < capacity {
self.scratch_decimated
.reserve(capacity - self.scratch_decimated.capacity());
}
let decimated = &mut self.scratch_decimated;
if self.wfm_decoder.is_some() {
// WFM: integer decimation preserves the FM composite signal at the
// rate expected by WfmStereoDecoder. Final rate correction is done
// inside WfmStereoDecoder via its own fractional-phase accumulator.
for i in 0..n {
self.decim_counter += 1;
if self.decim_counter >= self.decim_factor {
self.decim_counter = 0;
let fi = filtered_i.get(i).copied().unwrap_or(0.0);
let fq = filtered_q.get(i).copied().unwrap_or(0.0);
decimated.push(Complex::new(fi, fq));
}
}
} else {
// Non-WFM: fractional-phase resampler so the long-term output rate
// equals exactly `audio_sample_rate` regardless of SDR rate rounding.
for i in 0..n {
self.resample_phase += self.resample_phase_inc;
if self.resample_phase >= 1.0 {
self.resample_phase -= 1.0;
let fi = filtered_i.get(i).copied().unwrap_or(0.0);
let fq = filtered_q.get(i).copied().unwrap_or(0.0);
decimated.push(Complex::new(fi, fq));
}
}
}
if decimated.is_empty() {
return;
}
if let Some(iq_agc) = &mut self.iq_agc {
for sample in decimated.iter_mut() {
*sample = iq_agc.process_complex(*sample);
}
}
// Hard-limit IQ magnitude before the WFM discriminator so overdeviated
// signals don't produce clipped composite baseband.
if self.wfm_decoder.is_some() {
for sample in decimated.iter_mut() {
let mag = (sample.re * sample.re + sample.im * sample.im).sqrt();
if mag > 1.0 {
*sample /= mag;
}
}
}
// --- 4. Demodulate + post-process -----------------------------------
// WFM: full composite decoder (handles its own DC blocks + deemphasis).
// No AGC — WFM uses IQ hard limiting + fixed output gain to preserve
// stereo separation and avoid AGC pumping on broadcast audio.
const WFM_OUTPUT_GAIN: f32 = 0.32;
let audio = if let Some(decoder) = self.wfm_decoder.as_mut() {
let mut out = decoder.process_iq(decimated);
for sample in &mut out {
*sample = (*sample * WFM_OUTPUT_GAIN).clamp(-1.0, 1.0);
}
out
} else {
let mut raw = self.demodulator.demodulate(decimated);
for s in &mut raw {
if let Some(dc) = &mut self.audio_dc {
*s = dc.process(*s);
}
*s = self.audio_agc.process(*s);
}
if self.output_channels >= 2 {
let mut stereo = Vec::with_capacity(raw.len() * self.output_channels);
for sample in raw {
stereo.push(sample);
stereo.push(sample);
}
stereo
} else {
raw
}
};
// --- 5. Emit complete PCM frames ------------------------------------
self.frame_buf.extend_from_slice(&audio);
while self.frame_buf.len().saturating_sub(self.frame_buf_offset) >= self.frame_size {
let start = self.frame_buf_offset;
let end = start + self.frame_size;
let frame = self.frame_buf[start..end].to_vec();
self.frame_buf_offset = end;
let _ = self.pcm_tx.send(frame);
}
if self.frame_buf_offset > 0 && self.frame_buf_offset * 2 >= self.frame_buf.len() {
self.frame_buf.copy_within(self.frame_buf_offset.., 0);
self.frame_buf.truncate(self.frame_buf.len() - self.frame_buf_offset);
self.frame_buf_offset = 0;
}
}
}
// ---------------------------------------------------------------------------
// Top-level pipeline struct
// ---------------------------------------------------------------------------
@@ -1234,25 +386,6 @@ mod tests {
assert_eq!(out.len(), 128, "output length should equal input length");
}
#[test]
fn channel_dsp_processes_silence() {
let (pcm_tx, _pcm_rx) = broadcast::channel::<Vec<f32>>(8);
let mut dsp =
ChannelDsp::new(0.0, &RigMode::USB, 48_000, 8_000, 1, 20, 3000, 75, true, 31, pcm_tx);
let block = vec![Complex::new(0.0_f32, 0.0_f32); 4096];
dsp.process_block(&block);
}
#[test]
fn channel_dsp_set_mode() {
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(8);
let mut dsp =
ChannelDsp::new(0.0, &RigMode::USB, 48_000, 8_000, 1, 20, 3000, 75, true, 31, pcm_tx);
assert_eq!(dsp.demodulator, Demodulator::Usb);
dsp.set_mode(&RigMode::FM);
assert_eq!(dsp.demodulator, Demodulator::Fm);
}
#[test]
fn pipeline_starts_with_mock_source() {
let pipeline = SdrPipeline::start(
@@ -1271,7 +404,16 @@ mod tests {
#[test]
fn pipeline_empty_channels() {
let pipeline = SdrPipeline::start(Box::new(MockIqSource), 1_920_000, 48_000, 1, 20, 75, true, &[]);
let pipeline = SdrPipeline::start(
Box::new(MockIqSource),
1_920_000,
48_000,
1,
20,
75,
true,
&[],
);
assert_eq!(pipeline.pcm_senders.len(), 0);
assert_eq!(pipeline.channel_dsps.len(), 0);
}
@@ -0,0 +1,485 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use num_complex::Complex;
use tokio::sync::broadcast;
use trx_core::rig::state::{RdsData, RigMode};
use crate::demod::{DcBlocker, Demodulator, SoftAgc, WfmStereoDecoder};
use super::{BlockFirFilterPair, IQ_BLOCK_SIZE};
fn agc_for_mode(mode: &RigMode, audio_sample_rate: u32) -> SoftAgc {
let sr = audio_sample_rate.max(1) as f32;
match mode {
RigMode::CW | RigMode::CWR => SoftAgc::new(sr, 1.0, 50.0, 0.5, 30.0),
RigMode::AM => SoftAgc::new(sr, 500.0, 5_000.0, 0.5, 30.0),
_ => SoftAgc::new(sr, 5.0, 500.0, 0.5, 30.0),
}
}
fn iq_agc_for_mode(mode: &RigMode, sample_rate: u32) -> Option<SoftAgc> {
let sr = sample_rate.max(1) as f32;
match mode {
RigMode::FM | RigMode::PKT => Some(SoftAgc::new(sr, 0.5, 150.0, 0.8, 12.0)),
RigMode::WFM => None,
_ => None,
}
}
fn dc_for_mode(mode: &RigMode) -> Option<DcBlocker> {
match mode {
RigMode::WFM => None,
_ => Some(DcBlocker::new(0.9999)),
}
}
fn default_bandwidth_for_mode(mode: &RigMode) -> u32 {
match mode {
RigMode::LSB | RigMode::USB | RigMode::PKT | RigMode::DIG => 3_000,
RigMode::CW | RigMode::CWR => 500,
RigMode::AM => 12_000,
RigMode::FM => 12_500,
RigMode::WFM => 180_000,
RigMode::Other(_) => 3_000,
}
}
/// Per-channel DSP state: mixer, FFT-FIR, decimator, demodulator, frame accumulator.
pub struct ChannelDsp {
pub channel_if_hz: f64,
pub demodulator: Demodulator,
mode: RigMode,
lpf_iq: BlockFirFilterPair,
sdr_sample_rate: u32,
audio_sample_rate: u32,
audio_bandwidth_hz: u32,
fir_taps: usize,
wfm_deemphasis_us: u32,
wfm_stereo: bool,
wfm_denoise: bool,
pub decim_factor: usize,
output_channels: usize,
pub frame_buf: Vec<f32>,
frame_buf_offset: usize,
pub frame_size: usize,
pub pcm_tx: broadcast::Sender<Vec<f32>>,
scratch_mixed_i: Vec<f32>,
scratch_mixed_q: Vec<f32>,
scratch_filtered_i: Vec<f32>,
scratch_filtered_q: Vec<f32>,
scratch_decimated: Vec<Complex<f32>>,
pub mixer_phase: f64,
pub mixer_phase_inc: f64,
decim_counter: usize,
resample_phase: f64,
resample_phase_inc: f64,
wfm_decoder: Option<WfmStereoDecoder>,
iq_agc: Option<SoftAgc>,
audio_agc: SoftAgc,
audio_dc: Option<DcBlocker>,
}
impl ChannelDsp {
fn clamp_bandwidth_for_mode(mode: &RigMode, bandwidth_hz: u32) -> u32 {
let _ = mode;
bandwidth_hz
}
pub fn set_channel_if_hz(&mut self, channel_if_hz: f64) {
self.channel_if_hz = channel_if_hz;
self.mixer_phase_inc = if self.sdr_sample_rate == 0 {
0.0
} else {
2.0 * std::f64::consts::PI * channel_if_hz / self.sdr_sample_rate as f64
};
}
fn pipeline_rates(
mode: &RigMode,
sdr_sample_rate: u32,
audio_sample_rate: u32,
audio_bandwidth_hz: u32,
) -> (usize, u32) {
if sdr_sample_rate == 0 {
return (1, audio_sample_rate.max(1));
}
let target_rate = if *mode == RigMode::WFM {
audio_bandwidth_hz.max(audio_sample_rate.saturating_mul(4))
} else {
audio_sample_rate.max(1)
};
let decim_factor = (sdr_sample_rate / target_rate.max(1)).max(1) as usize;
let channel_sample_rate = (sdr_sample_rate / decim_factor as u32).max(1);
(decim_factor, channel_sample_rate)
}
fn rebuild_filters(&mut self, reset_wfm_decoder: bool) {
self.audio_bandwidth_hz =
Self::clamp_bandwidth_for_mode(&self.mode, self.audio_bandwidth_hz);
let (next_decim_factor, channel_sample_rate) = Self::pipeline_rates(
&self.mode,
self.sdr_sample_rate,
self.audio_sample_rate,
self.audio_bandwidth_hz,
);
let cutoff_hz = self
.audio_bandwidth_hz
.min(channel_sample_rate.saturating_sub(1)) as f32
/ 2.0;
let cutoff_norm = if self.sdr_sample_rate == 0 {
0.1
} else {
(cutoff_hz / self.sdr_sample_rate as f32).min(0.499)
};
self.lpf_iq = BlockFirFilterPair::new(cutoff_norm, self.fir_taps, IQ_BLOCK_SIZE);
let rate_changed = self.decim_factor != next_decim_factor;
self.decim_factor = next_decim_factor;
self.decim_counter = 0;
self.resample_phase = 0.0;
self.resample_phase_inc = if self.sdr_sample_rate == 0 {
1.0
} else {
self.audio_sample_rate as f64 / self.sdr_sample_rate as f64
};
if self.mode == RigMode::WFM {
if reset_wfm_decoder || rate_changed || self.wfm_decoder.is_none() {
self.wfm_decoder = Some(WfmStereoDecoder::new(
channel_sample_rate,
self.audio_sample_rate,
self.output_channels,
self.wfm_stereo,
self.wfm_deemphasis_us,
));
}
} else {
self.wfm_decoder = None;
}
self.iq_agc = iq_agc_for_mode(&self.mode, channel_sample_rate);
self.audio_agc = agc_for_mode(&self.mode, self.audio_sample_rate);
self.audio_dc = dc_for_mode(&self.mode);
self.frame_buf.clear();
self.frame_buf_offset = 0;
}
#[allow(clippy::too_many_arguments)]
pub fn new(
channel_if_hz: f64,
mode: &RigMode,
sdr_sample_rate: u32,
audio_sample_rate: u32,
output_channels: usize,
frame_duration_ms: u16,
audio_bandwidth_hz: u32,
wfm_deemphasis_us: u32,
wfm_stereo: bool,
fir_taps: usize,
pcm_tx: broadcast::Sender<Vec<f32>>,
) -> Self {
let output_channels = output_channels.max(1);
let audio_bandwidth_hz = Self::clamp_bandwidth_for_mode(mode, audio_bandwidth_hz);
let frame_size = if audio_sample_rate == 0 || frame_duration_ms == 0 {
960 * output_channels
} else {
(audio_sample_rate as usize * frame_duration_ms as usize * output_channels) / 1000
};
let taps = fir_taps.max(1);
let (decim_factor, channel_sample_rate) =
Self::pipeline_rates(mode, sdr_sample_rate, audio_sample_rate, audio_bandwidth_hz);
let cutoff_hz = audio_bandwidth_hz.min(channel_sample_rate.saturating_sub(1)) as f32 / 2.0;
let cutoff_norm = if sdr_sample_rate == 0 {
0.1
} else {
(cutoff_hz / sdr_sample_rate as f32).min(0.499)
};
let mixer_phase_inc = if sdr_sample_rate == 0 {
0.0
} else {
2.0 * std::f64::consts::PI * channel_if_hz / sdr_sample_rate as f64
};
Self {
channel_if_hz,
demodulator: Demodulator::for_mode(mode),
mode: mode.clone(),
lpf_iq: BlockFirFilterPair::new(cutoff_norm, taps, IQ_BLOCK_SIZE),
sdr_sample_rate,
audio_sample_rate,
audio_bandwidth_hz,
fir_taps: taps,
wfm_deemphasis_us,
wfm_stereo,
wfm_denoise: true,
decim_factor,
output_channels,
frame_buf: Vec::with_capacity(frame_size + output_channels),
frame_buf_offset: 0,
frame_size,
pcm_tx,
scratch_mixed_i: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_mixed_q: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_filtered_i: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_filtered_q: Vec::with_capacity(IQ_BLOCK_SIZE),
scratch_decimated: Vec::with_capacity(IQ_BLOCK_SIZE / decim_factor.max(1) + 1),
mixer_phase: 0.0,
mixer_phase_inc,
decim_counter: 0,
resample_phase: 0.0,
resample_phase_inc: if sdr_sample_rate == 0 {
1.0
} else {
audio_sample_rate as f64 / sdr_sample_rate as f64
},
wfm_decoder: if *mode == RigMode::WFM {
Some(WfmStereoDecoder::new(
channel_sample_rate,
audio_sample_rate,
output_channels,
wfm_stereo,
wfm_deemphasis_us,
))
} else {
None
},
iq_agc: iq_agc_for_mode(mode, channel_sample_rate),
audio_agc: agc_for_mode(mode, audio_sample_rate),
audio_dc: dc_for_mode(mode),
}
}
pub fn set_mode(&mut self, mode: &RigMode) {
self.mode = mode.clone();
if *mode != RigMode::WFM {
self.audio_bandwidth_hz = default_bandwidth_for_mode(mode);
}
self.demodulator = Demodulator::for_mode(mode);
self.rebuild_filters(true);
}
pub fn set_filter(&mut self, bandwidth_hz: u32, taps: usize) {
self.audio_bandwidth_hz = Self::clamp_bandwidth_for_mode(&self.mode, bandwidth_hz);
self.fir_taps = taps.max(1);
self.rebuild_filters(false);
}
pub fn set_wfm_deemphasis(&mut self, deemphasis_us: u32) {
self.wfm_deemphasis_us = deemphasis_us;
self.rebuild_filters(true);
}
pub fn set_wfm_stereo(&mut self, enabled: bool) {
self.wfm_stereo = enabled;
if let Some(decoder) = &mut self.wfm_decoder {
decoder.set_stereo_enabled(enabled);
}
}
pub fn set_wfm_denoise(&mut self, enabled: bool) {
self.wfm_denoise = enabled;
if let Some(decoder) = &mut self.wfm_decoder {
decoder.set_denoise_enabled(enabled);
}
}
pub fn rds_data(&self) -> Option<RdsData> {
self.wfm_decoder
.as_ref()
.and_then(WfmStereoDecoder::rds_data)
}
pub fn wfm_stereo_detected(&self) -> bool {
self.wfm_decoder
.as_ref()
.map(WfmStereoDecoder::stereo_detected)
.unwrap_or(false)
}
pub fn reset_rds(&mut self) {
if let Some(decoder) = &mut self.wfm_decoder {
decoder.reset_rds();
}
}
pub fn reset_wfm_state(&mut self) {
if let Some(decoder) = &mut self.wfm_decoder {
decoder.reset_state();
}
}
pub fn process_block(&mut self, block: &[Complex<f32>]) {
let n = block.len();
if n == 0 {
return;
}
self.scratch_mixed_i.resize(n, 0.0);
self.scratch_mixed_q.resize(n, 0.0);
let mixed_i = &mut self.scratch_mixed_i;
let mixed_q = &mut self.scratch_mixed_q;
let phase_start = self.mixer_phase;
let phase_inc = self.mixer_phase_inc;
let (mut sin_phase, mut cos_phase) = phase_start.sin_cos();
let (sin_inc, cos_inc) = phase_inc.sin_cos();
for (idx, &sample) in block.iter().enumerate() {
let lo_re = cos_phase as f32;
let lo_im = -(sin_phase as f32);
mixed_i[idx] = sample.re * lo_re - sample.im * lo_im;
mixed_q[idx] = sample.re * lo_im + sample.im * lo_re;
let next_sin = sin_phase * cos_inc + cos_phase * sin_inc;
let next_cos = cos_phase * cos_inc - sin_phase * sin_inc;
sin_phase = next_sin;
cos_phase = next_cos;
}
self.mixer_phase = (phase_start + n as f64 * phase_inc).rem_euclid(std::f64::consts::TAU);
self.lpf_iq.filter_block_into(
mixed_i,
mixed_q,
&mut self.scratch_filtered_i,
&mut self.scratch_filtered_q,
);
let filtered_i = &self.scratch_filtered_i;
let filtered_q = &self.scratch_filtered_q;
let capacity = n / self.decim_factor + 1;
self.scratch_decimated.clear();
if self.scratch_decimated.capacity() < capacity {
self.scratch_decimated
.reserve(capacity - self.scratch_decimated.capacity());
}
let decimated = &mut self.scratch_decimated;
if self.wfm_decoder.is_some() {
for idx in 0..n {
self.decim_counter += 1;
if self.decim_counter >= self.decim_factor {
self.decim_counter = 0;
let fi = filtered_i.get(idx).copied().unwrap_or(0.0);
let fq = filtered_q.get(idx).copied().unwrap_or(0.0);
decimated.push(Complex::new(fi, fq));
}
}
} else {
for idx in 0..n {
self.resample_phase += self.resample_phase_inc;
if self.resample_phase >= 1.0 {
self.resample_phase -= 1.0;
let fi = filtered_i.get(idx).copied().unwrap_or(0.0);
let fq = filtered_q.get(idx).copied().unwrap_or(0.0);
decimated.push(Complex::new(fi, fq));
}
}
}
if decimated.is_empty() {
return;
}
if let Some(iq_agc) = &mut self.iq_agc {
for sample in decimated.iter_mut() {
*sample = iq_agc.process_complex(*sample);
}
}
if self.wfm_decoder.is_some() {
for sample in decimated.iter_mut() {
let mag = (sample.re * sample.re + sample.im * sample.im).sqrt();
if mag > 1.0 {
*sample /= mag;
}
}
}
const WFM_OUTPUT_GAIN: f32 = 0.32;
let audio = if let Some(decoder) = self.wfm_decoder.as_mut() {
let mut out = decoder.process_iq(decimated);
for sample in &mut out {
*sample = (*sample * WFM_OUTPUT_GAIN).clamp(-1.0, 1.0);
}
out
} else {
let mut raw = self.demodulator.demodulate(decimated);
for sample in &mut raw {
if let Some(dc) = &mut self.audio_dc {
*sample = dc.process(*sample);
}
*sample = self.audio_agc.process(*sample);
}
if self.output_channels >= 2 {
let mut stereo = Vec::with_capacity(raw.len() * self.output_channels);
for sample in raw {
stereo.push(sample);
stereo.push(sample);
}
stereo
} else {
raw
}
};
self.frame_buf.extend_from_slice(&audio);
while self.frame_buf.len().saturating_sub(self.frame_buf_offset) >= self.frame_size {
let start = self.frame_buf_offset;
let end = start + self.frame_size;
let frame = self.frame_buf[start..end].to_vec();
self.frame_buf_offset = end;
let _ = self.pcm_tx.send(frame);
}
if self.frame_buf_offset > 0 && self.frame_buf_offset * 2 >= self.frame_buf.len() {
self.frame_buf.copy_within(self.frame_buf_offset.., 0);
self.frame_buf
.truncate(self.frame_buf.len() - self.frame_buf_offset);
self.frame_buf_offset = 0;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn channel_dsp_processes_silence() {
let (pcm_tx, _pcm_rx) = broadcast::channel::<Vec<f32>>(8);
let mut dsp = ChannelDsp::new(
0.0,
&RigMode::USB,
48_000,
8_000,
1,
20,
3000,
75,
true,
31,
pcm_tx,
);
let block = vec![Complex::new(0.0_f32, 0.0_f32); 4096];
dsp.process_block(&block);
}
#[test]
fn channel_dsp_set_mode() {
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(8);
let mut dsp = ChannelDsp::new(
0.0,
&RigMode::USB,
48_000,
8_000,
1,
20,
3000,
75,
true,
31,
pcm_tx,
);
assert_eq!(dsp.demodulator, Demodulator::Usb);
dsp.set_mode(&RigMode::FM);
assert_eq!(dsp.demodulator, Demodulator::Fm);
}
}
@@ -0,0 +1,300 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use std::f32::consts::PI;
use std::sync::Arc;
use rustfft::num_complex::Complex as FftComplex;
use rustfft::{Fft, FftPlanner};
fn windowed_sinc_coeffs(cutoff_norm: f32, taps: usize) -> Vec<f32> {
assert!(taps >= 1, "FIR filter must have at least 1 tap");
let m = (taps - 1) as f32;
let mut coeffs = Vec::with_capacity(taps);
for i in 0..taps {
let x = i as f32 - m / 2.0;
let sinc = if x == 0.0 {
2.0 * cutoff_norm
} else {
(2.0 * PI * cutoff_norm * x).sin() / (PI * x)
};
let window = if taps == 1 {
1.0
} else {
0.5 * (1.0 - (2.0 * PI * i as f32 / m).cos())
};
coeffs.push(sinc * window);
}
let sum: f32 = coeffs.iter().sum();
if sum.abs() > 1e-12 {
let inv = 1.0 / sum;
for coeff in &mut coeffs {
*coeff *= inv;
}
}
coeffs
}
/// A simple windowed-sinc FIR low-pass filter (sample-by-sample interface).
///
/// Used only in unit tests. The DSP pipeline uses [`BlockFirFilter`] instead.
pub struct FirFilter {
coeffs: Vec<f32>,
state: Vec<f32>,
pos: usize,
}
impl FirFilter {
pub fn new(cutoff_norm: f32, taps: usize) -> Self {
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
let state_len = taps.saturating_sub(1);
Self {
coeffs,
state: vec![0.0; state_len],
pos: 0,
}
}
pub fn process(&mut self, sample: f32) -> f32 {
let n = self.state.len();
if n == 0 {
return sample * self.coeffs[0];
}
self.state[self.pos] = sample;
self.pos = (self.pos + 1) % n;
let mut acc = self.coeffs[0] * sample;
for k in 1..self.coeffs.len() {
let idx = (self.pos + n - k) % n;
acc += self.coeffs[k] * self.state[idx];
}
acc
}
}
/// FFT-based overlap-save FIR low-pass filter (block interface).
pub struct BlockFirFilter {
h_freq: Vec<FftComplex<f32>>,
overlap: Vec<f32>,
n_taps: usize,
fft_size: usize,
fft: Arc<dyn Fft<f32>>,
ifft: Arc<dyn Fft<f32>>,
scratch_freq: Vec<FftComplex<f32>>,
}
pub struct BlockFirFilterPair {
h_freq: Vec<FftComplex<f32>>,
overlap: Vec<FftComplex<f32>>,
n_taps: usize,
fft_size: usize,
fft: Arc<dyn Fft<f32>>,
ifft: Arc<dyn Fft<f32>>,
scratch_freq: Vec<FftComplex<f32>>,
}
type FirKernel = (
Vec<FftComplex<f32>>,
usize,
Arc<dyn Fft<f32>>,
Arc<dyn Fft<f32>>,
);
fn build_fir_kernel(cutoff_norm: f32, taps: usize, block_size: usize) -> FirKernel {
let coeffs = windowed_sinc_coeffs(cutoff_norm, taps);
let fft_size = (block_size + taps - 1).next_power_of_two();
let mut planner = FftPlanner::<f32>::new();
let fft = planner.plan_fft_forward(fft_size);
let ifft = planner.plan_fft_inverse(fft_size);
let mut h_buf: Vec<FftComplex<f32>> = coeffs
.iter()
.map(|&coeff| FftComplex::new(coeff, 0.0))
.collect();
fft.process({
h_buf.resize(fft_size, FftComplex::new(0.0, 0.0));
&mut h_buf
});
(h_buf, fft_size, fft, ifft)
}
fn mul_freq_domain_scalar(buf: &mut [FftComplex<f32>], h_freq: &[FftComplex<f32>], scale: f32) {
for (x, &h) in buf.iter_mut().zip(h_freq.iter()) {
*x = FftComplex::new(
(x.re * h.re - x.im * h.im) * scale,
(x.re * h.im + x.im * h.re) * scale,
);
}
}
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[target_feature(enable = "avx2")]
unsafe fn mul_freq_domain_avx2(
buf: &mut [FftComplex<f32>],
h_freq: &[FftComplex<f32>],
scale: f32,
) {
#[cfg(target_arch = "x86")]
use std::arch::x86::*;
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;
let len = buf.len().min(h_freq.len());
let mut i = 0usize;
let scale_v = _mm256_set1_ps(scale);
while i + 4 <= len {
let x_ptr = buf.as_mut_ptr().add(i) as *mut f32;
let h_ptr = h_freq.as_ptr().add(i) as *const f32;
let x_v = _mm256_loadu_ps(x_ptr);
let h_v = _mm256_loadu_ps(h_ptr);
let h_re = _mm256_moveldup_ps(h_v);
let h_im = _mm256_movehdup_ps(h_v);
let x_swapped = _mm256_permute_ps(x_v, 0xB1);
let prod = _mm256_addsub_ps(_mm256_mul_ps(x_v, h_re), _mm256_mul_ps(x_swapped, h_im));
let out = _mm256_mul_ps(prod, scale_v);
_mm256_storeu_ps(x_ptr, out);
i += 4;
}
mul_freq_domain_scalar(&mut buf[i..len], &h_freq[i..len], scale);
}
fn mul_freq_domain(buf: &mut [FftComplex<f32>], h_freq: &[FftComplex<f32>], scale: f32) {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
if std::arch::is_x86_feature_detected!("avx2") {
unsafe {
mul_freq_domain_avx2(buf, h_freq, scale);
}
return;
}
}
mul_freq_domain_scalar(buf, h_freq, scale);
}
impl BlockFirFilter {
pub fn new(cutoff_norm: f32, taps: usize, block_size: usize) -> Self {
let taps = taps.max(1);
let (h_buf, fft_size, fft, ifft) = build_fir_kernel(cutoff_norm, taps, block_size);
Self {
h_freq: h_buf,
overlap: vec![0.0; taps.saturating_sub(1)],
n_taps: taps,
fft_size,
fft,
ifft,
scratch_freq: vec![FftComplex::new(0.0, 0.0); fft_size],
}
}
pub fn filter_block_into(&mut self, input: &[f32], output: &mut Vec<f32>) {
let n_new = input.len();
let n_overlap = self.n_taps.saturating_sub(1);
let buf = &mut self.scratch_freq;
buf.clear();
buf.reserve(self.fft_size.saturating_sub(buf.capacity()));
for &sample in &self.overlap {
buf.push(FftComplex::new(sample, 0.0));
}
for &sample in input {
buf.push(FftComplex::new(sample, 0.0));
}
buf.resize(self.fft_size, FftComplex::new(0.0, 0.0));
self.fft.process(buf);
mul_freq_domain(buf, &self.h_freq, 1.0 / self.fft_size as f32);
self.ifft.process(buf);
let end = (n_overlap + n_new).min(buf.len());
output.clear();
output.reserve(n_new.saturating_sub(output.capacity()));
output.extend(buf[n_overlap..end].iter().map(|sample| sample.re));
if n_overlap > 0 {
if n_new >= n_overlap {
let new_start = n_new - n_overlap;
self.overlap.copy_from_slice(&input[new_start..]);
} else {
let keep_old = n_overlap - n_new;
self.overlap.copy_within(n_new..n_overlap, 0);
self.overlap[keep_old..].copy_from_slice(input);
}
}
}
pub fn filter_block(&mut self, input: &[f32]) -> Vec<f32> {
let mut output = Vec::with_capacity(input.len());
self.filter_block_into(input, &mut output);
output
}
}
impl BlockFirFilterPair {
pub fn new(cutoff_norm: f32, taps: usize, block_size: usize) -> Self {
let taps = taps.max(1);
let (h_buf, fft_size, fft, ifft) = build_fir_kernel(cutoff_norm, taps, block_size);
Self {
h_freq: h_buf,
overlap: vec![FftComplex::new(0.0, 0.0); taps.saturating_sub(1)],
n_taps: taps,
fft_size,
fft,
ifft,
scratch_freq: vec![FftComplex::new(0.0, 0.0); fft_size],
}
}
pub fn filter_block_into(
&mut self,
input_i: &[f32],
input_q: &[f32],
output_i: &mut Vec<f32>,
output_q: &mut Vec<f32>,
) {
let n_new = input_i.len().min(input_q.len());
let n_overlap = self.n_taps.saturating_sub(1);
let buf = &mut self.scratch_freq;
buf.clear();
buf.reserve(self.fft_size.saturating_sub(buf.capacity()));
buf.extend(self.overlap.iter().copied());
for idx in 0..n_new {
buf.push(FftComplex::new(input_i[idx], input_q[idx]));
}
buf.resize(self.fft_size, FftComplex::new(0.0, 0.0));
self.fft.process(buf);
mul_freq_domain(buf, &self.h_freq, 1.0 / self.fft_size as f32);
self.ifft.process(buf);
let end = (n_overlap + n_new).min(buf.len());
output_i.clear();
output_q.clear();
output_i.reserve(n_new.saturating_sub(output_i.capacity()));
output_q.reserve(n_new.saturating_sub(output_q.capacity()));
for sample in &buf[n_overlap..end] {
output_i.push(sample.re);
output_q.push(sample.im);
}
if n_overlap > 0 {
if n_new >= n_overlap {
let new_start = n_new - n_overlap;
for (dst, idx) in self.overlap.iter_mut().zip(new_start..n_new) {
*dst = FftComplex::new(input_i[idx], input_q[idx]);
}
} else {
let keep_old = n_overlap - n_new;
self.overlap.copy_within(n_new..n_overlap, 0);
for (dst, idx) in self.overlap[keep_old..].iter_mut().zip(0..n_new) {
*dst = FftComplex::new(input_i[idx], input_q[idx]);
}
}
}
}
}