From 541e27bb7a92c7007a010e43e59ff5a69637c42d Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Mon, 9 Mar 2026 22:59:50 +0100 Subject: [PATCH] [arch](trx-client): watch channel for spectrum + dedicated TCP connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Arc> with Arc> throughout the stack: - SharedSpectrum: remove revision counter, derive Clone, make fields pub, rename replace() → set(). The watch channel handles dedup natively. - FrontendRuntimeContext.spectrum: Mutex → watch::Sender; SSE clients call .subscribe() to get a push-based receiver at zero polling cost. - RemoteClientConfig: derive Clone, switch spectrum field to match. Spectrum polling moves to a dedicated TCP connection (run_spectrum_connection + handle_spectrum_connection spawned as a separate tokio task). This eliminates head-of-line blocking: spectrum timeouts no longer stall state polls or user commands on the main connection. Each side reconnects independently; the spectrum task marks the frame None while reconnecting. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Stan Grams --- src/trx-client/src/remote_client.rs | 134 ++++++++++++++++++------- src/trx-client/trx-frontend/src/lib.rs | 38 +++---- 2 files changed, 115 insertions(+), 57 deletions(-) diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 9b9558d..69caf78 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -44,14 +44,15 @@ impl RemoteEndpoint { const SPECTRUM_POLL_INTERVAL: Duration = Duration::from_millis(40); +#[derive(Clone)] pub struct RemoteClientConfig { pub addr: String, pub token: Option, pub selected_rig_id: Arc>>, pub known_rigs: Arc>>, pub poll_interval: Duration, - /// Shared buffer updated by spectrum polling; None when backend has no spectrum. - pub spectrum: Arc>, + /// Spectrum watch sender; spectrum task publishes here, SSE clients subscribe. + pub spectrum: Arc>, } pub async fn run_remote_client( @@ -60,11 +61,19 @@ pub async fn run_remote_client( state_tx: watch::Sender, mut shutdown_rx: watch::Receiver, ) -> RigResult<()> { + // Spectrum polling runs on its own dedicated TCP connection so it never + // blocks state polls or user commands on the main connection. + let spectrum_task = tokio::spawn(run_spectrum_connection( + config.clone(), + shutdown_rx.clone(), + )); + let mut reconnect_delay = Duration::from_secs(1); loop { if *shutdown_rx.borrow() { info!("Remote client shutting down"); + spectrum_task.abort(); return Ok(()); } @@ -99,10 +108,14 @@ pub async fn run_remote_client( match changed { Ok(()) if *shutdown_rx.borrow() => { info!("Remote client shutting down"); + spectrum_task.abort(); return Ok(()); } Ok(()) => {} - Err(_) => return Ok(()), + Err(_) => { + spectrum_task.abort(); + return Ok(()); + } } } } @@ -110,6 +123,86 @@ pub async fn run_remote_client( } } +/// Spectrum polling runs on a dedicated TCP connection so it never blocks +/// state polls or user commands on the main connection. Reconnects +/// independently with a short fixed delay. +async fn run_spectrum_connection( + config: RemoteClientConfig, + mut shutdown_rx: watch::Receiver, +) { + loop { + if *shutdown_rx.borrow() { + break; + } + + match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await { + Ok(Ok(stream)) => { + if let Err(e) = stream.set_nodelay(true) { + warn!("Spectrum TCP_NODELAY failed: {}", e); + } + if let Err(e) = + handle_spectrum_connection(&config, stream, &mut shutdown_rx).await + { + warn!("Spectrum connection dropped: {}", e); + } + // Mark spectrum unavailable while reconnecting. + config.spectrum.send_modify(|s| s.set(None)); + } + Ok(Err(e)) => warn!("Spectrum connect failed: {}", e), + Err(_) => warn!("Spectrum connect timed out"), + } + + tokio::select! { + _ = time::sleep(Duration::from_secs(1)) => {} + changed = shutdown_rx.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() { + break; + } + } + } + } +} + +async fn handle_spectrum_connection( + config: &RemoteClientConfig, + stream: TcpStream, + shutdown_rx: &mut watch::Receiver, +) -> RigResult<()> { + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut interval = time::interval(SPECTRUM_POLL_INTERVAL); + + loop { + tokio::select! { + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => return Ok(()), + Ok(()) => {} + Err(_) => return Ok(()), + } + } + _ = interval.tick() => { + if !should_poll_spectrum(config) { + config.spectrum.send_modify(|s| s.set(None)); + continue; + } + match send_command_no_state_update( + config, &mut writer, &mut reader, + ClientCommand::GetSpectrum, + ).await { + Ok(snapshot) => config.spectrum.send_modify(|s| s.set(snapshot.spectrum)), + Err(e) => { + // A spectrum timeout desynchronises the TCP framing; + // return so the caller reconnects and restores sync. + config.spectrum.send_modify(|s| s.set(None)); + return Err(e); + } + } + } + } + } +} + async fn handle_connection( config: &RemoteClientConfig, stream: TcpStream, @@ -122,10 +215,6 @@ async fn handle_connection( let mut poll_interval = time::interval(config.poll_interval); let mut last_poll = Instant::now(); let mut poll_failure_streak: u32 = 0; - let mut spectrum_interval = time::interval(SPECTRUM_POLL_INTERVAL); - let mut last_spectrum_poll = Instant::now() - .checked_sub(SPECTRUM_POLL_INTERVAL) - .unwrap_or_else(Instant::now); // Prime rig list/state immediately after connect so frontends can render // rig selectors without waiting for the first poll interval. @@ -172,37 +261,6 @@ async fn handle_connection( poll_failure_streak = 0; } } - _ = spectrum_interval.tick() => { - if last_spectrum_poll.elapsed() < SPECTRUM_POLL_INTERVAL { - continue; - } - last_spectrum_poll = Instant::now(); - if !should_poll_spectrum(config) { - if let Ok(mut guard) = config.spectrum.lock() { - guard.replace(None); - } - continue; - } - match send_command_no_state_update(config, &mut writer, &mut reader, - ClientCommand::GetSpectrum).await - { - Ok(snapshot) => { - if let Ok(mut guard) = config.spectrum.lock() { - guard.replace(snapshot.spectrum); - } - } - Err(e) => { - // A spectrum poll failure desynchronises the TCP stream - // (the in-flight response is still in the buffer). - // Propagate the error so the caller reconnects and - // restores protocol sync. - if let Ok(mut guard) = config.spectrum.lock() { - guard.replace(None); - } - return Err(e); - } - } - } req = rx.recv() => { let Some(req) = req else { return Ok(()); diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index 7917c43..8f57a61 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -38,32 +38,29 @@ pub trait FrontendSpawner { ) -> JoinHandle<()>; } -#[derive(Debug, Default)] +/// Spectrum snapshot shared between the spectrum polling task and SSE clients. +/// +/// Stored in a `watch::channel`; each SSE client subscribes and is woken +/// exactly when new data arrives (no 40 ms polling loop needed on the reader +/// side). `Arc` makes clone O(1) regardless of bin count. +#[derive(Debug, Default, Clone)] pub struct SharedSpectrum { - revision: u64, - // Arc so that each SSE client gets a cheap pointer clone instead of - // copying the entire bin vector (~8 KB for 2048 f32 bins). - frame: Option>, - // RDS JSON serialised once at ingestion; avoids per-client serde work - // on every 40 ms tick for a field that changes at most once per second. - rds_json: Option, + /// Latest spectrum frame; `None` when the active backend has no spectrum. + pub frame: Option>, + /// RDS JSON pre-serialised at ingestion so SSE clients don't repeat the + /// work on every tick. + pub rds_json: Option, } impl SharedSpectrum { - pub fn replace(&mut self, frame: Option) { - self.revision = self.revision.wrapping_add(1); + /// Replace the stored frame, pre-serialising RDS in one pass. + pub fn set(&mut self, frame: Option) { self.rds_json = frame .as_ref() .and_then(|f| f.rds.as_ref()) .and_then(|r| serde_json::to_string(r).ok()); self.frame = frame.map(Arc::new); } - - /// Returns `(revision, frame, rds_json)`. - /// `rds_json` is pre-serialised; `None` means no RDS data. - pub fn snapshot(&self) -> (u64, Option>, Option) { - (self.revision, self.frame.clone(), self.rds_json.clone()) - } } pub type FrontendSpawnFn = fn( @@ -205,8 +202,8 @@ pub struct FrontendRuntimeContext { pub owner_website_name: Option, /// Optional base URL used to link AIS vessel names as ``. pub ais_vessel_url_base: Option, - /// Latest spectrum frame from the active SDR rig; None for non-SDR backends. - pub spectrum: Arc>, + /// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`. + pub spectrum: Arc>, } impl FrontendRuntimeContext { @@ -245,7 +242,10 @@ impl FrontendRuntimeContext { owner_website_url: None, owner_website_name: None, ais_vessel_url_base: None, - spectrum: Arc::new(Mutex::new(SharedSpectrum::default())), + spectrum: { + let (tx, _rx) = watch::channel(SharedSpectrum::default()); + Arc::new(tx) + }, } } }