From 5a6ff1427a9be72c5df0c6701da02ba409f2bf9a Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sat, 28 Feb 2026 23:06:53 +0100 Subject: [PATCH] [fix](trx-client,trx-backend-soapysdr): improve rig switching and fm agc Co-authored-by: OpenAI Codex Signed-off-by: Stan Grams --- src/trx-client/src/remote_client.rs | 33 ++++++++++--- .../trx-backend-soapysdr/src/demod.rs | 18 ++++++++ .../trx-backend-soapysdr/src/dsp.rs | 46 ++++++++++++++++++- 3 files changed, 89 insertions(+), 8 deletions(-) diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 69f7e3b..5cb1051 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -22,6 +22,7 @@ use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse}; const DEFAULT_REMOTE_PORT: u16 = 4530; const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const IO_TIMEOUT: Duration = Duration::from_secs(15); +const SPECTRUM_IO_TIMEOUT: Duration = Duration::from_millis(300); const MAX_JSON_LINE_BYTES: usize = 16 * 1024; #[derive(Clone, Debug, PartialEq, Eq)] @@ -147,6 +148,12 @@ async fn handle_connection( continue; } last_spectrum_poll = Instant::now(); + if !should_poll_spectrum(config) { + if let Ok(mut guard) = config.spectrum.lock() { + guard.replace(None); + } + continue; + } match send_command_no_state_update(config, &mut writer, &mut reader, ClientCommand::GetSpectrum).await { @@ -239,19 +246,19 @@ async fn send_command_no_state_update( let payload = serde_json::to_string(&envelope) .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; time::timeout( - IO_TIMEOUT, + SPECTRUM_IO_TIMEOUT, writer.write_all(format!("{}\n", payload).as_bytes()), ) .await - .map_err(|_| RigError::communication(format!("write timed out after {:?}", IO_TIMEOUT)))? + .map_err(|_| RigError::communication(format!("write timed out after {:?}", SPECTRUM_IO_TIMEOUT)))? .map_err(|e| RigError::communication(format!("write failed: {e}")))?; - time::timeout(IO_TIMEOUT, writer.flush()) + time::timeout(SPECTRUM_IO_TIMEOUT, writer.flush()) .await - .map_err(|_| RigError::communication(format!("flush timed out after {:?}", IO_TIMEOUT)))? + .map_err(|_| RigError::communication(format!("flush timed out after {:?}", SPECTRUM_IO_TIMEOUT)))? .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; - let line = time::timeout(IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES)) + let line = time::timeout(SPECTRUM_IO_TIMEOUT, read_limited_line(reader, MAX_JSON_LINE_BYTES)) .await - .map_err(|_| RigError::communication(format!("read timed out after {:?}", IO_TIMEOUT)))? + .map_err(|_| RigError::communication(format!("read timed out after {:?}", SPECTRUM_IO_TIMEOUT)))? .map_err(|e| RigError::communication(format!("read failed: {e}")))?; let line = line.ok_or_else(|| RigError::communication("connection closed by remote"))?; let resp: ClientResponse = serde_json::from_str(line.trim_end()) @@ -370,6 +377,20 @@ fn set_selected_rig_id(config: &RemoteClientConfig, value: Option) { } } +fn should_poll_spectrum(config: &RemoteClientConfig) -> bool { + let selected = selected_rig_id(config); + let Some(selected) = selected.as_deref() else { + return true; + }; + config + .known_rigs + .lock() + .ok() + .and_then(|entries| entries.iter().find(|entry| entry.rig_id == selected).cloned()) + .map(|entry| entry.state.initialized) + .unwrap_or(true) +} + fn choose_default_rig(rigs: &[RigEntry]) -> Option<&RigEntry> { rigs.iter().max_by_key(|entry| { let tx_capable = entry.state.info.capabilities.tx; diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs index f43f858..7f37c4b 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs @@ -157,6 +157,24 @@ impl SoftAgc { let gain = self.update_gain(x.abs()); (x * gain).clamp(-1.0, 1.0) } + + pub(crate) fn process_pair(&mut self, left: f32, right: f32) -> (f32, f32) { + let gain = self.update_gain(left.abs().max(right.abs())); + ( + (left * gain).clamp(-1.0, 1.0), + (right * gain).clamp(-1.0, 1.0), + ) + } + + pub(crate) fn process_complex(&mut self, x: Complex) -> Complex { + let gain = self.update_gain((x.re * x.re + x.im * x.im).sqrt()); + let mut y = x * gain; + let mag = (y.re * y.re + y.im * y.im).sqrt(); + if mag > 1.0 { + y /= mag; + } + y + } } impl BiquadBandPass { diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 7d0e599..a8102ed 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -271,6 +271,19 @@ fn agc_for_mode(mode: &RigMode, audio_sample_rate: u32) -> SoftAgc { } } +/// Build a pre-demod complex AGC for FM-family modes. +/// +/// This stabilizes I/Q magnitude before the discriminator while preserving +/// phase, which helps the FM/WFM demod path behave better when strong signals +/// drive large amplitude swings inside the channel filter. +fn iq_agc_for_mode(mode: &RigMode, sample_rate: u32) -> Option { + let sr = sample_rate.max(1) as f32; + match mode { + RigMode::FM | RigMode::WFM | RigMode::PKT => Some(SoftAgc::new(sr, 0.5, 150.0, 0.8, 12.0)), + _ => None, + } +} + /// Build the DC blocker for a given mode, or `None` if not applicable. /// /// WFM is excluded because it has its own internal DC blockers per channel. @@ -336,6 +349,8 @@ pub struct ChannelDsp { resample_phase_inc: f64, /// Dedicated WFM decoder that preserves the FM composite baseband. wfm_decoder: Option, + /// Complex-domain AGC/limiter applied before FM-family demodulation. + iq_agc: Option, /// Soft AGC applied to all demodulated audio for consistent cross-mode levels. audio_agc: SoftAgc, /// DC blocker for modes whose demodulator output can carry a DC offset @@ -417,6 +432,7 @@ impl ChannelDsp { } else { self.wfm_decoder = None; } + self.iq_agc = iq_agc_for_mode(&self.mode, channel_sample_rate); self.audio_agc = agc_for_mode(&self.mode, self.audio_sample_rate); self.audio_dc = dc_for_mode(&self.mode); self.frame_buf.clear(); @@ -503,6 +519,7 @@ impl ChannelDsp { } else { None }, + iq_agc: iq_agc_for_mode(mode, channel_sample_rate), audio_agc: agc_for_mode(mode, audio_sample_rate), audio_dc: dc_for_mode(mode), } @@ -649,12 +666,37 @@ impl ChannelDsp { return; } + if let Some(iq_agc) = &mut self.iq_agc { + for sample in &mut decimated { + *sample = iq_agc.process_complex(*sample); + } + } + // --- 4. Demodulate + post-process ----------------------------------- // WFM: full composite decoder (handles its own DC blocks + deemphasis). // All other modes: stateless demodulator → DC blocker (where enabled) → AGC. - // AGC is applied to WFM output too so all modes share the same target level. + // WFM uses linked audio AGC after stereo decode; all other modes use + // the normal post-demod AGC path. let audio = if let Some(decoder) = self.wfm_decoder.as_mut() { - decoder.process_iq(&decimated) + let mut out = decoder.process_iq(&decimated); + if !self.wfm_stereo && self.output_channels >= 2 { + for pair in out.chunks_exact_mut(2) { + let mono = self.audio_agc.process(pair[0]); + pair[0] = mono; + pair[1] = mono; + } + } else if self.wfm_stereo && self.output_channels >= 2 { + for pair in out.chunks_exact_mut(2) { + let (left, right) = self.audio_agc.process_pair(pair[0], pair[1]); + pair[0] = left; + pair[1] = right; + } + } else { + for s in &mut out { + *s = self.audio_agc.process(*s); + } + } + out } else { let mut raw = self.demodulator.demodulate(&decimated); for s in &mut raw {