[feat](trx-client): per-rig meter supervisor with auto-reconnect

Adds rig_meters: map of per-rig watch::Sender<Option<MeterUpdate>> to
RigRoutingContext with a lazy rig_meter_rx helper. run_meter_supervisor
polls for known short names and spawns one SubscribeMeter TCP connection
per rig; reconnect loop sets TCP_NODELAY and pushes samples into the
per-rig watch so slow SSE readers automatically skip intermediate frames.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-04-19 19:50:14 +02:00
parent b12d93fb3c
commit fd0f1e43c0
4 changed files with 218 additions and 1 deletions
+1
View File
@@ -386,6 +386,7 @@ async fn async_init() -> DynResult<AppState> {
rig_id_to_short_name, rig_id_to_short_name,
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: frontend_runtime.routing.sat_passes.clone(), sat_passes: frontend_runtime.routing.sat_passes.clone(),
rig_meters: frontend_runtime.routing.rig_meters.clone(),
}; };
let state_tx = state_tx.clone(); let state_tx = state_tx.clone();
let remote_shutdown_rx = shutdown_rx.clone(); let remote_shutdown_rx = shutdown_rx.clone();
+191 -1
View File
@@ -20,7 +20,7 @@ use trx_core::{RigError, RigResult};
use trx_frontend::{RemoteRigEntry, SharedSpectrum}; use trx_frontend::{RemoteRigEntry, SharedSpectrum};
use trx_protocol::rig_command_to_client; use trx_protocol::rig_command_to_client;
use trx_protocol::types::RigEntry; use trx_protocol::types::RigEntry;
use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse}; use trx_protocol::{ClientCommand, ClientEnvelope, ClientResponse, MeterUpdate};
const DEFAULT_REMOTE_PORT: u16 = 4530; const DEFAULT_REMOTE_PORT: u16 = 4530;
const DEFAULT_AUDIO_PORT: u16 = 4531; const DEFAULT_AUDIO_PORT: u16 = 4531;
@@ -77,6 +77,9 @@ pub struct RemoteClientConfig {
pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>, pub short_name_to_rig_id: Arc<RwLock<HashMap<String, String>>>,
/// Cached satellite pass predictions from the server (GetSatPasses). /// Cached satellite pass predictions from the server (GetSatPasses).
pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>, pub sat_passes: Arc<RwLock<Option<trx_core::geo::PassPredictionResult>>>,
/// Per-rig meter watch senders, keyed by short name (or rig_id in legacy mode).
/// Populated lazily by the meter-connection supervisor.
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
} }
pub async fn run_remote_client( pub async fn run_remote_client(
@@ -88,6 +91,12 @@ pub async fn run_remote_client(
// Spectrum polling runs on its own dedicated TCP connection so it never // Spectrum polling runs on its own dedicated TCP connection so it never
// blocks state polls or user commands on the main connection. // blocks state polls or user commands on the main connection.
let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone())); let spectrum_task = tokio::spawn(run_spectrum_connection(config.clone(), shutdown_rx.clone()));
// Meter supervisor: spawns per-rig meter-streaming TCP connections as
// soon as short names are discovered. Runs independently so the meter
// bar in the UI updates at the full server-side 30 Hz without being
// gated on state polls or user commands.
let meter_supervisor =
tokio::spawn(run_meter_supervisor(config.clone(), shutdown_rx.clone()));
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
@@ -95,6 +104,7 @@ pub async fn run_remote_client(
if *shutdown_rx.borrow() { if *shutdown_rx.borrow() {
info!("Remote client shutting down"); info!("Remote client shutting down");
spectrum_task.abort(); spectrum_task.abort();
meter_supervisor.abort();
return Ok(()); return Ok(());
} }
@@ -159,11 +169,13 @@ pub async fn run_remote_client(
Ok(()) if *shutdown_rx.borrow() => { Ok(()) if *shutdown_rx.borrow() => {
info!("Remote client shutting down"); info!("Remote client shutting down");
spectrum_task.abort(); spectrum_task.abort();
meter_supervisor.abort();
return Ok(()); return Ok(());
} }
Ok(()) => {} Ok(()) => {}
Err(_) => { Err(_) => {
spectrum_task.abort(); spectrum_task.abort();
meter_supervisor.abort();
return Ok(()); return Ok(());
} }
} }
@@ -212,6 +224,177 @@ async fn run_spectrum_connection(
} }
} }
/// Meter stream supervisor. Watches the set of known short names and spawns
/// one dedicated TCP connection per rig that streams `MeterUpdate` JSON lines
/// (see `trx_protocol::MeterUpdate`). Each per-rig task owns its own watch
/// sender in `config.rig_meters` and reconnects on failure.
async fn run_meter_supervisor(
config: RemoteClientConfig,
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut tasks: HashMap<String, tokio::task::JoinHandle<()>> = HashMap::new();
let mut poll = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = poll.tick() => {}
changed = shutdown_rx.changed() => {
if matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow() {
for (_, handle) in tasks.drain() {
handle.abort();
}
return;
}
}
}
let known = collect_known_short_names(&config);
for name in &known {
if tasks.contains_key(name) {
continue;
}
// Ensure a watch sender exists so SSE clients can subscribe
// before the first sample arrives.
{
if let Ok(mut map) = config.rig_meters.write() {
map.entry(name.clone())
.or_insert_with(|| watch::channel(None).0);
}
}
let task = tokio::spawn(run_meter_connection(
config.clone(),
name.clone(),
shutdown_rx.clone(),
));
tasks.insert(name.clone(), task);
}
}
}
fn collect_known_short_names(config: &RemoteClientConfig) -> Vec<String> {
let mut names: Vec<String> = Vec::new();
if has_short_names(config) {
names.extend(config.rig_id_to_short_name.values().cloned());
}
if let Ok(map) = config.rig_states.read() {
for name in map.keys() {
if !names.iter().any(|n| n == name) {
names.push(name.clone());
}
}
}
names
}
/// One dedicated TCP connection per short-name that sends `SubscribeMeter` and
/// pumps the server's `MeterUpdate` JSON stream into the per-rig watch sender.
async fn run_meter_connection(
config: RemoteClientConfig,
short_name: String,
mut shutdown_rx: watch::Receiver<bool>,
) {
loop {
if *shutdown_rx.borrow() {
return;
}
let stream = match time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&config.addr)).await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
warn!("Meter[{}]: connect failed: {}", short_name, e);
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
return;
}
continue;
}
Err(_) => {
warn!("Meter[{}]: connect timed out", short_name);
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
return;
}
continue;
}
};
let _ = stream.set_nodelay(true);
if let Err(e) = stream_meter(&config, &short_name, stream, &mut shutdown_rx).await {
warn!("Meter[{}]: stream ended: {}", short_name, e);
}
if wait_reconnect(&mut shutdown_rx, Duration::from_secs(1)).await {
return;
}
}
}
async fn wait_reconnect(shutdown_rx: &mut watch::Receiver<bool>, delay: Duration) -> bool {
tokio::select! {
_ = time::sleep(delay) => false,
changed = shutdown_rx.changed() => {
matches!(changed, Ok(()) | Err(_)) && *shutdown_rx.borrow()
}
}
}
async fn stream_meter(
config: &RemoteClientConfig,
short_name: &str,
stream: TcpStream,
shutdown_rx: &mut watch::Receiver<bool>,
) -> RigResult<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let envelope = build_envelope(config, ClientCommand::SubscribeMeter, Some(short_name.to_string()));
let mut payload = serde_json::to_string(&envelope)
.map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?;
payload.push('\n');
time::timeout(IO_TIMEOUT, writer.write_all(payload.as_bytes()))
.await
.map_err(|_| RigError::communication("meter subscribe write timed out".to_string()))?
.map_err(|e| RigError::communication(format!("meter subscribe write failed: {e}")))?;
time::timeout(IO_TIMEOUT, writer.flush())
.await
.map_err(|_| RigError::communication("meter subscribe flush timed out".to_string()))?
.map_err(|e| RigError::communication(format!("meter subscribe flush failed: {e}")))?;
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => return Ok(()),
Ok(()) => {}
Err(_) => return Ok(()),
}
}
line = read_limited_line(&mut reader, MAX_JSON_LINE_BYTES) => {
let line = line
.map_err(|e| RigError::communication(format!("meter read failed: {e}")))?
.ok_or_else(|| RigError::communication("meter connection closed".to_string()))?;
let trimmed = line.trim_end();
if trimmed.is_empty() {
continue;
}
let update: MeterUpdate = match serde_json::from_str(trimmed) {
Ok(u) => u,
Err(e) => {
warn!("Meter[{}]: bad frame: {}", short_name, e);
continue;
}
};
if let Ok(map) = config.rig_meters.read() {
if let Some(tx) = map.get(short_name) {
// `watch` keeps only the latest value; slow SSE
// readers simply skip intermediate samples, which is
// exactly the desired behaviour for a meter display.
let _ = tx.send(Some(update));
}
}
}
}
}
}
/// Satellite pass prediction refresh runs on a dedicated TCP connection so it /// Satellite pass prediction refresh runs on a dedicated TCP connection so it
/// never blocks state polls or user commands on the main connection. /// never blocks state polls or user commands on the main connection.
/// Fetches immediately on connect, then every 5 minutes. /// Fetches immediately on connect, then every 5 minutes.
@@ -1300,6 +1483,7 @@ mod tests {
rig_id_to_short_name: HashMap::new(), rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}, },
req_rx, req_rx,
state_tx, state_tx,
@@ -1344,6 +1528,7 @@ mod tests {
rig_id_to_short_name: HashMap::new(), rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}; };
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret")); assert_eq!(envelope.token.as_deref(), Some("secret"));
@@ -1371,6 +1556,7 @@ mod tests {
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
short_name_to_rig_id, short_name_to_rig_id,
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}; };
// selected_rig_id is "home-hf" (short name), envelope should translate to "hf" // selected_rig_id is "home-hf" (short name), envelope should translate to "hf"
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None); let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
@@ -1402,6 +1588,7 @@ mod tests {
rig_id_to_short_name: HashMap::new(), rig_id_to_short_name: HashMap::new(),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}; };
// Legacy mode: rig_id passes through unchanged // Legacy mode: rig_id passes through unchanged
assert!(!has_short_names(&config)); assert!(!has_short_names(&config));
@@ -1428,6 +1615,7 @@ mod tests {
]), ]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}; };
assert!(has_short_names(&config)); assert!(has_short_names(&config));
assert_eq!( assert_eq!(
@@ -1464,6 +1652,7 @@ mod tests {
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}; };
let snapshot = sample_snapshot(); let snapshot = sample_snapshot();
let rigs = vec![RigEntry { let rigs = vec![RigEntry {
@@ -1536,6 +1725,7 @@ mod tests {
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
sat_passes: Arc::new(RwLock::new(None)), sat_passes: Arc::new(RwLock::new(None)),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
}; };
let ids = super::active_spectrum_rig_ids(&config); let ids = super::active_spectrum_rig_ids(&config);
+1
View File
@@ -12,4 +12,5 @@ bytes = "1"
uuid = { workspace = true } uuid = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
trx-core = { path = "../../trx-core" } trx-core = { path = "../../trx-core" }
trx-protocol = { path = "../../trx-protocol" }
tokio = { workspace = true, features = ["sync"] } tokio = { workspace = true, features = ["sync"] }
+25
View File
@@ -22,6 +22,7 @@ use trx_core::decode::{
}; };
use trx_core::rig::state::{RigSnapshot, SpectrumData}; use trx_core::rig::state::{RigSnapshot, SpectrumData};
use trx_core::{DynResult, RigRequest, RigState}; use trx_core::{DynResult, RigRequest, RigState};
use trx_protocol::MeterUpdate;
/// Shared, timestamped decode history for a single decoder type. /// Shared, timestamped decode history for a single decoder type.
/// ///
@@ -320,6 +321,10 @@ pub struct RigRoutingContext {
pub server_connected: Arc<AtomicBool>, pub server_connected: Arc<AtomicBool>,
/// Per-rig server connection state. /// Per-rig server connection state.
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>, pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
/// Per-rig meter watch channels, keyed by rig_id. Populated lazily by
/// the meter-connection supervisor in `trx-client`; `None` on the sender
/// side means "no sample yet".
pub rig_meters: Arc<RwLock<HashMap<String, watch::Sender<Option<MeterUpdate>>>>>,
} }
impl Default for RigRoutingContext { impl Default for RigRoutingContext {
@@ -331,6 +336,7 @@ impl Default for RigRoutingContext {
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())), rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_meters: Arc::new(RwLock::new(HashMap::new())),
} }
} }
} }
@@ -447,6 +453,25 @@ impl FrontendRuntimeContext {
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe())) .and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
} }
/// Get a watch receiver for a specific rig's meter stream.
/// Lazily inserts a new channel if the rig_id is not yet present so
/// SSE clients can subscribe before the meter-connection supervisor
/// has produced a first sample.
pub fn rig_meter_rx(&self, rig_id: &str) -> watch::Receiver<Option<MeterUpdate>> {
if let Ok(map) = self.routing.rig_meters.read() {
if let Some(tx) = map.get(rig_id) {
return tx.subscribe();
}
}
if let Ok(mut map) = self.routing.rig_meters.write() {
map.entry(rig_id.to_string())
.or_insert_with(|| watch::channel(None).0)
.subscribe()
} else {
watch::channel(None).1
}
}
/// Get a watch receiver for a specific rig's spectrum. /// Get a watch receiver for a specific rig's spectrum.
/// Lazily inserts a new channel if the rig_id is not yet present. /// Lazily inserts a new channel if the rig_id is not yet present.
pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> { pub fn rig_spectrum_rx(&self, rig_id: &str) -> watch::Receiver<SharedSpectrum> {