[feat](trx-frontend-http): per-session SSE rig selection
Add SessionRigManager to track per-SSE-session rig_id so different browser tabs can independently select rigs without interfering. The /events SSE stream filters state updates by session rig (falling back to the global active rig), and /select_rig accepts an optional session_id to update the per-session mapping. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -599,6 +599,7 @@ let sdrSquelchSupported = false;
|
|||||||
let lastRigIds = [];
|
let lastRigIds = [];
|
||||||
let lastRigDisplayNames = {};
|
let lastRigDisplayNames = {};
|
||||||
let lastActiveRigId = null;
|
let lastActiveRigId = null;
|
||||||
|
let sseSessionId = null;
|
||||||
const originalTitle = document.title;
|
const originalTitle = document.title;
|
||||||
const savedTheme = loadSetting("theme", null);
|
const savedTheme = loadSetting("theme", null);
|
||||||
|
|
||||||
@@ -3176,6 +3177,7 @@ async function pollFreshSnapshot() {
|
|||||||
function connect() {
|
function connect() {
|
||||||
if (es) {
|
if (es) {
|
||||||
es.close();
|
es.close();
|
||||||
|
sseSessionId = null;
|
||||||
}
|
}
|
||||||
if (esHeartbeat) {
|
if (esHeartbeat) {
|
||||||
clearInterval(esHeartbeat);
|
clearInterval(esHeartbeat);
|
||||||
@@ -3210,6 +3212,10 @@ function connect() {
|
|||||||
lastEventAt = Date.now();
|
lastEventAt = Date.now();
|
||||||
});
|
});
|
||||||
es.addEventListener("session", evt => {
|
es.addEventListener("session", evt => {
|
||||||
|
try {
|
||||||
|
const d = JSON.parse(evt.data);
|
||||||
|
sseSessionId = d.session_id || null;
|
||||||
|
} catch (_) {}
|
||||||
if (typeof vchanHandleSession === "function") vchanHandleSession(evt.data);
|
if (typeof vchanHandleSession === "function") vchanHandleSession(evt.data);
|
||||||
});
|
});
|
||||||
es.addEventListener("channels", evt => {
|
es.addEventListener("channels", evt => {
|
||||||
@@ -3361,7 +3367,8 @@ async function switchRigFromSelect(selectEl) {
|
|||||||
// follow. Commands already carry rig_id per-tab, but SSE is still
|
// follow. Commands already carry rig_id per-tab, but SSE is still
|
||||||
// global until per-session streams are implemented.
|
// global until per-session streams are implemented.
|
||||||
try {
|
try {
|
||||||
await postPath(`/select_rig?rig_id=${encodeURIComponent(selectEl.value)}`);
|
const sidParam = sseSessionId ? `&session_id=${encodeURIComponent(sseSessionId)}` : "";
|
||||||
|
await postPath(`/select_rig?rig_id=${encodeURIComponent(selectEl.value)}${sidParam}`);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("select_rig failed:", err);
|
console.error("select_rig failed:", err);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
//
|
//
|
||||||
// SPDX-License-Identifier: BSD-2-Clause
|
// SPDX-License-Identifier: BSD-2-Clause
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -97,6 +98,43 @@ struct FrontendMeta {
|
|||||||
server_connected: bool,
|
server_connected: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tracks per-SSE-session rig selection so different browser tabs can
|
||||||
|
/// independently view different rigs without interfering.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct SessionRigManager {
|
||||||
|
/// Maps SSE session UUID → selected rig_id.
|
||||||
|
sessions: std::sync::RwLock<HashMap<Uuid, String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionRigManager {
|
||||||
|
pub fn register(&self, session_id: Uuid, rig_id: String) {
|
||||||
|
if let Ok(mut sessions) = self.sessions.write() {
|
||||||
|
sessions.insert(session_id, rig_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unregister(&self, session_id: Uuid) {
|
||||||
|
if let Ok(mut sessions) = self.sessions.write() {
|
||||||
|
sessions.remove(&session_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_rig(&self, session_id: Uuid) -> Option<String> {
|
||||||
|
self.sessions
|
||||||
|
.read()
|
||||||
|
.ok()
|
||||||
|
.and_then(|sessions| sessions.get(&session_id).cloned())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_rig(&self, session_id: Uuid, rig_id: String) {
|
||||||
|
if let Ok(mut sessions) = self.sessions.write() {
|
||||||
|
sessions.insert(session_id, rig_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type SharedSessionRigManager = Arc<SessionRigManager>;
|
||||||
|
|
||||||
#[get("/status")]
|
#[get("/status")]
|
||||||
pub async fn status_api(
|
pub async fn status_api(
|
||||||
state: web::Data<watch::Receiver<RigState>>,
|
state: web::Data<watch::Receiver<RigState>>,
|
||||||
@@ -286,6 +324,7 @@ fn decode_history_retention_min_from_context(context: &FrontendRuntimeContext) -
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[get("/events")]
|
#[get("/events")]
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn events(
|
pub async fn events(
|
||||||
state: web::Data<watch::Receiver<RigState>>,
|
state: web::Data<watch::Receiver<RigState>>,
|
||||||
clients: web::Data<Arc<AtomicUsize>>,
|
clients: web::Data<Arc<AtomicUsize>>,
|
||||||
@@ -294,6 +333,7 @@ pub async fn events(
|
|||||||
bookmark_store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
|
bookmark_store: web::Data<Arc<crate::server::bookmarks::BookmarkStore>>,
|
||||||
scheduler_status: web::Data<crate::server::scheduler::SchedulerStatusMap>,
|
scheduler_status: web::Data<crate::server::scheduler::SchedulerStatusMap>,
|
||||||
scheduler_control: web::Data<crate::server::scheduler::SharedSchedulerControlManager>,
|
scheduler_control: web::Data<crate::server::scheduler::SharedSchedulerControlManager>,
|
||||||
|
session_rig_mgr: web::Data<Arc<SessionRigManager>>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let rx = state.get_ref().clone();
|
let rx = state.get_ref().clone();
|
||||||
let initial = wait_for_view(rx.clone()).await?;
|
let initial = wait_for_view(rx.clone()).await?;
|
||||||
@@ -313,6 +353,7 @@ pub async fn events(
|
|||||||
.ok()
|
.ok()
|
||||||
.and_then(|g| g.clone());
|
.and_then(|g| g.clone());
|
||||||
if let Some(ref rid) = active_rig_id {
|
if let Some(ref rid) = active_rig_id {
|
||||||
|
session_rig_mgr.register(session_id, rid.clone());
|
||||||
vchan_mgr.init_rig(
|
vchan_mgr.init_rig(
|
||||||
rid,
|
rid,
|
||||||
initial.status.freq.hz,
|
initial.status.freq.hz,
|
||||||
@@ -357,6 +398,7 @@ pub async fn events(
|
|||||||
let bookmark_store_updates = bookmark_store.get_ref().clone();
|
let bookmark_store_updates = bookmark_store.get_ref().clone();
|
||||||
let scheduler_status_updates = scheduler_status.get_ref().clone();
|
let scheduler_status_updates = scheduler_status.get_ref().clone();
|
||||||
let scheduler_control_updates = scheduler_control.get_ref().clone();
|
let scheduler_control_updates = scheduler_control.get_ref().clone();
|
||||||
|
let session_rig_mgr_updates = session_rig_mgr.get_ref().clone();
|
||||||
let updates = WatchStream::new(rx).filter_map(move |state| {
|
let updates = WatchStream::new(rx).filter_map(move |state| {
|
||||||
let counter = counter_updates.clone();
|
let counter = counter_updates.clone();
|
||||||
let context = context_updates.clone();
|
let context = context_updates.clone();
|
||||||
@@ -364,9 +406,17 @@ pub async fn events(
|
|||||||
let bookmark_store = bookmark_store_updates.clone();
|
let bookmark_store = bookmark_store_updates.clone();
|
||||||
let scheduler_status = scheduler_status_updates.clone();
|
let scheduler_status = scheduler_status_updates.clone();
|
||||||
let scheduler_control = scheduler_control_updates.clone();
|
let scheduler_control = scheduler_control_updates.clone();
|
||||||
|
let session_rig_mgr = session_rig_mgr_updates.clone();
|
||||||
async move {
|
async move {
|
||||||
state.snapshot().and_then(|v| {
|
state.snapshot().and_then(|v| {
|
||||||
if let Ok(Some(rig_id)) = context.remote_active_rig_id.lock().map(|g| g.clone()) {
|
let rig_id_opt = session_rig_mgr.get_rig(session_id).or_else(|| {
|
||||||
|
context
|
||||||
|
.remote_active_rig_id
|
||||||
|
.lock()
|
||||||
|
.ok()
|
||||||
|
.and_then(|g| g.clone())
|
||||||
|
});
|
||||||
|
if let Some(rig_id) = rig_id_opt {
|
||||||
vchan.update_primary(
|
vchan.update_primary(
|
||||||
&rig_id,
|
&rig_id,
|
||||||
v.status.freq.hz,
|
v.status.freq.hz,
|
||||||
@@ -426,12 +476,14 @@ pub async fn events(
|
|||||||
let vchan_drop = vchan_mgr.get_ref().clone();
|
let vchan_drop = vchan_mgr.get_ref().clone();
|
||||||
let counter_drop = counter.clone();
|
let counter_drop = counter.clone();
|
||||||
let scheduler_control_drop = scheduler_control.get_ref().clone();
|
let scheduler_control_drop = scheduler_control.get_ref().clone();
|
||||||
|
let session_rig_mgr_drop = session_rig_mgr.get_ref().clone();
|
||||||
let live = select(select(pings, updates), chan_updates);
|
let live = select(select(pings, updates), chan_updates);
|
||||||
let stream = prefix_stream.chain(live);
|
let stream = prefix_stream.chain(live);
|
||||||
let stream = DropStream::new(Box::pin(stream), move || {
|
let stream = DropStream::new(Box::pin(stream), move || {
|
||||||
counter_drop.fetch_sub(1, Ordering::Relaxed);
|
counter_drop.fetch_sub(1, Ordering::Relaxed);
|
||||||
vchan_drop.release_session(session_id);
|
vchan_drop.release_session(session_id);
|
||||||
scheduler_control_drop.unregister_session(session_id);
|
scheduler_control_drop.unregister_session(session_id);
|
||||||
|
session_rig_mgr_drop.unregister(session_id);
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(HttpResponse::Ok()
|
Ok(HttpResponse::Ok()
|
||||||
@@ -1520,6 +1572,7 @@ pub async fn list_rigs(
|
|||||||
#[derive(serde::Deserialize)]
|
#[derive(serde::Deserialize)]
|
||||||
pub struct SelectRigQuery {
|
pub struct SelectRigQuery {
|
||||||
pub rig_id: String,
|
pub rig_id: String,
|
||||||
|
pub session_id: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[post("/select_rig")]
|
#[post("/select_rig")]
|
||||||
@@ -1527,6 +1580,7 @@ pub async fn select_rig(
|
|||||||
query: web::Query<SelectRigQuery>,
|
query: web::Query<SelectRigQuery>,
|
||||||
context: web::Data<Arc<FrontendRuntimeContext>>,
|
context: web::Data<Arc<FrontendRuntimeContext>>,
|
||||||
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
|
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
|
||||||
|
session_rig_mgr: web::Data<Arc<SessionRigManager>>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let rig_id = query.rig_id.trim();
|
let rig_id = query.rig_id.trim();
|
||||||
if rig_id.is_empty() {
|
if rig_id.is_empty() {
|
||||||
@@ -1551,6 +1605,13 @@ pub async fn select_rig(
|
|||||||
*active = Some(rig_id.to_string());
|
*active = Some(rig_id.to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update per-session rig selection if session_id is provided.
|
||||||
|
if let Some(ref sid) = query.session_id {
|
||||||
|
if let Ok(uuid) = Uuid::parse_str(sid) {
|
||||||
|
session_rig_mgr.set_rig(uuid, rig_id.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Broadcast the channel list for the newly selected rig so all SSE
|
// Broadcast the channel list for the newly selected rig so all SSE
|
||||||
// clients receive the correct virtual channels immediately.
|
// clients receive the correct virtual channels immediately.
|
||||||
let chans = vchan_mgr.channels(rig_id);
|
let chans = vchan_mgr.channels(rig_id);
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
// SPDX-License-Identifier: BSD-2-Clause
|
// SPDX-License-Identifier: BSD-2-Clause
|
||||||
|
|
||||||
#[path = "api.rs"]
|
#[path = "api.rs"]
|
||||||
mod api;
|
pub mod api;
|
||||||
#[path = "audio.rs"]
|
#[path = "audio.rs"]
|
||||||
pub mod audio;
|
pub mod audio;
|
||||||
#[path = "auth.rs"]
|
#[path = "auth.rs"]
|
||||||
@@ -90,6 +90,7 @@ async fn serve(
|
|||||||
let background_decode_path = BackgroundDecodeStore::default_path();
|
let background_decode_path = BackgroundDecodeStore::default_path();
|
||||||
let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path));
|
let background_decode_store = Arc::new(BackgroundDecodeStore::open(&background_decode_path));
|
||||||
let vchan_mgr = Arc::new(ClientChannelManager::new(4));
|
let vchan_mgr = Arc::new(ClientChannelManager::new(4));
|
||||||
|
let session_rig_mgr = Arc::new(api::SessionRigManager::default());
|
||||||
let background_decode_mgr = BackgroundDecodeManager::new(
|
let background_decode_mgr = BackgroundDecodeManager::new(
|
||||||
background_decode_store,
|
background_decode_store,
|
||||||
bookmark_store.clone(),
|
bookmark_store.clone(),
|
||||||
@@ -137,6 +138,7 @@ async fn serve(
|
|||||||
scheduler_status,
|
scheduler_status,
|
||||||
scheduler_control,
|
scheduler_control,
|
||||||
vchan_mgr,
|
vchan_mgr,
|
||||||
|
session_rig_mgr,
|
||||||
background_decode_mgr,
|
background_decode_mgr,
|
||||||
)?;
|
)?;
|
||||||
let handle = server.handle();
|
let handle = server.handle();
|
||||||
@@ -162,6 +164,7 @@ fn build_server(
|
|||||||
scheduler_status: SchedulerStatusMap,
|
scheduler_status: SchedulerStatusMap,
|
||||||
scheduler_control: Arc<SchedulerControlManager>,
|
scheduler_control: Arc<SchedulerControlManager>,
|
||||||
vchan_mgr: Arc<ClientChannelManager>,
|
vchan_mgr: Arc<ClientChannelManager>,
|
||||||
|
session_rig_mgr: Arc<api::SessionRigManager>,
|
||||||
background_decode_mgr: Arc<BackgroundDecodeManager>,
|
background_decode_mgr: Arc<BackgroundDecodeManager>,
|
||||||
) -> Result<Server, actix_web::Error> {
|
) -> Result<Server, actix_web::Error> {
|
||||||
let state_data = web::Data::new(state_rx);
|
let state_data = web::Data::new(state_rx);
|
||||||
@@ -176,6 +179,7 @@ fn build_server(
|
|||||||
let scheduler_status = web::Data::new(scheduler_status);
|
let scheduler_status = web::Data::new(scheduler_status);
|
||||||
let scheduler_control = web::Data::new(scheduler_control);
|
let scheduler_control = web::Data::new(scheduler_control);
|
||||||
let vchan_mgr = web::Data::new(vchan_mgr);
|
let vchan_mgr = web::Data::new(vchan_mgr);
|
||||||
|
let session_rig_mgr = web::Data::new(session_rig_mgr);
|
||||||
let background_decode_mgr = web::Data::new(background_decode_mgr);
|
let background_decode_mgr = web::Data::new(background_decode_mgr);
|
||||||
|
|
||||||
// Extract auth config values before moving context
|
// Extract auth config values before moving context
|
||||||
@@ -221,6 +225,7 @@ fn build_server(
|
|||||||
.app_data(scheduler_status.clone())
|
.app_data(scheduler_status.clone())
|
||||||
.app_data(scheduler_control.clone())
|
.app_data(scheduler_control.clone())
|
||||||
.app_data(vchan_mgr.clone())
|
.app_data(vchan_mgr.clone())
|
||||||
|
.app_data(session_rig_mgr.clone())
|
||||||
.app_data(background_decode_mgr.clone())
|
.app_data(background_decode_mgr.clone())
|
||||||
.wrap(Compress::default())
|
.wrap(Compress::default())
|
||||||
.wrap(
|
.wrap(
|
||||||
|
|||||||
Reference in New Issue
Block a user