fix(trx-backend-soapysdr): implement real IQ streaming and fix PKT demodulation

Three root causes prevented APRS decoding at 144.800 MHz with PKT/FM mode:

1. `RealIqSource::read_into` returned zeros — the SoapySDR streaming API
   was never wired up.  `RxStream<Complex<f32>>` is `Send` and
   `StreamSample` is implemented for `num_complex::Complex<f32>` in the
   soapysdr 0.3 crate, so the stream can read directly into the IQ buffer.
   Now creates and activates an `RxStream` in `new()` and calls
   `stream.read` in `read_into`.

2. PKT mode used `Passthrough` (take `.re`) demodulation.  VHF/UHF packet
   radio (APRS, AX.25) is FM-encoded AFSK — it must be FM-demodulated
   before the APRS decoder sees the audio tones.  Changed PKT to `Fm`.

3. `iq_read_loop` always slept `block_duration_ms` after each read.  Real
   hardware already blocks inside `read_into`; the extra sleep doubled
   latency.  Added `IqSource::is_blocking()` (default `false`; `true` for
   `RealIqSource`) and skip the throttle sleep for blocking sources.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-02-27 01:25:34 +01:00
parent 1a3cc0def7
commit df79f06ff0
3 changed files with 61 additions and 41 deletions
@@ -34,7 +34,10 @@ impl Demodulator {
RigMode::FM => Self::Fm,
RigMode::WFM => Self::Wfm,
RigMode::CW | RigMode::CWR => Self::Cw,
RigMode::DIG | RigMode::PKT => Self::Passthrough,
RigMode::DIG => Self::Passthrough,
// VHF/UHF packet radio (APRS, AX.25) is FM-encoded AFSK.
// FM-demodulate the signal before passing audio to the APRS decoder.
RigMode::PKT => Self::Fm,
RigMode::Other(_) => Self::Usb,
}
}
@@ -300,10 +303,7 @@ mod tests {
Demodulator::for_mode(&RigMode::DIG),
Demodulator::Passthrough
);
assert_eq!(
Demodulator::for_mode(&RigMode::PKT),
Demodulator::Passthrough
);
assert_eq!(Demodulator::for_mode(&RigMode::PKT), Demodulator::Fm);
}
// Test 9: All demodulators return an empty Vec for empty input without panicking.
@@ -31,6 +31,13 @@ pub trait IqSource: Send + 'static {
/// Read the next block of IQ samples into `buf`.
/// Returns the number of samples written, or an error string.
fn read_into(&mut self, buf: &mut [Complex<f32>]) -> Result<usize, String>;
/// Returns `true` when `read_into` blocks until samples are ready
/// (i.e. hardware-backed sources). The read loop uses this to skip the
/// extra throttle sleep that is only needed for non-blocking mock sources.
fn is_blocking(&self) -> bool {
false
}
}
// ---------------------------------------------------------------------------
@@ -464,6 +471,10 @@ fn iq_read_loop(
} else {
1
};
// Blocking sources (real hardware) already pace the loop inside read_into.
// Non-blocking sources (MockIqSource) need an explicit sleep to avoid
// busy-spinning at 100 % CPU.
let throttle = !source.is_blocking();
loop {
let n = match source.read_into(&mut block) {
@@ -493,12 +504,9 @@ fn iq_read_loop(
}
}
// Throttle only when source is faster than real time (e.g. MockIqSource).
// Real hardware naturally blocks in read_into; sleeping here would
// double-throttle it. We detect "faster than real time" by checking
// whether the source returned immediately (always true for mock,
// never for blocking hardware reads).
std::thread::sleep(std::time::Duration::from_millis(block_duration_ms));
if throttle {
std::thread::sleep(std::time::Duration::from_millis(block_duration_ms));
}
}
}
@@ -11,13 +11,18 @@ use crate::dsp::IqSource;
/// Real SoapySDR device IQ source.
///
/// Reads IQ samples directly from a SoapySDR-compatible device.
/// Reads IQ samples directly from a SoapySDR-compatible device via the
/// SoapySDR streaming API. `RxStream<Complex<f32>>` is `Send` (the crate
/// provides `unsafe impl Send`) and `StreamSample` is implemented for
/// `num_complex::Complex<f32>`, so no type conversion is needed.
pub struct RealIqSource {
/// Device is held here to keep it alive for the lifetime of this source.
/// Direct reads are not yet implemented; see read_into() TODO.
/// Device is held here to keep it alive for the stream's lifetime.
#[allow(dead_code)]
device: Device,
buffer: Vec<Complex<f32>>,
/// Active RX stream producing CF32 samples.
stream: soapysdr::RxStream<Complex<f32>>,
/// Indicates the stream is hardware-backed (blocks in read_into).
pub is_blocking: bool,
}
impl RealIqSource {
@@ -31,7 +36,7 @@ impl RealIqSource {
/// - `gain_db`: RX gain in dB
///
/// # Returns
/// A configured RealIqSource or an error string if initialization fails.
/// A configured `RealIqSource` or an error string if initialisation fails.
pub fn new(
args: &str,
center_freq_hz: f64,
@@ -41,17 +46,14 @@ impl RealIqSource {
) -> Result<Self, String> {
tracing::info!("Initializing SoapySDR device with args: {}", args);
// Create device from arguments string.
let device = match Device::new(args) {
Ok(dev) => dev,
Err(e) => {
// First attempt failed - try fallback strategies
tracing::warn!(
"Failed to open device with args '{}': {}. Attempting fallback...",
args, e
args,
e
);
// Try with empty args as fallback (grab first available device)
match Device::new("") {
Ok(dev) => {
tracing::warn!(
@@ -125,40 +127,50 @@ impl RealIqSource {
if let Err(e) = device.set_gain(soapysdr::Direction::Rx, 0, gain_db) {
tracing::warn!("Failed to set gain: {}; using device default", e);
} else {
let actual_gain = device
.gain(soapysdr::Direction::Rx, 0)
.unwrap_or(gain_db);
let actual_gain = device.gain(soapysdr::Direction::Rx, 0).unwrap_or(gain_db);
tracing::info!("Set gain to {} dB (actual: {} dB)", gain_db, actual_gain);
}
let buffer = vec![Complex::new(0.0_f32, 0.0_f32); 4096];
// Create RX stream. CF32 = Complex<f32>, StreamSample is implemented
// for num_complex::Complex<f32> so no conversion is needed.
let mut stream = device
.rx_stream::<Complex<f32>>(&[0])
.map_err(|e| format!("Failed to create RX stream: {}", e))?;
tracing::info!("RealIqSource initialized successfully");
// Activate the stream (start hardware capture).
stream
.activate(None)
.map_err(|e| format!("Failed to activate RX stream: {}", e))?;
tracing::info!("RealIqSource: RX stream activated, streaming started");
Ok(Self {
device,
buffer,
stream,
is_blocking: true,
})
}
/// Retune the SDR hardware center frequency without recreating the stream.
pub fn set_center_freq(&self, freq_hz: f64) -> Result<(), String> {
self.device
.set_frequency(soapysdr::Direction::Rx, 0, freq_hz, ())
.map_err(|e| format!("Failed to retune center frequency: {}", e))
}
}
impl IqSource for RealIqSource {
fn read_into(&mut self, buf: &mut [Complex<f32>]) -> Result<usize, String> {
let max_samples = buf.len().min(4096);
// 1 second timeout; gives the recovery loop a chance to react without
// busy-spinning when the device stalls.
const TIMEOUT_US: i64 = 1_000_000;
// TODO: Implement actual streaming read from device
// Currently the soapysdr 0.3 crate may not expose direct IQ streaming APIs.
// This would require either:
// 1. Using unsafe FFI to access the underlying SoapySDR C API
// 2. Upgrading to a newer soapysdr crate version with streaming support
// 3. Implementing a custom streaming wrapper around soapysdr-sys
//
// For now, return zero-filled buffer to allow architecture to work
// while we wait for proper streaming implementation.
self.stream
.read(&[buf], TIMEOUT_US)
.map_err(|e| format!("Stream read error: {}", e))
}
self.buffer.truncate(max_samples);
self.buffer.resize(max_samples, Complex::new(0.0, 0.0));
buf[..max_samples].copy_from_slice(&self.buffer[..max_samples]);
Ok(max_samples)
fn is_blocking(&self) -> bool {
self.is_blocking
}
}