From fd0f1e43c04c2352930696ef722cfaf6ab73a308 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sun, 19 Apr 2026 19:50:14 +0200 Subject: [PATCH] [feat](trx-client): per-rig meter supervisor with auto-reconnect Adds rig_meters: map of per-rig watch::Sender> to RigRoutingContext with a lazy rig_meter_rx helper. run_meter_supervisor polls for known short names and spawns one SubscribeMeter TCP connection per rig; reconnect loop sets TCP_NODELAY and pushes samples into the per-rig watch so slow SSE readers automatically skip intermediate frames. Co-Authored-By: Claude Opus 4.7 Signed-off-by: Stan Grams --- src/trx-client/src/main.rs | 1 + src/trx-client/src/remote_client.rs | 192 ++++++++++++++++++++++++- src/trx-client/trx-frontend/Cargo.toml | 1 + src/trx-client/trx-frontend/src/lib.rs | 25 ++++ 4 files changed, 218 insertions(+), 1 deletion(-) diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index 8792569..74a836e 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -386,6 +386,7 @@ async fn async_init() -> DynResult { rig_id_to_short_name, short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: frontend_runtime.routing.sat_passes.clone(), + rig_meters: frontend_runtime.routing.rig_meters.clone(), }; let state_tx = state_tx.clone(); let remote_shutdown_rx = shutdown_rx.clone(); diff --git a/src/trx-client/src/remote_client.rs b/src/trx-client/src/remote_client.rs index 23a9b3d..d919f84 100644 --- a/src/trx-client/src/remote_client.rs +++ b/src/trx-client/src/remote_client.rs @@ -20,7 +20,7 @@ use trx_core::{RigError, RigResult}; use trx_frontend::{RemoteRigEntry, SharedSpectrum}; use trx_protocol::rig_command_to_client; use trx_protocol::types::RigEntry; -use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse}; +use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate}; const DEFAULT_REMOTE_PORT: u16 = 4530; const DEFAULT_AUDIO_PORT: u16 = 4531; @@ -77,6 +77,9 @@ pub struct RemoteClientConfig { pub short_name_to_rig_id: Arc>>, /// Cached satellite pass predictions from the server (GetSatPasses). pub sat_passes: Arc>>, + /// Per-rig meter watch senders, keyed by short name (or rig_id in legacy mode). + /// Populated lazily by the meter-connection supervisor. + pub rig_meters: Arc>>>>, } pub async fn run_remote_client( @@ -88,6 +91,12 @@ pub async fn run_remote_client( // Spectrum polling runs on its own dedicated TCP connection so it never // blocks state polls or user commands on the main connection. let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone())); + // Meter supervisor: spawns per-rig meter-streaming TCP connections as + // soon as short names are discovered. Runs independently so the meter + // bar in the UI updates at the full server-side 30 Hz without being + // gated on state polls or user commands. + let meter_supervisor = + tokio::spawn(run_meter_supervisor(config.clone(), shutdown_rx.clone())); let mut reconnect_delay = Duration::from_secs(1); @@ -95,6 +104,7 @@ pub async fn run_remote_client( if *shutdown_rx.borrow() { info!("Remote client shutting down"); spectrum_task.abort(); + meter_supervisor.abort(); return Ok(()); } @@ -159,11 +169,13 @@ pub async fn run_remote_client( Ok(()) if *shutdown_rx.borrow() => { info!("Remote client shutting down"); spectrum_task.abort(); + meter_supervisor.abort(); return Ok(()); } Ok(()) => {} Err(_) => { spectrum_task.abort(); + meter_supervisor.abort(); return Ok(()); } } @@ -212,6 +224,177 @@ async fn run_spectrum_connection( } } +/// Meter stream supervisor. Watches the set of known short names and spawns +/// one dedicated TCP connection per rig that streams `MeterUpdate` JSON lines +/// (see `trx_protocol::MeterUpdate`). Each per-rig task owns its own watch +/// sender in `config.rig_meters` and reconnects on failure. +async fn run_meter_supervisor( + config: RemoteClientConfig, + mut shutdown_rx: watch::Receiver, +) { + let mut tasks: HashMap> = HashMap::new(); + let mut poll = time::interval(Duration::from_millis(500)); + + loop { + tokio::select! { + _ = poll.tick() => {} + changed = shutdown_rx.changed() => { + if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() { + for (_, handle) in tasks.drain() { + handle.abort(); + } + return; + } + } + } + + let known = collect_known_short_names(&config); + for name in &known { + if tasks.contains_key(name) { + continue; + } + // Ensure a watch sender exists so SSE clients can subscribe + // before the first sample arrives. + { + if let Ok(mut map) = config.rig_meters.write() { + map.entry(name.clone()) + .or_insert_with(|| watch::channel(None).0); + } + } + let task = tokio::spawn(run_meter_connection( + config.clone(), + name.clone(), + shutdown_rx.clone(), + )); + tasks.insert(name.clone(), task); + } + } +} + +fn collect_known_short_names(config: &RemoteClientConfig) -> Vec { + let mut names: Vec = Vec::new(); + if has_short_names(config) { + names.extend(config.rig_id_to_short_name.values().cloned()); + } + if let Ok(map) = config.rig_states.read() { + for name in map.keys() { + if !names.iter().any(|n| n == name) { + names.push(name.clone()); + } + } + } + names +} + +/// One dedicated TCP connection per short-name that sends `SubscribeMeter` and +/// pumps the server's `MeterUpdate` JSON stream into the per-rig watch sender. +async fn run_meter_connection( + config: RemoteClientConfig, + short_name: String, + mut shutdown_rx: watch::Receiver, +) { + loop { + if *shutdown_rx.borrow() { + return; + } + + let stream = match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await { + Ok(Ok(s)) => s, + Ok(Err(e)) => { + warn!("Meter[{}]: connect failed: {}", short_name, e); + if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await { + return; + } + continue; + } + Err(_) => { + warn!("Meter[{}]: connect timed out", short_name); + if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await { + return; + } + continue; + } + }; + let _ = stream.set_nodelay(true); + + if let Err(e) = stream_meter(&config, &short_name, stream, &mut shutdown_rx).await { + warn!("Meter[{}]: stream ended: {}", short_name, e); + } + + if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await { + return; + } + } +} + +async fn wait_reconnect(shutdown_rx: &mut watch::Receiver, delay: Duration) -> bool { + tokio::select! { + _ = time::sleep(delay) => false, + changed = shutdown_rx.changed() => { + matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() + } + } +} + +async fn stream_meter( + config: &RemoteClientConfig, + short_name: &str, + stream: TcpStream, + shutdown_rx: &mut watch::Receiver, +) -> RigResult<()> { + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + + let envelope = build_envelope(config, ClientCommand::SubscribeMeter, Some(short_name.to_string())); + let mut payload = serde_json::to_string(&envelope) + .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + payload.push('\n'); + time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes())) + .await + .map_err(|_| RigError::communication("meter subscribe write timed out".to_string()))? + .map_err(|e| RigError::communication(format!("meter subscribe write failed: {e}")))?; + time::timeout(IO_TIMEOUT, writer.flush()) + .await + .map_err(|_| RigError::communication("meter subscribe flush timed out".to_string()))? + .map_err(|e| RigError::communication(format!("meter subscribe flush failed: {e}")))?; + + loop { + tokio::select! { + changed = shutdown_rx.changed() => { + match changed { + Ok(()) if *shutdown_rx.borrow() => return Ok(()), + Ok(()) => {} + Err(_) => return Ok(()), + } + } + line = read_limited_line(&mut reader, MAX_JSON_LINE_BYTES) => { + let line = line + .map_err(|e| RigError::communication(format!("meter read failed: {e}")))? + .ok_or_else(|| RigError::communication("meter connection closed".to_string()))?; + let trimmed = line.trim_end(); + if trimmed.is_empty() { + continue; + } + let update: MeterUpdate = match serde_json::from_str(trimmed) { + Ok(u) => u, + Err(e) => { + warn!("Meter[{}]: bad frame: {}", short_name, e); + continue; + } + }; + if let Ok(map) = config.rig_meters.read() { + if let Some(tx) = map.get(short_name) { + // `watch` keeps only the latest value; slow SSE + // readers simply skip intermediate samples, which is + // exactly the desired behaviour for a meter display. + let _ = tx.send(Some(update)); + } + } + } + } + } +} + /// Satellite pass prediction refresh runs on a dedicated TCP connection so it /// never blocks state polls or user commands on the main connection. /// Fetches immediately on connect, then every 5 minutes. @@ -1300,6 +1483,7 @@ mod tests { rig_id_to_short_name: HashMap::new(), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }, req_rx, state_tx, @@ -1344,6 +1528,7 @@ mod tests { rig_id_to_short_name: HashMap::new(), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }; let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); assert_eq!(envelope.token.as_deref(), Some("secret")); @@ -1371,6 +1556,7 @@ mod tests { rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]), short_name_to_rig_id, sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }; // selected_rig_id is "home-hf" (short name), envelope should translate to "hf" let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); @@ -1402,6 +1588,7 @@ mod tests { rig_id_to_short_name: HashMap::new(), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }; // Legacy mode: rig_id passes through unchanged assert!(!has_short_names(&config)); @@ -1428,6 +1615,7 @@ mod tests { ]), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }; assert!(has_short_names(&config)); assert_eq!( @@ -1464,6 +1652,7 @@ mod tests { rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }; let snapshot = sample_snapshot(); let rigs = vec![RigEntry { @@ -1536,6 +1725,7 @@ mod tests { rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), sat_passes: Arc::new(RwLock::new(None)), + rig_meters: Arc::new(RwLock::new(HashMap::new())), }; let ids = super::active_spectrum_rig_ids(&config); diff --git a/src/trx-client/trx-frontend/Cargo.toml b/src/trx-client/trx-frontend/Cargo.toml index 550fb55..7fe3869 100644 --- a/src/trx-client/trx-frontend/Cargo.toml +++ b/src/trx-client/trx-frontend/Cargo.toml @@ -12,4 +12,5 @@ bytes = "1" uuid = { workspace = true } serde_json = { workspace = true } trx-core = { path = "../../trx-core" } +trx-protocol = { path = "../../trx-protocol" } tokio = { workspace = true, features = ["sync"] } diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index 7745e2c..3968d0a 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -22,6 +22,7 @@ use trx_core::decode::{ }; use trx_core::rig::state::{RigSnapshot, SpectrumData}; use trx_core::{DynResult, RigRequest, RigState}; +use trx_protocol::MeterUpdate; /// Shared, timestamped decode history for a single decoder type. /// @@ -320,6 +321,10 @@ pub struct RigRoutingContext { pub server_connected: Arc, /// Per-rig server connection state. pub rig_server_connected: Arc>>, + /// Per-rig meter watch channels, keyed by rig_id. Populated lazily by + /// the meter-connection supervisor in `trx-client`; `None` on the sender + /// side means "no sample yet". + pub rig_meters: Arc>>>>, } impl Default for RigRoutingContext { @@ -331,6 +336,7 @@ impl Default for RigRoutingContext { rig_states: Arc::new(RwLock::new(HashMap::new())), server_connected: Arc::new(AtomicBool::new(false)), rig_server_connected: Arc::new(RwLock::new(HashMap::new())), + rig_meters: Arc::new(RwLock::new(HashMap::new())), } } } @@ -447,6 +453,25 @@ impl FrontendRuntimeContext { .and_then(|map| map.get(rig_id).map(|tx| tx.subscribe())) } + /// Get a watch receiver for a specific rig's meter stream. + /// Lazily inserts a new channel if the rig_id is not yet present so + /// SSE clients can subscribe before the meter-connection supervisor + /// has produced a first sample. + pub fn rig_meter_rx(&self, rig_id: &str) -> watch::Receiver> { + if let Ok(map) = self.routing.rig_meters.read() { + if let Some(tx) = map.get(rig_id) { + return tx.subscribe(); + } + } + if let Ok(mut map) = self.routing.rig_meters.write() { + map.entry(rig_id.to_string()) + .or_insert_with(|| watch::channel(None).0) + .subscribe() + } else { + watch::channel(None).1 + } + } + /// Get a watch receiver for a specific rig's spectrum. /// Lazily inserts a new channel if the rig_id is not yet present. pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver {