diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 5e4677f..0c959a8 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -323,6 +323,54 @@ pub async fn run_rig_task( // Process each request while let Some(RigRequest { cmd, respond_to, .. }) = batch.pop() { + if matches!(cmd, RigCommand::GetSpectrum) { + let mut responders = vec![respond_to]; + let mut idx = 0; + while idx < batch.len() { + if matches!(batch[idx].cmd, RigCommand::GetSpectrum) { + let req = batch.swap_remove(idx); + responders.push(req.respond_to); + } else { + idx += 1; + } + } + + let mut cmd_ctx = CommandExecContext { + rig: &mut rig, + state: &mut state, + machine: &mut machine, + emitter: &emitter, + poll_pause_until: &mut poll_pause_until, + last_power_on: &mut last_power_on, + state_tx: &state_tx, + retry, + histories: &histories, + }; + let result = match time::timeout( + COMMAND_EXEC_TIMEOUT, + process_command(RigCommand::GetSpectrum, &mut cmd_ctx), + ) + .await + { + Ok(result) => result, + Err(_) => { + error!( + "Rig command GetSpectrum timed out after {:?}", + COMMAND_EXEC_TIMEOUT + ); + Err(RigError::communication(format!( + "command timed out after {:?}", + COMMAND_EXEC_TIMEOUT + ))) + } + }; + + for responder in responders { + let _ = responder.send(result.clone()); + } + continue; + } + let cmd_label = format!("{:?}", cmd); let log_command = !matches!(&cmd, RigCommand::GetSpectrum); let started = Instant::now(); diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs index 3177e3f..1faf8ed 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/real_iq_source.rs @@ -180,20 +180,24 @@ impl IqSource for RealIqSource { } fn handle_read_error(&mut self, err: &str, streak: u32) -> Result { + const OVERFLOW_RESTART_STREAK: u32 = 50; + const NON_OVERFLOW_RESTART_STREAK: u32 = 10; + let err_lc = err.to_ascii_lowercase(); let is_overrun = err_lc.contains("overflow") || err_lc.contains("overrun"); if is_overrun { - // Overflow is often transient; avoid immediate stream restart churn. - // Only restart after several consecutive overflow failures. - if streak < 3 { - return Ok(true); + // Some SoapySDR drivers can wedge inside deactivate/activate after + // repeated overflow. Keep backing off reads, but avoid automatic + // stream restart on overflow so the server remains responsive. + if streak == OVERFLOW_RESTART_STREAK { + tracing::error!( + "SoapySDR RX overflow persists (streak={}); skipping automatic stream restart to avoid driver wedge", + streak + ); } - tracing::warn!( - "SoapySDR RX overflow persists (streak={}); restarting RX stream", - streak - ); - } else if streak >= 10 { + return Ok(true); + } else if streak >= NON_OVERFLOW_RESTART_STREAK { // Non-overflow errors at a high streak (e.g. reads on a // deactivated stream after a failed activate) — attempt a // full restart to recover.