[feat](trx-client): freeze only the disconnected rig's view in multi-rig mode

Track per-rig server connection state in `rig_server_connected` so that when
one trx-server drops, only the rig(s) it serves are marked disconnected. Other
rigs with active connections remain fully interactive. The SSE `server_connected`
field is now resolved from the per-rig map for the session's active rig, falling
back to the global flag for backward compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-26 23:22:07 +01:00
parent ba2fbed7c3
commit bb5beb79da
4 changed files with 73 additions and 11 deletions
+1
View File
@@ -380,6 +380,7 @@ async fn async_init() -> DynResult<AppState> {
spectrum: frontend_runtime.spectrum.clone(), spectrum: frontend_runtime.spectrum.clone(),
rig_spectrums: frontend_runtime.rig_spectrums.clone(), rig_spectrums: frontend_runtime.rig_spectrums.clone(),
server_connected: frontend_runtime.server_connected.clone(), server_connected: frontend_runtime.server_connected.clone(),
rig_server_connected: frontend_runtime.rig_server_connected.clone(),
rig_id_to_short_name, rig_id_to_short_name,
short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())), short_name_to_rig_id: Arc::new(RwLock::new(HashMap::new())),
}; };
+44 -1
View File
@@ -61,6 +61,11 @@ pub struct RemoteClientConfig {
pub spectrum: Arc<watch::Sender<SharedSpectrum>>, pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
/// Shared flag: `true` while a TCP connection to trx-server is active. /// Shared flag: `true` while a TCP connection to trx-server is active.
pub server_connected: Arc<AtomicBool>, pub server_connected: Arc<AtomicBool>,
/// Per-rig server connection flag. Keyed by short name (or rig_id in legacy mode).
/// Set to `true` once the rig appears in a successful GetRigs response, and to
/// `false` when this config's TCP connection drops. Allows the UI to freeze only
/// the affected rig's view rather than all rigs.
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>, pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>,
/// Per-rig spectrum watch senders, keyed by short name (or rig_id in legacy mode). /// Per-rig spectrum watch senders, keyed by short name (or rig_id in legacy mode).
pub rig_spectrums: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>, pub rig_spectrums: Arc<RwLock<HashMap<String, watch::Sender<SharedSpectrum>>>>,
@@ -109,7 +114,32 @@ pub async fn run_remote_client(
warn!("Remote connection dropped: {}", e); warn!("Remote connection dropped: {}", e);
} }
config.server_connected.store(false, Ordering::Relaxed); config.server_connected.store(false, Ordering::Relaxed);
// Nudge the state watch so SSE clients see server_connected=false. // Collect the short names owned by this config so we can mark
// only their rigs as disconnected (other server groups unaffected).
let owned: Vec<String> = if has_short_names(&config) {
config.rig_id_to_short_name.values().cloned().collect()
} else {
// Legacy single-remote: every key in rig_server_connected is ours.
config
.rig_server_connected
.read()
.map(|m| m.keys().cloned().collect())
.unwrap_or_default()
};
if let Ok(mut conn_map) = config.rig_server_connected.write() {
for name in &owned {
conn_map.insert(name.clone(), false);
}
}
// Nudge each rig's watch so SSE clients see server_connected=false.
if let Ok(rig_map) = config.rig_states.read() {
for name in &owned {
if let Some(tx) = rig_map.get(name) {
tx.send_modify(|_| {});
}
}
}
// Also nudge the global state watch for backward compat.
state_tx.send_modify(|_| {}); state_tx.send_modify(|_| {});
} }
Ok(Err(e)) => { Ok(Err(e)) => {
@@ -514,6 +544,12 @@ async fn refresh_remote_snapshot(
!config_owns_short_name(config, id) || active_keys.contains(id.as_str()) !config_owns_short_name(config, id) || active_keys.contains(id.as_str())
}); });
} }
// Mark all mapped rigs as connected now that we have a live snapshot.
if let Ok(mut conn_map) = config.rig_server_connected.write() {
for (key, _) in &mapped_rigs {
conn_map.insert(key.clone(), true);
}
}
Ok(()) Ok(())
} }
@@ -1138,6 +1174,7 @@ mod tests {
poll_interval: Duration::from_millis(100), poll_interval: Duration::from_millis(100),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(), rig_id_to_short_name: HashMap::new(),
@@ -1180,6 +1217,7 @@ mod tests {
poll_interval: Duration::from_millis(500), poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(), rig_id_to_short_name: HashMap::new(),
@@ -1205,6 +1243,7 @@ mod tests {
poll_interval: Duration::from_millis(500), poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "home-hf".to_string())]),
@@ -1234,6 +1273,7 @@ mod tests {
poll_interval: Duration::from_millis(500), poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::new(), rig_id_to_short_name: HashMap::new(),
@@ -1255,6 +1295,7 @@ mod tests {
poll_interval: Duration::from_millis(500), poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([ rig_id_to_short_name: HashMap::from([
@@ -1292,6 +1333,7 @@ mod tests {
poll_interval: Duration::from_millis(500), poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
@@ -1362,6 +1404,7 @@ mod tests {
poll_interval: Duration::from_millis(500), poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx), spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())), rig_states: Arc::new(RwLock::new(HashMap::new())),
rig_spectrums: Arc::new(RwLock::new(HashMap::new())), rig_spectrums: Arc::new(RwLock::new(HashMap::new())),
rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]), rig_id_to_short_name: HashMap::from([(Some("hf".to_string()), "gdansk".to_string())]),
+5
View File
@@ -301,6 +301,10 @@ pub struct FrontendRuntimeContext {
/// Whether the remote client currently has an active TCP connection to /// Whether the remote client currently has an active TCP connection to
/// trx-server. Set to `true` on successful connect, `false` on drop. /// trx-server. Set to `true` on successful connect, `false` on drop.
pub server_connected: Arc<AtomicBool>, pub server_connected: Arc<AtomicBool>,
/// Per-rig server connection state, keyed by short name (or rig_id in legacy mode).
/// `true` while the rig's trx-server connection is active.
/// Allows the UI to freeze only the rig that lost its connection.
pub rig_server_connected: Arc<RwLock<HashMap<String, bool>>>,
} }
impl FrontendRuntimeContext { impl FrontendRuntimeContext {
@@ -404,6 +408,7 @@ impl FrontendRuntimeContext {
vchan_audio_cmd: Arc::new(Mutex::new(None)), vchan_audio_cmd: Arc::new(Mutex::new(None)),
vchan_destroyed: None, vchan_destroyed: None,
server_connected: Arc::new(AtomicBool::new(false)), server_connected: Arc::new(AtomicBool::new(false)),
rig_server_connected: Arc::new(RwLock::new(HashMap::new())),
} }
} }
} }
@@ -187,7 +187,11 @@ pub async fn status_api(
let json = serde_json::to_string(&state).map_err(actix_web::error::ErrorInternalServerError)?; let json = serde_json::to_string(&state).map_err(actix_web::error::ErrorInternalServerError)?;
let json = inject_frontend_meta( let json = inject_frontend_meta(
&json, &json,
frontend_meta_from_context(clients.load(Ordering::Relaxed), context.get_ref().as_ref()), frontend_meta_from_context(
clients.load(Ordering::Relaxed),
context.get_ref().as_ref(),
None,
),
); );
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/json")) .insert_header((header::CONTENT_TYPE, "application/json"))
@@ -214,7 +218,19 @@ fn inject_frontend_meta(json: &str, meta: FrontendMeta) -> String {
fn frontend_meta_from_context( fn frontend_meta_from_context(
http_clients: usize, http_clients: usize,
context: &FrontendRuntimeContext, context: &FrontendRuntimeContext,
rig_id: Option<&str>,
) -> FrontendMeta { ) -> FrontendMeta {
// Use per-rig connection state when available so that only the rig whose
// server dropped appears disconnected, leaving other rigs unaffected.
let server_connected = rig_id
.and_then(|rid| {
context
.rig_server_connected
.read()
.ok()
.and_then(|m| m.get(rid).copied())
})
.unwrap_or_else(|| context.server_connected.load(Ordering::Relaxed));
FrontendMeta { FrontendMeta {
http_clients, http_clients,
rigctl_clients: context.rigctl_clients.load(Ordering::Relaxed), rigctl_clients: context.rigctl_clients.load(Ordering::Relaxed),
@@ -231,7 +247,7 @@ fn frontend_meta_from_context(
spectrum_coverage_margin_hz: spectrum_coverage_margin_hz_from_context(context), spectrum_coverage_margin_hz: spectrum_coverage_margin_hz_from_context(context),
spectrum_usable_span_ratio: spectrum_usable_span_ratio_from_context(context), spectrum_usable_span_ratio: spectrum_usable_span_ratio_from_context(context),
decode_history_retention_min: decode_history_retention_min_from_context(context), decode_history_retention_min: decode_history_retention_min_from_context(context),
server_connected: context.server_connected.load(Ordering::Relaxed), server_connected,
} }
} }
@@ -375,7 +391,7 @@ pub async fn events(
serde_json::to_string(&initial).map_err(actix_web::error::ErrorInternalServerError)?; serde_json::to_string(&initial).map_err(actix_web::error::ErrorInternalServerError)?;
let initial_json = inject_frontend_meta( let initial_json = inject_frontend_meta(
&initial_json, &initial_json,
frontend_meta_from_context(count, context.get_ref().as_ref()), frontend_meta_from_context(count, context.get_ref().as_ref(), active_rig_id.as_deref()),
); );
let mut prefix: Vec<Result<Bytes, Error>> = Vec::new(); let mut prefix: Vec<Result<Bytes, Error>> = Vec::new();
@@ -418,18 +434,14 @@ pub async fn events(
.ok() .ok()
.and_then(|g| g.clone()) .and_then(|g| g.clone())
}); });
if let Some(rig_id) = rig_id_opt { if let Some(ref rig_id) = rig_id_opt {
vchan.update_primary( vchan.update_primary(rig_id, v.status.freq.hz, &format!("{:?}", v.status.mode));
&rig_id,
v.status.freq.hz,
&format!("{:?}", v.status.mode),
);
sync_scheduler_vchannels( sync_scheduler_vchannels(
vchan.as_ref(), vchan.as_ref(),
bookmark_store_map.as_ref(), bookmark_store_map.as_ref(),
&scheduler_status, &scheduler_status,
scheduler_control.as_ref(), scheduler_control.as_ref(),
&rig_id, rig_id,
); );
} }
serde_json::to_string(&v).ok().map(|json| { serde_json::to_string(&v).ok().map(|json| {
@@ -438,6 +450,7 @@ pub async fn events(
frontend_meta_from_context( frontend_meta_from_context(
counter.load(Ordering::Relaxed), counter.load(Ordering::Relaxed),
context.as_ref(), context.as_ref(),
rig_id_opt.as_deref(),
), ),
); );
Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n"))) Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n")))