[feat](trx-rs): per-rig watch channels for independent SSE streams

Add per-rig watch::Sender<RigState> map to FrontendRuntimeContext,
populated by refresh_remote_snapshot for every rig returned by GetRigs.
The SSE /events endpoint now subscribes to the session's rig-specific
watch channel instead of the single global one, allowing different
browser tabs to independently view different rigs. The JS frontend
reconnects SSE on rig switch to pick up the new channel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-21 23:55:55 +01:00
parent f023369c7d
commit f78097b0ed
5 changed files with 57 additions and 7 deletions
+1
View File
@@ -280,6 +280,7 @@ async fn async_init() -> DynResult<AppState> {
token: remote_token,
selected_rig_id: frontend_runtime.remote_active_rig_id.clone(),
known_rigs: frontend_runtime.remote_rigs.clone(),
rig_states: frontend_runtime.rig_states.clone(),
poll_interval: Duration::from_millis(poll_interval_ms),
spectrum: frontend_runtime.spectrum.clone(),
server_connected: frontend_runtime.server_connected.clone(),
+32 -1
View File
@@ -2,7 +2,9 @@
//
// SPDX-License-Identifier: BSD-2-Clause
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;
use std::time::Duration;
use std::{sync::Arc, sync::Mutex};
@@ -58,6 +60,7 @@ pub struct RemoteClientConfig {
pub spectrum: Arc<watch::Sender<SharedSpectrum>>,
/// Shared flag: `true` while a TCP connection to trx-server is active.
pub server_connected: Arc<AtomicBool>,
pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>,
}
pub async fn run_remote_client(
@@ -420,6 +423,31 @@ async fn refresh_remote_snapshot(
true
}
});
// Update per-rig watch channels so each SSE session can subscribe
// to a specific rig's state independently.
if let Ok(mut rig_map) = config.rig_states.write() {
for entry in &rigs {
let new_state = RigState::from_snapshot(entry.state.clone());
if let Some(tx) = rig_map.get(&entry.rig_id) {
tx.send_if_modified(|old| {
if *old == new_state {
false
} else {
*old = new_state;
true
}
});
} else {
let (tx, _rx) = watch::channel(new_state);
rig_map.insert(entry.rig_id.clone(), tx);
}
}
// Remove channels for rigs no longer reported by the server.
let active_ids: std::collections::HashSet<&str> =
rigs.iter().map(|e| e.rig_id.as_str()).collect();
rig_map.retain(|id, _| active_ids.contains(id.as_str()));
}
Ok(())
}
@@ -653,8 +681,9 @@ fn parse_port(port_str: &str) -> Result<u16, String> {
#[cfg(test)]
mod tests {
use super::{parse_remote_url, RemoteClientConfig, RemoteEndpoint, SharedSpectrum};
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
@@ -839,6 +868,7 @@ mod tests {
poll_interval: Duration::from_millis(100),
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
},
req_rx,
state_tx,
@@ -877,6 +907,7 @@ mod tests {
poll_interval: Duration::from_millis(500),
spectrum: Arc::new(spectrum_tx),
server_connected: Arc::new(AtomicBool::new(false)),
rig_states: Arc::new(RwLock::new(HashMap::new())),
};
let envelope = super::build_envelope(&config, trx_protocol::ClientCommand::GetState, None);
assert_eq!(envelope.token.as_deref(), Some("secret"));
+13
View File
@@ -252,6 +252,10 @@ pub struct FrontendRuntimeContext {
pub remote_active_rig_id: Arc<Mutex<Option<String>>>,
/// Cached remote rig list from GetRigs polling.
pub remote_rigs: Arc<Mutex<Vec<RemoteRigEntry>>>,
/// Per-rig state watch channels, keyed by rig_id.
/// Populated by the remote client poll loop so each SSE session can
/// subscribe to a specific rig's state independently.
pub rig_states: Arc<RwLock<HashMap<String, watch::Sender<RigState>>>>,
/// Owner callsign from trx-client config/CLI for frontend display.
pub owner_callsign: Option<String>,
/// Optional website URL for the web UI header title link.
@@ -281,6 +285,14 @@ pub struct FrontendRuntimeContext {
}
impl FrontendRuntimeContext {
/// Get a watch receiver for a specific rig's state.
pub fn rig_state_rx(&self, rig_id: &str) -> Option<watch::Receiver<RigState>> {
self.rig_states
.read()
.ok()
.and_then(|map| map.get(rig_id).map(|tx| tx.subscribe()))
}
/// Create a new empty runtime context.
pub fn new() -> Self {
Self {
@@ -317,6 +329,7 @@ impl FrontendRuntimeContext {
http_decode_history_retention_min_by_rig: HashMap::new(),
remote_active_rig_id: Arc::new(Mutex::new(None)),
remote_rigs: Arc::new(Mutex::new(Vec::new())),
rig_states: Arc::new(RwLock::new(HashMap::new())),
owner_callsign: None,
owner_website_url: None,
owner_website_name: None,
@@ -3363,12 +3363,12 @@ async function switchRigFromSelect(selectEl) {
updateRigSubtitle(lastActiveRigId);
if (typeof setSchedulerRig === "function") setSchedulerRig(lastActiveRigId);
if (typeof setBackgroundDecodeRig === "function") setBackgroundDecodeRig(lastActiveRigId);
// Also switch the server's active rig so the SSE stream and audio
// follow. Commands already carry rig_id per-tab, but SSE is still
// global until per-session streams are implemented.
// Switch this session's rig and reconnect SSE to the new rig's
// state channel.
try {
const sidParam = sseSessionId ? `&session_id=${encodeURIComponent(sseSessionId)}` : "";
await postPath(`/select_rig?rig_id=${encodeURIComponent(selectEl.value)}${sidParam}`);
connect();
} catch (err) {
console.error("select_rig failed:", err);
}
@@ -335,9 +335,6 @@ pub async fn events(
scheduler_control: web::Data<crate::server::scheduler::SharedSchedulerControlManager>,
session_rig_mgr: web::Data<Arc<SessionRigManager>>,
) -> Result<HttpResponse, Error> {
let rx = state.get_ref().clone();
let initial = wait_for_view(rx.clone()).await?;
let counter = clients.get_ref().clone();
let count = counter.fetch_add(1, Ordering::Relaxed) + 1;
@@ -352,6 +349,14 @@ pub async fn events(
.lock()
.ok()
.and_then(|g| g.clone());
// Subscribe to the per-rig watch channel for this session's rig,
// falling back to the global state watch when unavailable.
let rx = active_rig_id
.as_deref()
.and_then(|rid| context.rig_state_rx(rid))
.unwrap_or_else(|| state.get_ref().clone());
let initial = wait_for_view(rx.clone()).await?;
if let Some(ref rid) = active_rig_id {
session_rig_mgr.register(session_id, rid.clone());
vchan_mgr.init_rig(