diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 927884c..4a40b63 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -44,6 +44,13 @@ pub trait IqSource: Send + 'static { fn set_center_freq(&mut self, _hz: f64) -> Result<(), String> { Ok(()) } + + /// Gives a source-specific implementation a chance to recover from a + /// read error (for example, by rearming a hardware stream after overflow). + /// Returns `true` when an active recovery action was attempted. + fn handle_read_error(&mut self, _err: &str) -> Result { + Ok(false) + } } // --------------------------------------------------------------------------- @@ -703,6 +710,7 @@ fn iq_read_loop( let mut planner = FftPlanner::::new(); let fft = planner.plan_fft_forward(SPECTRUM_FFT_SIZE); let mut spectrum_counter: usize = 0; + let mut read_error_streak: u32 = 0; loop { // Apply any pending hardware retune before the next read. @@ -717,10 +725,37 @@ fn iq_read_loop( } let n = match source.read_into(&mut block) { - Ok(n) => n, + Ok(n) => { + read_error_streak = 0; + n + } Err(e) => { - tracing::warn!("IQ source read error: {}; retrying", e); - std::thread::sleep(std::time::Duration::from_millis(10)); + read_error_streak = read_error_streak.saturating_add(1); + let recovered = match source.handle_read_error(&e) { + Ok(result) => result, + Err(recovery_err) => { + tracing::warn!( + "IQ source recovery after read error failed: {}", + recovery_err + ); + false + } + }; + tracing::warn!( + "IQ source read error: {}; retrying (streak={}, recovered={})", + e, + read_error_streak, + recovered + ); + let base_sleep_ms = if recovered { + block_duration_ms.max(20) + } else { + 10 + }; + let sleep_ms = (base_sleep_ms as u128) + .saturating_mul(1u128 << read_error_streak.saturating_sub(1).min(4)) + .min(250) as u64; + std::thread::sleep(std::time::Duration::from_millis(sleep_ms)); continue; } }; diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs index fb69fb5..6fabd20 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs @@ -172,4 +172,22 @@ impl IqSource for RealIqSource { .set_frequency(soapysdr::Direction::Rx, 0, hz, ()) .map_err(|e| format!("Failed to retune SDR center frequency: {}", e)) } + + fn handle_read_error(&mut self, err: &str) -> Result { + let err_lc = err.to_ascii_lowercase(); + let is_overrun = err_lc.contains("overflow") || err_lc.contains("overrun"); + if !is_overrun { + return Ok(false); + } + + tracing::warn!("SoapySDR RX overflow detected; restarting RX stream"); + self.stream + .deactivate(None) + .map_err(|e| format!("Failed to deactivate RX stream after overflow: {}", e))?; + std::thread::sleep(std::time::Duration::from_millis(25)); + self.stream + .activate(None) + .map_err(|e| format!("Failed to reactivate RX stream after overflow: {}", e))?; + Ok(true) + } }