diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs index b685e02..04e1f1f 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs @@ -34,7 +34,10 @@ impl Demodulator { RigMode::FM => Self::Fm, RigMode::WFM => Self::Wfm, RigMode::CW | RigMode::CWR => Self::Cw, - RigMode::DIG | RigMode::PKT => Self::Passthrough, + RigMode::DIG => Self::Passthrough, + // VHF/UHF packet radio (APRS, AX.25) is FM-encoded AFSK. + // FM-demodulate the signal before passing audio to the APRS decoder. + RigMode::PKT => Self::Fm, RigMode::Other(_) => Self::Usb, } } @@ -300,10 +303,7 @@ mod tests { Demodulator::for_mode(&RigMode::DIG), Demodulator::Passthrough ); - assert_eq!( - Demodulator::for_mode(&RigMode::PKT), - Demodulator::Passthrough - ); + assert_eq!(Demodulator::for_mode(&RigMode::PKT), Demodulator::Fm); } // Test 9: All demodulators return an empty Vec for empty input without panicking. diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 1de5325..d57b21c 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -31,6 +31,13 @@ pub trait IqSource: Send + 'static { /// Read the next block of IQ samples into `buf`. /// Returns the number of samples written, or an error string. fn read_into(&mut self, buf: &mut [Complex]) -> Result; + + /// Returns `true` when `read_into` blocks until samples are ready + /// (i.e. hardware-backed sources). The read loop uses this to skip the + /// extra throttle sleep that is only needed for non-blocking mock sources. + fn is_blocking(&self) -> bool { + false + } } // --------------------------------------------------------------------------- @@ -464,6 +471,10 @@ fn iq_read_loop( } else { 1 }; + // Blocking sources (real hardware) already pace the loop inside read_into. + // Non-blocking sources (MockIqSource) need an explicit sleep to avoid + // busy-spinning at 100 % CPU. + let throttle = !source.is_blocking(); loop { let n = match source.read_into(&mut block) { @@ -493,12 +504,9 @@ fn iq_read_loop( } } - // Throttle only when source is faster than real time (e.g. MockIqSource). - // Real hardware naturally blocks in read_into; sleeping here would - // double-throttle it. We detect "faster than real time" by checking - // whether the source returned immediately (always true for mock, - // never for blocking hardware reads). - std::thread::sleep(std::time::Duration::from_millis(block_duration_ms)); + if throttle { + std::thread::sleep(std::time::Duration::from_millis(block_duration_ms)); + } } } diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs index 04f3c9c..db16613 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs @@ -11,13 +11,18 @@ use crate::dsp::IqSource; /// Real SoapySDR device IQ source. /// -/// Reads IQ samples directly from a SoapySDR-compatible device. +/// Reads IQ samples directly from a SoapySDR-compatible device via the +/// SoapySDR streaming API. `RxStream>` is `Send` (the crate +/// provides `unsafe impl Send`) and `StreamSample` is implemented for +/// `num_complex::Complex`, so no type conversion is needed. pub struct RealIqSource { - /// Device is held here to keep it alive for the lifetime of this source. - /// Direct reads are not yet implemented; see read_into() TODO. + /// Device is held here to keep it alive for the stream's lifetime. #[allow(dead_code)] device: Device, - buffer: Vec>, + /// Active RX stream producing CF32 samples. + stream: soapysdr::RxStream>, + /// Indicates the stream is hardware-backed (blocks in read_into). + pub is_blocking: bool, } impl RealIqSource { @@ -31,7 +36,7 @@ impl RealIqSource { /// - `gain_db`: RX gain in dB /// /// # Returns - /// A configured RealIqSource or an error string if initialization fails. + /// A configured `RealIqSource` or an error string if initialisation fails. pub fn new( args: &str, center_freq_hz: f64, @@ -41,17 +46,14 @@ impl RealIqSource { ) -> Result { tracing::info!("Initializing SoapySDR device with args: {}", args); - // Create device from arguments string. let device = match Device::new(args) { Ok(dev) => dev, Err(e) => { - // First attempt failed - try fallback strategies tracing::warn!( "Failed to open device with args '{}': {}. Attempting fallback...", - args, e + args, + e ); - - // Try with empty args as fallback (grab first available device) match Device::new("") { Ok(dev) => { tracing::warn!( @@ -125,40 +127,50 @@ impl RealIqSource { if let Err(e) = device.set_gain(soapysdr::Direction::Rx, 0, gain_db) { tracing::warn!("Failed to set gain: {}; using device default", e); } else { - let actual_gain = device - .gain(soapysdr::Direction::Rx, 0) - .unwrap_or(gain_db); + let actual_gain = device.gain(soapysdr::Direction::Rx, 0).unwrap_or(gain_db); tracing::info!("Set gain to {} dB (actual: {} dB)", gain_db, actual_gain); } - let buffer = vec![Complex::new(0.0_f32, 0.0_f32); 4096]; + // Create RX stream. CF32 = Complex, StreamSample is implemented + // for num_complex::Complex so no conversion is needed. + let mut stream = device + .rx_stream::>(&[0]) + .map_err(|e| format!("Failed to create RX stream: {}", e))?; - tracing::info!("RealIqSource initialized successfully"); + // Activate the stream (start hardware capture). + stream + .activate(None) + .map_err(|e| format!("Failed to activate RX stream: {}", e))?; + + tracing::info!("RealIqSource: RX stream activated, streaming started"); Ok(Self { device, - buffer, + stream, + is_blocking: true, }) } + + /// Retune the SDR hardware center frequency without recreating the stream. + pub fn set_center_freq(&self, freq_hz: f64) -> Result<(), String> { + self.device + .set_frequency(soapysdr::Direction::Rx, 0, freq_hz, ()) + .map_err(|e| format!("Failed to retune center frequency: {}", e)) + } } impl IqSource for RealIqSource { fn read_into(&mut self, buf: &mut [Complex]) -> Result { - let max_samples = buf.len().min(4096); + // 1 second timeout; gives the recovery loop a chance to react without + // busy-spinning when the device stalls. + const TIMEOUT_US: i64 = 1_000_000; - // TODO: Implement actual streaming read from device - // Currently the soapysdr 0.3 crate may not expose direct IQ streaming APIs. - // This would require either: - // 1. Using unsafe FFI to access the underlying SoapySDR C API - // 2. Upgrading to a newer soapysdr crate version with streaming support - // 3. Implementing a custom streaming wrapper around soapysdr-sys - // - // For now, return zero-filled buffer to allow architecture to work - // while we wait for proper streaming implementation. + self.stream + .read(&[buf], TIMEOUT_US) + .map_err(|e| format!("Stream read error: {}", e)) + } - self.buffer.truncate(max_samples); - self.buffer.resize(max_samples, Complex::new(0.0, 0.0)); - buf[..max_samples].copy_from_slice(&self.buffer[..max_samples]); - Ok(max_samples) + fn is_blocking(&self) -> bool { + self.is_blocking } }