[fix](trx-backend-soapysdr): back off after stream overruns

Co-authored-by: Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-02-28 11:24:09 +01:00
parent ee9add1b53
commit e886f97eb9
2 changed files with 56 additions and 3 deletions
@@ -44,6 +44,13 @@ pub trait IqSource: Send + 'static {
fn set_center_freq(&mut self, _hz: f64) -> Result<(), String> {
Ok(())
}
/// Gives a source-specific implementation a chance to recover from a
/// read error (for example, by rearming a hardware stream after overflow).
/// Returns `true` when an active recovery action was attempted.
fn handle_read_error(&mut self, _err: &str) -> Result<bool, String> {
Ok(false)
}
}
// ---------------------------------------------------------------------------
@@ -703,6 +710,7 @@ fn iq_read_loop(
let mut planner = FftPlanner::<f32>::new();
let fft = planner.plan_fft_forward(SPECTRUM_FFT_SIZE);
let mut spectrum_counter: usize = 0;
let mut read_error_streak: u32 = 0;
loop {
// Apply any pending hardware retune before the next read.
@@ -717,10 +725,37 @@ fn iq_read_loop(
}
let n = match source.read_into(&mut block) {
Ok(n) => n,
Ok(n) => {
read_error_streak = 0;
n
}
Err(e) => {
tracing::warn!("IQ source read error: {}; retrying", e);
std::thread::sleep(std::time::Duration::from_millis(10));
read_error_streak = read_error_streak.saturating_add(1);
let recovered = match source.handle_read_error(&e) {
Ok(result) => result,
Err(recovery_err) => {
tracing::warn!(
"IQ source recovery after read error failed: {}",
recovery_err
);
false
}
};
tracing::warn!(
"IQ source read error: {}; retrying (streak={}, recovered={})",
e,
read_error_streak,
recovered
);
let base_sleep_ms = if recovered {
block_duration_ms.max(20)
} else {
10
};
let sleep_ms = (base_sleep_ms as u128)
.saturating_mul(1u128 << read_error_streak.saturating_sub(1).min(4))
.min(250) as u64;
std::thread::sleep(std::time::Duration::from_millis(sleep_ms));
continue;
}
};
@@ -172,4 +172,22 @@ impl IqSource for RealIqSource {
.set_frequency(soapysdr::Direction::Rx, 0, hz, ())
.map_err(|e| format!("Failed to retune SDR center frequency: {}", e))
}
fn handle_read_error(&mut self, err: &str) -> Result<bool, String> {
let err_lc = err.to_ascii_lowercase();
let is_overrun = err_lc.contains("overflow") || err_lc.contains("overrun");
if !is_overrun {
return Ok(false);
}
tracing::warn!("SoapySDR RX overflow detected; restarting RX stream");
self.stream
.deactivate(None)
.map_err(|e| format!("Failed to deactivate RX stream after overflow: {}", e))?;
std::thread::sleep(std::time::Duration::from_millis(25));
self.stream
.activate(None)
.map_err(|e| format!("Failed to reactivate RX stream after overflow: {}", e))?;
Ok(true)
}
}