[arch](trx-client): watch channel for spectrum + dedicated TCP connection

Replace Arc<Mutex<SharedSpectrum>> with Arc<watch::Sender<SharedSpectrum>>
throughout the stack:

- SharedSpectrum: remove revision counter, derive Clone, make fields pub,
  rename replace() → set(). The watch channel handles dedup natively.
- FrontendRuntimeContext.spectrum: Mutex → watch::Sender; SSE clients
  call .subscribe() to get a push-based receiver at zero polling cost.
- RemoteClientConfig: derive Clone, switch spectrum field to match.

Spectrum polling moves to a dedicated TCP connection (run_spectrum_connection
+ handle_spectrum_connection spawned as a separate tokio task). This
eliminates head-of-line blocking: spectrum timeouts no longer stall state
polls or user commands on the main connection. Each side reconnects
independently; the spectrum task marks the frame None while reconnecting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-09 22:59:50 +01:00
parent aa079598bd
commit 541e27bb7a
2 changed files with 115 additions and 57 deletions
+96 -38
View File
@@ -44,14 +44,15 @@ impl RemoteEndpoint {
const SPECTRUM_POLL_INTERVAL: Duration = Duration::from_millis(40); const SPECTRUM_POLL_INTERVAL: Duration = Duration::from_millis(40);
#[derive(Clone)]
pub struct RemoteClientConfig { pub struct RemoteClientConfig {
pub addr: String, pub addr: String,
pub token: Option<String>, pub token: Option<String>,
pub selected_rig_id: Arc<Mutex<Option<String>>>, pub selected_rig_id: Arc<Mutex<Option<String>>>,
pub known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>, pub known_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
pub poll_interval: Duration, pub poll_interval: Duration,
/// Shared buffer updated by spectrum polling; None when backend has no spectrum. /// Spectrum watch sender; spectrum task publishes here, SSE clients subscribe.
pub spectrum: Arc<Mutex<SharedSpectrum>>, pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
} }
pub async fn run_remote_client( pub async fn run_remote_client(
@@ -60,11 +61,19 @@ pub async fn run_remote_client(
state_tx: watch::Sender<RigState>, state_tx: watch::Sender<RigState>,
mut shutdown_rx: watch::Receiver<bool>, mut shutdown_rx: watch::Receiver<bool>,
) -> RigResult<()> { ) -> RigResult<()> {
// Spectrum polling runs on its own dedicated TCP connection so it never
// blocks state polls or user commands on the main connection.
let spectrum_task = tokio::spawn(run_spectrum_connection(
config.clone(),
shutdown_rx.clone(),
));
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
loop { loop {
if *shutdown_rx.borrow() { if *shutdown_rx.borrow() {
info!("Remote client shutting down"); info!("Remote client shutting down");
spectrum_task.abort();
return Ok(()); return Ok(());
} }
@@ -99,10 +108,14 @@ pub async fn run_remote_client(
match changed { match changed {
Ok(()) if *shutdown_rx.borrow() => { Ok(()) if *shutdown_rx.borrow() => {
info!("Remote client shutting down"); info!("Remote client shutting down");
spectrum_task.abort();
return Ok(()); return Ok(());
} }
Ok(()) => {} Ok(()) => {}
Err(_) => return Ok(()), Err(_) => {
spectrum_task.abort();
return Ok(());
}
} }
} }
} }
@@ -110,6 +123,86 @@ pub async fn run_remote_client(
} }
} }
/// Spectrum polling runs on a dedicated TCP connection so it never blocks
/// state polls or user commands on the main connection. Reconnects
/// independently with a short fixed delay.
async fn run_spectrum_connection(
config: RemoteClientConfig,
mut shutdown_rx: watch::Receiver<bool>,
) {
loop {
if *shutdown_rx.borrow() {
break;
}
match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
Ok(Ok(stream)) => {
if let Err(e) = stream.set_nodelay(true) {
warn!("Spectrum TCP_NODELAY failed: {}", e);
}
if let Err(e) =
handle_spectrum_connection(&config, stream, &mut shutdown_rx).await
{
warn!("Spectrum connection dropped: {}", e);
}
// Mark spectrum unavailable while reconnecting.
config.spectrum.send_modify(|s| s.set(None));
}
Ok(Err(e)) => warn!("Spectrum connect failed: {}", e),
Err(_) => warn!("Spectrum connect timed out"),
}
tokio::select! {
_ = time::sleep(Duration::from_secs(1)) => {}
changed = shutdown_rx.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
break;
}
}
}
}
}
async fn handle_spectrum_connection(
config: &RemoteClientConfig,
stream: TcpStream,
shutdown_rx: &mut watch::Receiver<bool>,
) -> RigResult<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut interval = time::interval(SPECTRUM_POLL_INTERVAL);
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => return Ok(()),
Ok(()) => {}
Err(_) => return Ok(()),
}
}
_ = interval.tick() => {
if !should_poll_spectrum(config) {
config.spectrum.send_modify(|s| s.set(None));
continue;
}
match send_command_no_state_update(
config, &mut writer, &mut reader,
ClientCommand::GetSpectrum,
).await {
Ok(snapshot) => config.spectrum.send_modify(|s| s.set(snapshot.spectrum)),
Err(e) => {
// A spectrum timeout desynchronises the TCP framing;
// return so the caller reconnects and restores sync.
config.spectrum.send_modify(|s| s.set(None));
return Err(e);
}
}
}
}
}
}
async fn handle_connection( async fn handle_connection(
config: &RemoteClientConfig, config: &RemoteClientConfig,
stream: TcpStream, stream: TcpStream,
@@ -122,10 +215,6 @@ async fn handle_connection(
let mut poll_interval = time::interval(config.poll_interval); let mut poll_interval = time::interval(config.poll_interval);
let mut last_poll = Instant::now(); let mut last_poll = Instant::now();
let mut poll_failure_streak: u32 = 0; let mut poll_failure_streak: u32 = 0;
let mut spectrum_interval = time::interval(SPECTRUM_POLL_INTERVAL);
let mut last_spectrum_poll = Instant::now()
.checked_sub(SPECTRUM_POLL_INTERVAL)
.unwrap_or_else(Instant::now);
// Prime rig list/state immediately after connect so frontends can render // Prime rig list/state immediately after connect so frontends can render
// rig selectors without waiting for the first poll interval. // rig selectors without waiting for the first poll interval.
@@ -172,37 +261,6 @@ async fn handle_connection(
poll_failure_streak = 0; poll_failure_streak = 0;
} }
} }
_ = spectrum_interval.tick() => {
if last_spectrum_poll.elapsed() < SPECTRUM_POLL_INTERVAL {
continue;
}
last_spectrum_poll = Instant::now();
if !should_poll_spectrum(config) {
if let Ok(mut guard) = config.spectrum.lock() {
guard.replace(None);
}
continue;
}
match send_command_no_state_update(config, &mut writer, &mut reader,
ClientCommand::GetSpectrum).await
{
Ok(snapshot) => {
if let Ok(mut guard) = config.spectrum.lock() {
guard.replace(snapshot.spectrum);
}
}
Err(e) => {
// A spectrum poll failure desynchronises the TCP stream
// (the in-flight response is still in the buffer).
// Propagate the error so the caller reconnects and
// restores protocol sync.
if let Ok(mut guard) = config.spectrum.lock() {
guard.replace(None);
}
return Err(e);
}
}
}
req = rx.recv() => { req = rx.recv() => {
let Some(req) = req else { let Some(req) = req else {
return Ok(()); return Ok(());
+19 -19
View File
@@ -38,32 +38,29 @@ pub trait FrontendSpawner {
) -> JoinHandle<()>; ) -> JoinHandle<()>;
} }
#[derive(Debug, Default)] /// Spectrum snapshot shared between the spectrum polling task and SSE clients.
///
/// Stored in a `watch::channel`; each SSE client subscribes and is woken
/// exactly when new data arrives (no 40 ms polling loop needed on the reader
/// side). `Arc<SpectrumData>` makes clone O(1) regardless of bin count.
#[derive(Debug, Default, Clone)]
pub struct SharedSpectrum { pub struct SharedSpectrum {
revision: u64, /// Latest spectrum frame; `None` when the active backend has no spectrum.
// Arc so that each SSE client gets a cheap pointer clone instead of pub frame: Option<Arc<SpectrumData>>,
// copying the entire bin vector (~8 KB for 2048 f32 bins). /// RDS JSON pre-serialised at ingestion so SSE clients don't repeat the
frame: Option<Arc<SpectrumData>>, /// work on every tick.
// RDS JSON serialised once at ingestion; avoids per-client serde work pub rds_json: Option<String>,
// on every 40 ms tick for a field that changes at most once per second.
rds_json: Option<String>,
} }
impl SharedSpectrum { impl SharedSpectrum {
pub fn replace(&mut self, frame: Option<SpectrumData>) { /// Replace the stored frame, pre-serialising RDS in one pass.
self.revision = self.revision.wrapping_add(1); pub fn set(&mut self, frame: Option<SpectrumData>) {
self.rds_json = frame self.rds_json = frame
.as_ref() .as_ref()
.and_then(|f| f.rds.as_ref()) .and_then(|f| f.rds.as_ref())
.and_then(|r| serde_json::to_string(r).ok()); .and_then(|r| serde_json::to_string(r).ok());
self.frame = frame.map(Arc::new); self.frame = frame.map(Arc::new);
} }
/// Returns `(revision, frame, rds_json)`.
/// `rds_json` is pre-serialised; `None` means no RDS data.
pub fn snapshot(&self) -> (u64, Option<Arc<SpectrumData>>, Option<String>) {
(self.revision, self.frame.clone(), self.rds_json.clone())
}
} }
pub type FrontendSpawnFn = fn( pub type FrontendSpawnFn = fn(
@@ -205,8 +202,8 @@ pub struct FrontendRuntimeContext {
pub owner_website_name: Option<String>, pub owner_website_name: Option<String>,
/// Optional base URL used to link AIS vessel names as `<base><mmsi>`. /// Optional base URL used to link AIS vessel names as `<base><mmsi>`.
pub ais_vessel_url_base: Option<String>, pub ais_vessel_url_base: Option<String>,
/// Latest spectrum frame from the active SDR rig; None for non-SDR backends. /// Spectrum sender; SSE clients subscribe via `spectrum.subscribe()`.
pub spectrum: Arc<Mutex<SharedSpectrum>>, pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
} }
impl FrontendRuntimeContext { impl FrontendRuntimeContext {
@@ -245,7 +242,10 @@ impl FrontendRuntimeContext {
owner_website_url: None, owner_website_url: None,
owner_website_name: None, owner_website_name: None,
ais_vessel_url_base: None, ais_vessel_url_base: None,
spectrum: Arc::new(Mutex::new(SharedSpectrum::default())), spectrum: {
let (tx, _rx) = watch::channel(SharedSpectrum::default());
Arc::new(tx)
},
} }
} }
} }