diff --git a/Cargo.lock b/Cargo.lock index 9ec450c..b892812 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2598,6 +2598,7 @@ dependencies = [ "trx-core", "trx-frontend", "trx-protocol", + "uuid", ] [[package]] diff --git a/src/trx-client/trx-frontend/trx-frontend-http/Cargo.toml b/src/trx-client/trx-frontend/trx-frontend-http/Cargo.toml index dc58bbc..22dca54 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/Cargo.toml +++ b/src/trx-client/trx-frontend/trx-frontend-http/Cargo.toml @@ -25,3 +25,4 @@ rand = "0.8" hex = "0.4" pickledb = "0.5" dirs = "6" +uuid = { workspace = true } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index c824683..d0c59d7 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -303,6 +303,7 @@ function applyCapabilities(caps) { sdrSquelchSupported = false; } updateSdrSquelchControlVisibility(); + if (typeof vchanApplyCapabilities === "function") vchanApplyCapabilities(caps); } const freqEl = document.getElementById("freq"); @@ -2738,6 +2739,12 @@ function connect() { es.addEventListener("ping", () => { lastEventAt = Date.now(); }); + es.addEventListener("session", evt => { + if (typeof vchanHandleSession === "function") vchanHandleSession(evt.data); + }); + es.addEventListener("channels", evt => { + if (typeof vchanHandleChannels === "function") vchanHandleChannels(evt.data); + }); es.onerror = () => { // Check if this is an auth error by looking at readyState if (es.readyState === EventSource.CLOSED) { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html index b0366fc..719a5f9 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html @@ -259,6 +259,10 @@
VFO
+
Signal
@@ -800,6 +804,7 @@ + diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js new file mode 100644 index 0000000..3c1cd06 --- /dev/null +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/vchan.js @@ -0,0 +1,170 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +// --- Virtual Channels Plugin --- +// +// Handles the `session` and `channels` SSE events emitted by /events and +// provides the channel picker UI (SDR-only, shown when filter_controls is set). + +let vchanSessionId = null; +let vchanRigId = null; +let vchanChannels = []; +let vchanActiveId = null; + +function vchanFmtFreq(hz) { + if (!Number.isFinite(hz) || hz <= 0) return "--"; + if (hz >= 1e9) return (hz / 1e9).toFixed(4).replace(/\.?0+$/, "") + "\u202fGHz"; + if (hz >= 1e6) return (hz / 1e6).toFixed(4).replace(/\.?0+$/, "") + "\u202fMHz"; + if (hz >= 1e3) return (hz / 1e3).toFixed(1).replace(/\.?0+$/, "") + "\u202fkHz"; + return hz + "\u202fHz"; +} + +// Called by app.js when the SSE `session` event arrives. +function vchanHandleSession(data) { + try { + const d = JSON.parse(data); + vchanSessionId = d.session_id || null; + } catch (e) { + console.warn("vchan: bad session event", e); + } +} + +// Called by app.js when the SSE `channels` event arrives. +function vchanHandleChannels(data) { + try { + const d = JSON.parse(data); + vchanRigId = d.rig_id || null; + vchanChannels = d.channels || []; + // If the active channel was evicted, fall back to channel 0. + const ids = new Set(vchanChannels.map(c => c.id)); + if (vchanActiveId && !ids.has(vchanActiveId)) { + vchanActiveId = vchanChannels.length > 0 ? vchanChannels[0].id : null; + } + vchanRender(); + } catch (e) { + console.warn("vchan: bad channels event", e); + } +} + +function vchanRender() { + const picker = document.getElementById("vchan-picker"); + if (!picker) return; + picker.innerHTML = ""; + + vchanChannels.forEach(ch => { + const btn = document.createElement("button"); + btn.type = "button"; + btn.title = `Ch ${ch.index}: ${vchanFmtFreq(ch.freq_hz)} ${ch.mode} · ${ch.subscribers} subscriber${ch.subscribers !== 1 ? "s" : ""}`; + if (ch.id === vchanActiveId) btn.classList.add("active"); + + const label = document.createElement("span"); + label.className = "vchan-label"; + label.textContent = `${ch.index}: ${vchanFmtFreq(ch.freq_hz)} ${ch.mode}`; + btn.appendChild(label); + + if (!ch.permanent) { + const del = document.createElement("span"); + del.className = "vchan-del"; + del.textContent = "\u00d7"; + del.title = "Delete channel"; + del.addEventListener("click", e => { + e.stopPropagation(); + vchanDelete(ch.id); + }); + btn.appendChild(del); + } + + btn.addEventListener("click", () => { + if (ch.id !== vchanActiveId) vchanSubscribe(ch.id); + }); + + picker.appendChild(btn); + }); + + // "+" button — allocate a new channel at the current VFO frequency. + const addBtn = document.createElement("button"); + addBtn.type = "button"; + addBtn.className = "vchan-add"; + addBtn.textContent = "+"; + addBtn.title = "Allocate new virtual channel at current frequency"; + addBtn.addEventListener("click", vchanAllocate); + picker.appendChild(addBtn); +} + +async function vchanAllocate() { + if (!vchanSessionId || !vchanRigId) return; + + // Use the last known rig frequency and mode as the starting point. + const freqHz = (typeof lastFreqHz === "number" && lastFreqHz > 0) + ? lastFreqHz + : 0; + const modeEl = document.getElementById("mode"); + const mode = modeEl ? (modeEl.value || "USB") : "USB"; + + try { + const resp = await fetch(`/channels/${encodeURIComponent(vchanRigId)}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ session_id: vchanSessionId, freq_hz: freqHz, mode }), + }); + if (!resp.ok) { + const msg = await resp.text().catch(() => String(resp.status)); + console.warn("vchan: allocate failed —", msg); + return; + } + const ch = await resp.json(); + vchanActiveId = ch.id; + // The SSE `channels` event will trigger vchanRender(); optimistically + // mark active so the picker feels responsive even before the event arrives. + vchanRender(); + } catch (e) { + console.error("vchan: allocate error", e); + } +} + +async function vchanDelete(channelId) { + if (!vchanRigId) return; + try { + const resp = await fetch( + `/channels/${encodeURIComponent(vchanRigId)}/${encodeURIComponent(channelId)}`, + { method: "DELETE" } + ); + if (!resp.ok) { + console.warn("vchan: delete failed", resp.status); + } + // Channel list updates via SSE `channels` event. + } catch (e) { + console.error("vchan: delete error", e); + } +} + +async function vchanSubscribe(channelId) { + if (!vchanSessionId || !vchanRigId) return; + try { + const resp = await fetch( + `/channels/${encodeURIComponent(vchanRigId)}/${encodeURIComponent(channelId)}/subscribe`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ session_id: vchanSessionId }), + } + ); + if (!resp.ok) { + console.warn("vchan: subscribe failed", resp.status); + return; + } + vchanActiveId = channelId; + vchanRender(); + } catch (e) { + console.error("vchan: subscribe error", e); + } +} + +// Called by app.js from applyCapabilities(). +// Shows the channel picker only for SDR rigs. +function vchanApplyCapabilities(caps) { + const row = document.getElementById("vchan-row"); + if (!row) return; + row.style.display = (caps && caps.filter_controls) ? "" : "none"; +} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css index f3cfc3d..4451fec 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css @@ -365,6 +365,43 @@ input.status-input, select.status-input { width: 100%; padding: 0.45rem 0.5rem; background: var(--btn-bg); font-weight: 600; } +.vchan-picker { + display: flex; + flex-wrap: wrap; + gap: 4px; +} +.vchan-picker button { + display: inline-flex; + align-items: center; + gap: 4px; + border: 1px solid var(--border-light); + border-radius: 6px; + height: var(--control-height); + padding: 0 0.55rem; + background: var(--input-bg); + color: var(--text-muted); + font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; + font-size: 0.82rem; + cursor: pointer; +} +.vchan-picker button.active { + background: var(--btn-bg); + color: var(--text); + font-weight: 600; + border-color: var(--btn-border, var(--border-light)); +} +.vchan-del { + opacity: 0.5; + font-size: 1rem; + line-height: 1; +} +.vchan-del:hover { opacity: 1; } +.vchan-add { + font-size: 1.1rem; + font-weight: 700; + padding: 0 0.6rem !important; + color: var(--text-muted); +} .signal-measure { display: inline-flex; gap: 0.5rem; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index ce6de2b..7497afc 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -8,10 +8,13 @@ use std::sync::Arc; use actix_web::{delete, get, post, put, web, HttpRequest, HttpResponse, Responder}; use actix_web::{http::header, Error}; use bytes::Bytes; -use futures_util::stream::{once, select, StreamExt}; +use futures_util::stream::{select, StreamExt}; use tokio::sync::{broadcast, mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_stream::wrappers::{IntervalStream, WatchStream}; +use uuid::Uuid; + +use crate::server::vchan::ClientChannelManager; use trx_core::radio::freq::Freq; use trx_core::rig::state::WfmDenoiseLevel; @@ -219,6 +222,7 @@ pub async fn events( state: web::Data>, clients: web::Data>, context: web::Data>, + vchan_mgr: web::Data>, ) -> Result { let rx = state.get_ref().clone(); let initial = wait_for_view(rx.clone()).await?; @@ -226,22 +230,66 @@ pub async fn events( let counter = clients.get_ref().clone(); let count = counter.fetch_add(1, Ordering::Relaxed) + 1; + // Assign a stable UUID to this SSE session for channel binding. + let session_id = Uuid::new_v4(); + + // Seed the primary channel for the currently-selected rig (no-op if + // already initialised or if no rig is selected yet). + let active_rig_id = context + .remote_active_rig_id + .lock() + .ok() + .and_then(|g| g.clone()); + if let Some(ref rid) = active_rig_id { + vchan_mgr.init_rig( + rid, + initial.status.freq.hz, + &format!("{:?}", initial.status.mode), + ); + } + + // Build the prefix burst: rig state → session UUID → initial channels. let initial_json = serde_json::to_string(&initial).map_err(actix_web::error::ErrorInternalServerError)?; let initial_json = inject_frontend_meta( &initial_json, frontend_meta_from_context(count, context.get_ref().as_ref()), ); - let initial_stream = - once(async move { Ok::(Bytes::from(format!("data: {initial_json}\n\n"))) }); + let mut prefix: Vec> = Vec::new(); + prefix.push(Ok(Bytes::from(format!("data: {initial_json}\n\n")))); + prefix.push(Ok(Bytes::from(format!( + "event: session\ndata: {{\"session_id\":\"{session_id}\"}}\n\n" + )))); + if let Some(ref rid) = active_rig_id { + let chans = vchan_mgr.channels(rid); + if let Ok(json) = serde_json::to_string(&chans) { + prefix.push(Ok(Bytes::from(format!( + "event: channels\ndata: {{\"rig_id\":\"{rid}\",\"channels\":{json}}}\n\n" + )))); + } + } + let prefix_stream = futures_util::stream::iter(prefix); + + // Live rig-state updates; side-effect: keep primary channel metadata in sync. let counter_updates = counter.clone(); let context_updates = context.get_ref().clone(); + let vchan_updates = vchan_mgr.get_ref().clone(); let updates = WatchStream::new(rx).filter_map(move |state| { let counter = counter_updates.clone(); let context = context_updates.clone(); + let vchan = vchan_updates.clone(); async move { state.snapshot().and_then(|v| { + if let Ok(Some(rig_id)) = + context.remote_active_rig_id.lock().map(|g| g.clone()) + { + vchan.update_primary( + &rig_id, + v.status.freq.hz, + &format!("{:?}", v.status.mode), + ); + } serde_json::to_string(&v).ok().map(|json| { let json = inject_frontend_meta( &json, @@ -256,16 +304,43 @@ pub async fn events( } }); - // Send a named "ping" event so the JS heartbeat can observe it (SSE - // comments like ": ping" are not exposed by EventSource.onmessage). + // Channel-list change events from the virtual channel manager. + let vchan_change_rx = vchan_mgr.change_tx.subscribe(); + let chan_updates = futures_util::stream::unfold(vchan_change_rx, |mut rx| async move { + loop { + match rx.recv().await { + Ok(msg) => { + if let Some(colon) = msg.find(':') { + let rig_id = &msg[..colon]; + let channels_json = &msg[colon + 1..]; + let payload = format!( + "{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}" + ); + return Some(( + Ok::(Bytes::from(format!( + "event: channels\ndata: {payload}\n\n" + ))), + rx, + )); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => return None, + } + } + }); + + // Send a named "ping" event so the JS heartbeat can observe it. let pings = IntervalStream::new(time::interval(Duration::from_secs(5))) .map(|_| Ok::(Bytes::from("event: ping\ndata: \n\n"))); - // Wrap stream to decrement counter on drop. + let vchan_drop = vchan_mgr.get_ref().clone(); let counter_drop = counter.clone(); - let stream = initial_stream.chain(select(pings, updates)); + let live = select(select(pings, updates), chan_updates); + let stream = prefix_stream.chain(live); let stream = DropStream::new(Box::pin(stream), move || { counter_drop.fetch_sub(1, Ordering::Relaxed); + vchan_drop.release_session(session_id); }); Ok(HttpResponse::Ok() @@ -1037,6 +1112,117 @@ pub async fn select_rig( Ok(HttpResponse::Ok().json(build_rig_list_payload(context.get_ref().as_ref()))) } +// --------------------------------------------------------------------------- +// Virtual channel CRUD +// --------------------------------------------------------------------------- + +#[get("/channels/{rig_id}")] +pub async fn list_channels( + path: web::Path, + vchan_mgr: web::Data>, +) -> impl Responder { + let rig_id = path.into_inner(); + HttpResponse::Ok().json(vchan_mgr.channels(&rig_id)) +} + +#[derive(serde::Deserialize)] +struct AllocateChannelBody { + session_id: Uuid, + freq_hz: u64, + mode: String, +} + +#[post("/channels/{rig_id}")] +pub async fn allocate_channel( + path: web::Path, + body: web::Json, + vchan_mgr: web::Data>, +) -> impl Responder { + let rig_id = path.into_inner(); + match vchan_mgr.allocate(body.session_id, &rig_id, body.freq_hz, &body.mode) { + Ok(ch) => HttpResponse::Ok().json(ch), + Err(e) => HttpResponse::BadRequest().body(e.to_string()), + } +} + +#[delete("/channels/{rig_id}/{channel_id}")] +pub async fn delete_channel_route( + path: web::Path<(String, Uuid)>, + vchan_mgr: web::Data>, +) -> impl Responder { + let (rig_id, channel_id) = path.into_inner(); + match vchan_mgr.delete_channel(&rig_id, channel_id) { + Ok(()) => HttpResponse::Ok().finish(), + Err(crate::server::vchan::VChanClientError::NotFound) => { + HttpResponse::NotFound().finish() + } + Err(crate::server::vchan::VChanClientError::Permanent) => { + HttpResponse::BadRequest().body("cannot remove the primary channel") + } + Err(e) => HttpResponse::BadRequest().body(e.to_string()), + } +} + +#[derive(serde::Deserialize)] +struct SubscribeBody { + session_id: Uuid, +} + +#[post("/channels/{rig_id}/{channel_id}/subscribe")] +pub async fn subscribe_channel( + path: web::Path<(String, Uuid)>, + body: web::Json, + vchan_mgr: web::Data>, +) -> impl Responder { + let (rig_id, channel_id) = path.into_inner(); + match vchan_mgr.subscribe_session(body.session_id, &rig_id, channel_id) { + Some(ch) => HttpResponse::Ok().json(ch), + None => HttpResponse::NotFound().finish(), + } +} + +#[derive(serde::Deserialize)] +struct SetChanFreqBody { + freq_hz: u64, +} + +#[put("/channels/{rig_id}/{channel_id}/freq")] +pub async fn set_vchan_freq( + path: web::Path<(String, Uuid)>, + body: web::Json, + vchan_mgr: web::Data>, +) -> impl Responder { + let (rig_id, channel_id) = path.into_inner(); + match vchan_mgr.set_channel_freq(&rig_id, channel_id, body.freq_hz) { + Ok(()) => HttpResponse::Ok().finish(), + Err(crate::server::vchan::VChanClientError::NotFound) => { + HttpResponse::NotFound().finish() + } + Err(e) => HttpResponse::BadRequest().body(e.to_string()), + } +} + +#[derive(serde::Deserialize)] +struct SetChanModeBody { + mode: String, +} + +#[put("/channels/{rig_id}/{channel_id}/mode")] +pub async fn set_vchan_mode( + path: web::Path<(String, Uuid)>, + body: web::Json, + vchan_mgr: web::Data>, +) -> impl Responder { + let (rig_id, channel_id) = path.into_inner(); + match vchan_mgr.set_channel_mode(&rig_id, channel_id, &body.mode) { + Ok(()) => HttpResponse::Ok().finish(), + Err(crate::server::vchan::VChanClientError::NotFound) => { + HttpResponse::NotFound().finish() + } + Err(e) => HttpResponse::BadRequest().body(e.to_string()), + } +} + pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(index) .service(status_api) @@ -1104,6 +1290,14 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(cw_js) .service(bookmarks_js) .service(scheduler_js) + .service(vchan_js) + // Virtual channels + .service(list_channels) + .service(allocate_channel) + .service(delete_channel_route) + .service(subscribe_channel) + .service(set_vchan_freq) + .service(set_vchan_mode) // Auth endpoints .service(crate::server::auth::login) .service(crate::server::auth::logout) @@ -1265,6 +1459,16 @@ async fn scheduler_js() -> impl Responder { .body(status::SCHEDULER_JS) } +#[get("/vchan.js")] +async fn vchan_js() -> impl Responder { + HttpResponse::Ok() + .insert_header(( + header::CONTENT_TYPE, + "application/javascript; charset=utf-8", + )) + .body(status::VCHAN_JS) +} + async fn send_command( rig_tx: &mpsc::Sender, cmd: RigCommand, diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs index 0fbc119..bf79a23 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/auth.rs @@ -437,6 +437,7 @@ impl RouteAccess { || path.starts_with("/bookmarks?") || path.starts_with("/bookmarks/") || path.starts_with("/scheduler/") + || path.starts_with("/channels/") { return Self::Read; } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index 6bcf0f5..822f958 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -14,6 +14,8 @@ pub mod bookmarks; pub mod scheduler; #[path = "status.rs"] pub mod status; +#[path = "vchan.rs"] +pub mod vchan; use std::collections::HashMap; use std::net::SocketAddr; @@ -36,6 +38,7 @@ use trx_frontend::{FrontendRuntimeContext, FrontendSpawner}; use auth::{AuthConfig, AuthState, SameSite}; use scheduler::{SchedulerStatusMap, SchedulerStore}; +use vchan::ClientChannelManager; /// HTTP frontend implementation. pub struct HttpFrontend; @@ -79,7 +82,8 @@ async fn serve( scheduler_status.clone(), ); - let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status)?; + let vchan_mgr = Arc::new(ClientChannelManager::new(4)); + let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status, vchan_mgr)?; let handle = server.handle(); tokio::spawn(async move { let _ = signal::ctrl_c().await; @@ -99,6 +103,7 @@ fn build_server( context: Arc, scheduler_store: Arc, scheduler_status: SchedulerStatusMap, + vchan_mgr: Arc, ) -> Result { let state_data = web::Data::new(state_rx); let rig_tx = web::Data::new(rig_tx); @@ -111,6 +116,7 @@ fn build_server( let scheduler_store = web::Data::new(scheduler_store); let scheduler_status = web::Data::new(scheduler_status); + let vchan_mgr = web::Data::new(vchan_mgr); // Extract auth config values before moving context let same_site = match context.http_auth_cookie_same_site.as_str() { @@ -153,6 +159,7 @@ fn build_server( .app_data(bookmark_store.clone()) .app_data(scheduler_store.clone()) .app_data(scheduler_status.clone()) + .app_data(vchan_mgr.clone()) .wrap(Compress::default()) .wrap( DefaultHeaders::new() diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs index 31fb38c..d2e9e5c 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/status.rs @@ -21,6 +21,7 @@ pub const WSPR_JS: &str = include_str!("../assets/web/plugins/wspr.js"); pub const CW_JS: &str = include_str!("../assets/web/plugins/cw.js"); pub const BOOKMARKS_JS: &str = include_str!("../assets/web/plugins/bookmarks.js"); pub const SCHEDULER_JS: &str = include_str!("../assets/web/plugins/scheduler.js"); +pub const VCHAN_JS: &str = include_str!("../assets/web/plugins/vchan.js"); pub fn index_html() -> String { INDEX_HTML diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs new file mode 100644 index 0000000..a4d08d9 --- /dev/null +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/vchan.rs @@ -0,0 +1,374 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Client-side virtual channel registry. +//! +//! Each rig has a list of virtual channels tracked entirely within the HTTP +//! frontend process. Channel 0 is permanent and mirrors the rig's current +//! dial frequency. Additional channels are allocated by a tab (identified by +//! its SSE session UUID) and freed when that session disconnects or the tab +//! explicitly deletes them. +//! +//! Actual DSP on the server is unaffected by this registry in Phase 1; the +//! registry is the source of truth for metadata (freq/mode per channel) and +//! drives `SetFreq`/`SetMode` commands to the server when a tab selects or +//! tunes a channel. + +use std::collections::HashMap; +use std::sync::RwLock; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; +use uuid::Uuid; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/// HTTP-visible snapshot of one channel. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClientChannel { + pub id: Uuid, + /// Position in the ordered list (0 = primary). + pub index: usize, + pub freq_hz: u64, + pub mode: String, + /// True for channel 0 — cannot be deleted. + pub permanent: bool, + /// Number of SSE sessions currently subscribed to this channel. + pub subscribers: usize, +} + +#[derive(Debug, Clone)] +pub enum VChanClientError { + /// Channel cap would be exceeded. + CapReached { max: usize }, + /// Channel UUID not found. + NotFound, + /// Tried to delete the permanent primary channel. + Permanent, +} + +impl std::fmt::Display for VChanClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + VChanClientError::CapReached { max } => { + write!(f, "channel cap reached (max {})", max) + } + VChanClientError::NotFound => write!(f, "channel not found"), + VChanClientError::Permanent => write!(f, "cannot remove the primary channel"), + } + } +} + +// --------------------------------------------------------------------------- +// Internal record +// --------------------------------------------------------------------------- + +struct InternalChannel { + id: Uuid, + freq_hz: u64, + mode: String, + permanent: bool, + /// Session UUIDs currently subscribed to this channel. + session_ids: Vec, +} + +// --------------------------------------------------------------------------- +// ClientChannelManager +// --------------------------------------------------------------------------- + +/// Per-rig channel registry shared across all actix handlers. +pub struct ClientChannelManager { + /// rig_id → ordered channel list. + rigs: RwLock>>, + /// session_id → (rig_id, channel_id). + sessions: RwLock>, + /// Broadcast used to push updated channel lists to SSE streams. + /// Payload: JSON string (serialised `Vec`), prefixed by + /// `":"` so subscribers can filter by rig. + pub change_tx: broadcast::Sender, + pub max_channels: usize, +} + +impl ClientChannelManager { + pub fn new(max_channels: usize) -> Self { + let (change_tx, _) = broadcast::channel(64); + Self { + rigs: RwLock::new(HashMap::new()), + sessions: RwLock::new(HashMap::new()), + change_tx, + max_channels: max_channels.max(1), + } + } + + // -- helpers -------------------------------------------------------- + + fn broadcast_change(&self, rig_id: &str, channels: &[InternalChannel]) { + let list: Vec = channels + .iter() + .enumerate() + .map(|(idx, c)| ClientChannel { + id: c.id, + index: idx, + freq_hz: c.freq_hz, + mode: c.mode.clone(), + permanent: c.permanent, + subscribers: c.session_ids.len(), + }) + .collect(); + if let Ok(json) = serde_json::to_string(&list) { + let _ = self.change_tx.send(format!("{}:{}", rig_id, json)); + } + } + + // -- public API ------------------------------------------------------- + + /// Ensure channel 0 exists for `rig_id`. Call this when the SSE stream + /// first delivers rig state so the primary channel reflects the current freq. + pub fn init_rig(&self, rig_id: &str, freq_hz: u64, mode: &str) { + let mut rigs = self.rigs.write().unwrap(); + let channels = rigs.entry(rig_id.to_string()).or_default(); + if channels.is_empty() { + channels.push(InternalChannel { + id: Uuid::new_v4(), + freq_hz, + mode: mode.to_string(), + permanent: true, + session_ids: Vec::new(), + }); + } + } + + /// Update channel 0's freq/mode when the server pushes a new rig state. + pub fn update_primary(&self, rig_id: &str, freq_hz: u64, mode: &str) { + let mut rigs = self.rigs.write().unwrap(); + if let Some(channels) = rigs.get_mut(rig_id) { + if let Some(ch) = channels.first_mut() { + if ch.freq_hz != freq_hz || ch.mode != mode { + ch.freq_hz = freq_hz; + ch.mode = mode.to_string(); + self.broadcast_change(rig_id, channels); + } + } + } + } + + /// List all channels for a rig (returns empty vec if rig unknown). + pub fn channels(&self, rig_id: &str) -> Vec { + let rigs = self.rigs.read().unwrap(); + rigs.get(rig_id) + .map(|chs| { + chs.iter() + .enumerate() + .map(|(idx, c)| ClientChannel { + id: c.id, + index: idx, + freq_hz: c.freq_hz, + mode: c.mode.clone(), + permanent: c.permanent, + subscribers: c.session_ids.len(), + }) + .collect() + }) + .unwrap_or_default() + } + + /// Allocate a new virtual channel for `session_id`. + /// If `session_id` already owns a channel on this rig, it is released first. + /// Returns the new `ClientChannel` snapshot. + pub fn allocate( + &self, + session_id: Uuid, + rig_id: &str, + freq_hz: u64, + mode: &str, + ) -> Result { + // Release any existing channel owned by this session on this rig. + self.release_session_on_rig(session_id, rig_id); + + let mut rigs = self.rigs.write().unwrap(); + let channels = rigs.entry(rig_id.to_string()).or_default(); + + if channels.len() >= self.max_channels { + return Err(VChanClientError::CapReached { + max: self.max_channels, + }); + } + + let id = Uuid::new_v4(); + let idx = channels.len(); + channels.push(InternalChannel { + id, + freq_hz, + mode: mode.to_string(), + permanent: false, + session_ids: vec![session_id], + }); + + let snapshot = ClientChannel { + id, + index: idx, + freq_hz, + mode: mode.to_string(), + permanent: false, + subscribers: 1, + }; + + self.broadcast_change(rig_id, channels); + + // Update session → channel mapping. + drop(rigs); + self.sessions + .write() + .unwrap() + .insert(session_id, (rig_id.to_string(), id)); + + Ok(snapshot) + } + + /// Subscribe an SSE session to a channel (by channel UUID). + /// Idempotent. Returns `None` if channel not found. + pub fn subscribe_session( + &self, + session_id: Uuid, + rig_id: &str, + channel_id: Uuid, + ) -> Option { + // Release previous subscription on this rig. + self.release_session_on_rig(session_id, rig_id); + + let mut rigs = self.rigs.write().unwrap(); + let channels = rigs.get_mut(rig_id)?; + let (idx, ch) = channels + .iter_mut() + .enumerate() + .find(|(_, c)| c.id == channel_id)?; + + if !ch.session_ids.contains(&session_id) { + ch.session_ids.push(session_id); + } + let snapshot = ClientChannel { + id: ch.id, + index: idx, + freq_hz: ch.freq_hz, + mode: ch.mode.clone(), + permanent: ch.permanent, + subscribers: ch.session_ids.len(), + }; + + self.broadcast_change(rig_id, channels); + + drop(rigs); + self.sessions + .write() + .unwrap() + .insert(session_id, (rig_id.to_string(), channel_id)); + + Some(snapshot) + } + + /// Release all channel subscriptions for `session_id` across all rigs. + /// Auto-removes non-permanent channels that reach 0 subscribers. + pub fn release_session(&self, session_id: Uuid) { + let mapping = { + let mut sessions = self.sessions.write().unwrap(); + sessions.remove(&session_id) + }; + if let Some((rig_id, _)) = mapping { + self.release_session_on_rig(session_id, &rig_id); + } + } + + fn release_session_on_rig(&self, session_id: Uuid, rig_id: &str) { + let mut rigs = self.rigs.write().unwrap(); + let Some(channels) = rigs.get_mut(rig_id) else { + return; + }; + let mut changed = false; + for ch in channels.iter_mut() { + if let Some(pos) = ch.session_ids.iter().position(|&s| s == session_id) { + ch.session_ids.remove(pos); + changed = true; + } + } + // Remove non-permanent channels with no subscribers. + let before = channels.len(); + channels.retain(|c| c.permanent || !c.session_ids.is_empty()); + if channels.len() != before { + changed = true; + } + if changed { + self.broadcast_change(rig_id, channels); + } + } + + /// Explicitly delete a channel by UUID (any session may do this). + pub fn delete_channel( + &self, + rig_id: &str, + channel_id: Uuid, + ) -> Result<(), VChanClientError> { + let mut rigs = self.rigs.write().unwrap(); + let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?; + let pos = channels + .iter() + .position(|c| c.id == channel_id) + .ok_or(VChanClientError::NotFound)?; + if channels[pos].permanent { + return Err(VChanClientError::Permanent); + } + // Collect evicted sessions to clean up the session map. + let evicted: Vec = channels[pos].session_ids.clone(); + channels.remove(pos); + self.broadcast_change(rig_id, channels); + drop(rigs); + + let mut sessions = self.sessions.write().unwrap(); + for sid in evicted { + sessions.remove(&sid); + } + Ok(()) + } + + /// Update freq/mode metadata for a channel. + pub fn set_channel_freq( + &self, + rig_id: &str, + channel_id: Uuid, + freq_hz: u64, + ) -> Result<(), VChanClientError> { + let mut rigs = self.rigs.write().unwrap(); + let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?; + let ch = channels + .iter_mut() + .find(|c| c.id == channel_id) + .ok_or(VChanClientError::NotFound)?; + ch.freq_hz = freq_hz; + self.broadcast_change(rig_id, channels); + Ok(()) + } + + pub fn set_channel_mode( + &self, + rig_id: &str, + channel_id: Uuid, + mode: &str, + ) -> Result<(), VChanClientError> { + let mut rigs = self.rigs.write().unwrap(); + let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?; + let ch = channels + .iter_mut() + .find(|c| c.id == channel_id) + .ok_or(VChanClientError::NotFound)?; + ch.mode = mode.to_string(); + self.broadcast_change(rig_id, channels); + Ok(()) + } + + /// Return the channel a session is currently subscribed to. + pub fn session_channel(&self, session_id: Uuid) -> Option<(String, Uuid)> { + self.sessions.read().unwrap().get(&session_id).cloned() + } +}