[feat](trx-frontend-http): virtual channel manager and picker UI

Add client-side virtual channel support (Phase 1 — metadata only):

- vchan.rs: ClientChannelManager keyed by rig_id; tracks per-session
  channel subscriptions and broadcasts list changes via change_tx
- server.rs: instantiate Arc<ClientChannelManager> and expose as app_data
- api.rs: wire ClientChannelManager into /events SSE (session UUID,
  init_rig, update_primary, channel change stream, session cleanup on
  disconnect); add channel CRUD routes:
    GET/POST /channels/{rig_id}
    DELETE   /channels/{rig_id}/{channel_id}
    POST     /channels/{rig_id}/{channel_id}/subscribe
    PUT      /channels/{rig_id}/{channel_id}/freq|mode
- auth.rs: classify /channels/ prefix as Read access
- plugins/vchan.js: channel picker with +/× buttons, subscribe on click,
  SDR-only (shown when filter_controls capability is set)
- app.js: handle SSE `session` and `channels` events, call
  vchanApplyCapabilities from applyCapabilities
- index.html: #vchan-row div + <script src="/vchan.js">
- style.css: .vchan-picker pill styles

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-11 07:00:22 +01:00
parent dda5ec17bb
commit e5aa74a1b6
11 changed files with 816 additions and 8 deletions
Generated
+1
View File
@@ -2598,6 +2598,7 @@ dependencies = [
"trx-core",
"trx-frontend",
"trx-protocol",
"uuid",
]
[[package]]
@@ -25,3 +25,4 @@ rand = "0.8"
hex = "0.4"
pickledb = "0.5"
dirs = "6"
uuid = { workspace = true }
@@ -303,6 +303,7 @@ function applyCapabilities(caps) {
sdrSquelchSupported = false;
}
updateSdrSquelchControlVisibility();
if (typeof vchanApplyCapabilities === "function") vchanApplyCapabilities(caps);
}
const freqEl = document.getElementById("freq");
@@ -2738,6 +2739,12 @@ function connect() {
es.addEventListener("ping", () => {
lastEventAt = Date.now();
});
es.addEventListener("session", evt => {
if (typeof vchanHandleSession === "function") vchanHandleSession(evt.data);
});
es.addEventListener("channels", evt => {
if (typeof vchanHandleChannels === "function") vchanHandleChannels(evt.data);
});
es.onerror = () => {
// Check if this is an auth error by looking at readyState
if (es.readyState === EventSource.CLOSED) {
@@ -259,6 +259,10 @@
<div class="label"><span>VFO</span></div>
<div class="vfo-picker" id="vfo-picker"></div>
</div>
<div class="full-row label-below-row" id="vchan-row" style="display:none;">
<div class="label"><span>Channels</span></div>
<div class="vchan-picker" id="vchan-picker"></div>
</div>
<div class="full-row label-below-row">
<div class="label"><span>Signal</span></div>
<div class="signal" style="gap: 1rem;">
@@ -800,6 +804,7 @@
<script src="/cw.js"></script>
<script src="/bookmarks.js"></script>
<script src="/scheduler.js"></script>
<script src="/vchan.js"></script>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script src="/leaflet-ais-tracksymbol.js"></script>
</body>
@@ -0,0 +1,170 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
// --- Virtual Channels Plugin ---
//
// Handles the `session` and `channels` SSE events emitted by /events and
// provides the channel picker UI (SDR-only, shown when filter_controls is set).
let vchanSessionId = null;
let vchanRigId = null;
let vchanChannels = [];
let vchanActiveId = null;
function vchanFmtFreq(hz) {
if (!Number.isFinite(hz) || hz <= 0) return "--";
if (hz >= 1e9) return (hz / 1e9).toFixed(4).replace(/\.?0+$/, "") + "\u202fGHz";
if (hz >= 1e6) return (hz / 1e6).toFixed(4).replace(/\.?0+$/, "") + "\u202fMHz";
if (hz >= 1e3) return (hz / 1e3).toFixed(1).replace(/\.?0+$/, "") + "\u202fkHz";
return hz + "\u202fHz";
}
// Called by app.js when the SSE `session` event arrives.
function vchanHandleSession(data) {
try {
const d = JSON.parse(data);
vchanSessionId = d.session_id || null;
} catch (e) {
console.warn("vchan: bad session event", e);
}
}
// Called by app.js when the SSE `channels` event arrives.
function vchanHandleChannels(data) {
try {
const d = JSON.parse(data);
vchanRigId = d.rig_id || null;
vchanChannels = d.channels || [];
// If the active channel was evicted, fall back to channel 0.
const ids = new Set(vchanChannels.map(c => c.id));
if (vchanActiveId && !ids.has(vchanActiveId)) {
vchanActiveId = vchanChannels.length > 0 ? vchanChannels[0].id : null;
}
vchanRender();
} catch (e) {
console.warn("vchan: bad channels event", e);
}
}
function vchanRender() {
const picker = document.getElementById("vchan-picker");
if (!picker) return;
picker.innerHTML = "";
vchanChannels.forEach(ch => {
const btn = document.createElement("button");
btn.type = "button";
btn.title = `Ch ${ch.index}: ${vchanFmtFreq(ch.freq_hz)} ${ch.mode} · ${ch.subscribers} subscriber${ch.subscribers !== 1 ? "s" : ""}`;
if (ch.id === vchanActiveId) btn.classList.add("active");
const label = document.createElement("span");
label.className = "vchan-label";
label.textContent = `${ch.index}: ${vchanFmtFreq(ch.freq_hz)} ${ch.mode}`;
btn.appendChild(label);
if (!ch.permanent) {
const del = document.createElement("span");
del.className = "vchan-del";
del.textContent = "\u00d7";
del.title = "Delete channel";
del.addEventListener("click", e => {
e.stopPropagation();
vchanDelete(ch.id);
});
btn.appendChild(del);
}
btn.addEventListener("click", () => {
if (ch.id !== vchanActiveId) vchanSubscribe(ch.id);
});
picker.appendChild(btn);
});
// "+" button — allocate a new channel at the current VFO frequency.
const addBtn = document.createElement("button");
addBtn.type = "button";
addBtn.className = "vchan-add";
addBtn.textContent = "+";
addBtn.title = "Allocate new virtual channel at current frequency";
addBtn.addEventListener("click", vchanAllocate);
picker.appendChild(addBtn);
}
async function vchanAllocate() {
if (!vchanSessionId || !vchanRigId) return;
// Use the last known rig frequency and mode as the starting point.
const freqHz = (typeof lastFreqHz === "number" && lastFreqHz > 0)
? lastFreqHz
: 0;
const modeEl = document.getElementById("mode");
const mode = modeEl ? (modeEl.value || "USB") : "USB";
try {
const resp = await fetch(`/channels/${encodeURIComponent(vchanRigId)}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ session_id: vchanSessionId, freq_hz: freqHz, mode }),
});
if (!resp.ok) {
const msg = await resp.text().catch(() => String(resp.status));
console.warn("vchan: allocate failed —", msg);
return;
}
const ch = await resp.json();
vchanActiveId = ch.id;
// The SSE `channels` event will trigger vchanRender(); optimistically
// mark active so the picker feels responsive even before the event arrives.
vchanRender();
} catch (e) {
console.error("vchan: allocate error", e);
}
}
async function vchanDelete(channelId) {
if (!vchanRigId) return;
try {
const resp = await fetch(
`/channels/${encodeURIComponent(vchanRigId)}/${encodeURIComponent(channelId)}`,
{ method: "DELETE" }
);
if (!resp.ok) {
console.warn("vchan: delete failed", resp.status);
}
// Channel list updates via SSE `channels` event.
} catch (e) {
console.error("vchan: delete error", e);
}
}
async function vchanSubscribe(channelId) {
if (!vchanSessionId || !vchanRigId) return;
try {
const resp = await fetch(
`/channels/${encodeURIComponent(vchanRigId)}/${encodeURIComponent(channelId)}/subscribe`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ session_id: vchanSessionId }),
}
);
if (!resp.ok) {
console.warn("vchan: subscribe failed", resp.status);
return;
}
vchanActiveId = channelId;
vchanRender();
} catch (e) {
console.error("vchan: subscribe error", e);
}
}
// Called by app.js from applyCapabilities().
// Shows the channel picker only for SDR rigs.
function vchanApplyCapabilities(caps) {
const row = document.getElementById("vchan-row");
if (!row) return;
row.style.display = (caps && caps.filter_controls) ? "" : "none";
}
@@ -365,6 +365,43 @@ input.status-input, select.status-input { width: 100%; padding: 0.45rem 0.5rem;
background: var(--btn-bg);
font-weight: 600;
}
.vchan-picker {
display: flex;
flex-wrap: wrap;
gap: 4px;
}
.vchan-picker button {
display: inline-flex;
align-items: center;
gap: 4px;
border: 1px solid var(--border-light);
border-radius: 6px;
height: var(--control-height);
padding: 0 0.55rem;
background: var(--input-bg);
color: var(--text-muted);
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;
font-size: 0.82rem;
cursor: pointer;
}
.vchan-picker button.active {
background: var(--btn-bg);
color: var(--text);
font-weight: 600;
border-color: var(--btn-border, var(--border-light));
}
.vchan-del {
opacity: 0.5;
font-size: 1rem;
line-height: 1;
}
.vchan-del:hover { opacity: 1; }
.vchan-add {
font-size: 1.1rem;
font-weight: 700;
padding: 0 0.6rem !important;
color: var(--text-muted);
}
.signal-measure {
display: inline-flex;
gap: 0.5rem;
@@ -8,10 +8,13 @@ use std::sync::Arc;
use actix_web::{delete, get, post, put, web, HttpRequest, HttpResponse, Responder};
use actix_web::{http::header, Error};
use bytes::Bytes;
use futures_util::stream::{once, select, StreamExt};
use futures_util::stream::{select, StreamExt};
use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_stream::wrappers::{IntervalStream, WatchStream};
use uuid::Uuid;
use crate::server::vchan::ClientChannelManager;
use trx_core::radio::freq::Freq;
use trx_core::rig::state::WfmDenoiseLevel;
@@ -219,6 +222,7 @@ pub async fn events(
state: web::Data<watch::Receiver<RigState>>,
clients: web::Data<Arc<AtomicUsize>>,
context: web::Data<Arc<FrontendRuntimeContext>>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> Result<HttpResponse, Error> {
let rx = state.get_ref().clone();
let initial = wait_for_view(rx.clone()).await?;
@@ -226,22 +230,66 @@ pub async fn events(
let counter = clients.get_ref().clone();
let count = counter.fetch_add(1, Ordering::Relaxed) + 1;
// Assign a stable UUID to this SSE session for channel binding.
let session_id = Uuid::new_v4();
// Seed the primary channel for the currently-selected rig (no-op if
// already initialised or if no rig is selected yet).
let active_rig_id = context
.remote_active_rig_id
.lock()
.ok()
.and_then(|g| g.clone());
if let Some(ref rid) = active_rig_id {
vchan_mgr.init_rig(
rid,
initial.status.freq.hz,
&format!("{:?}", initial.status.mode),
);
}
// Build the prefix burst: rig state → session UUID → initial channels.
let initial_json =
serde_json::to_string(&initial).map_err(actix_web::error::ErrorInternalServerError)?;
let initial_json = inject_frontend_meta(
&initial_json,
frontend_meta_from_context(count, context.get_ref().as_ref()),
);
let initial_stream =
once(async move { Ok::<Bytes, Error>(Bytes::from(format!("data: {initial_json}\n\n"))) });
let mut prefix: Vec<Result<Bytes, Error>> = Vec::new();
prefix.push(Ok(Bytes::from(format!("data: {initial_json}\n\n"))));
prefix.push(Ok(Bytes::from(format!(
"event: session\ndata: {{\"session_id\":\"{session_id}\"}}\n\n"
))));
if let Some(ref rid) = active_rig_id {
let chans = vchan_mgr.channels(rid);
if let Ok(json) = serde_json::to_string(&chans) {
prefix.push(Ok(Bytes::from(format!(
"event: channels\ndata: {{\"rig_id\":\"{rid}\",\"channels\":{json}}}\n\n"
))));
}
}
let prefix_stream = futures_util::stream::iter(prefix);
// Live rig-state updates; side-effect: keep primary channel metadata in sync.
let counter_updates = counter.clone();
let context_updates = context.get_ref().clone();
let vchan_updates = vchan_mgr.get_ref().clone();
let updates = WatchStream::new(rx).filter_map(move |state| {
let counter = counter_updates.clone();
let context = context_updates.clone();
let vchan = vchan_updates.clone();
async move {
state.snapshot().and_then(|v| {
if let Ok(Some(rig_id)) =
context.remote_active_rig_id.lock().map(|g| g.clone())
{
vchan.update_primary(
&rig_id,
v.status.freq.hz,
&format!("{:?}", v.status.mode),
);
}
serde_json::to_string(&v).ok().map(|json| {
let json = inject_frontend_meta(
&json,
@@ -256,16 +304,43 @@ pub async fn events(
}
});
// Send a named "ping" event so the JS heartbeat can observe it (SSE
// comments like ": ping" are not exposed by EventSource.onmessage).
// Channel-list change events from the virtual channel manager.
let vchan_change_rx = vchan_mgr.change_tx.subscribe();
let chan_updates = futures_util::stream::unfold(vchan_change_rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(msg) => {
if let Some(colon) = msg.find(':') {
let rig_id = &msg[..colon];
let channels_json = &msg[colon + 1..];
let payload = format!(
"{{\"rig_id\":\"{rig_id}\",\"channels\":{channels_json}}}"
);
return Some((
Ok::<Bytes, Error>(Bytes::from(format!(
"event: channels\ndata: {payload}\n\n"
))),
rx,
));
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
}
}
});
// Send a named "ping" event so the JS heartbeat can observe it.
let pings = IntervalStream::new(time::interval(Duration::from_secs(5)))
.map(|_| Ok::<Bytes, Error>(Bytes::from("event: ping\ndata: \n\n")));
// Wrap stream to decrement counter on drop.
let vchan_drop = vchan_mgr.get_ref().clone();
let counter_drop = counter.clone();
let stream = initial_stream.chain(select(pings, updates));
let live = select(select(pings, updates), chan_updates);
let stream = prefix_stream.chain(live);
let stream = DropStream::new(Box::pin(stream), move || {
counter_drop.fetch_sub(1, Ordering::Relaxed);
vchan_drop.release_session(session_id);
});
Ok(HttpResponse::Ok()
@@ -1037,6 +1112,117 @@ pub async fn select_rig(
Ok(HttpResponse::Ok().json(build_rig_list_payload(context.get_ref().as_ref())))
}
// ---------------------------------------------------------------------------
// Virtual channel CRUD
// ---------------------------------------------------------------------------
#[get("/channels/{rig_id}")]
pub async fn list_channels(
path: web::Path<String>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let rig_id = path.into_inner();
HttpResponse::Ok().json(vchan_mgr.channels(&rig_id))
}
#[derive(serde::Deserialize)]
struct AllocateChannelBody {
session_id: Uuid,
freq_hz: u64,
mode: String,
}
#[post("/channels/{rig_id}")]
pub async fn allocate_channel(
path: web::Path<String>,
body: web::Json<AllocateChannelBody>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let rig_id = path.into_inner();
match vchan_mgr.allocate(body.session_id, &rig_id, body.freq_hz, &body.mode) {
Ok(ch) => HttpResponse::Ok().json(ch),
Err(e) => HttpResponse::BadRequest().body(e.to_string()),
}
}
#[delete("/channels/{rig_id}/{channel_id}")]
pub async fn delete_channel_route(
path: web::Path<(String, Uuid)>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.delete_channel(&rig_id, channel_id) {
Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => {
HttpResponse::NotFound().finish()
}
Err(crate::server::vchan::VChanClientError::Permanent) => {
HttpResponse::BadRequest().body("cannot remove the primary channel")
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()),
}
}
#[derive(serde::Deserialize)]
struct SubscribeBody {
session_id: Uuid,
}
#[post("/channels/{rig_id}/{channel_id}/subscribe")]
pub async fn subscribe_channel(
path: web::Path<(String, Uuid)>,
body: web::Json<SubscribeBody>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.subscribe_session(body.session_id, &rig_id, channel_id) {
Some(ch) => HttpResponse::Ok().json(ch),
None => HttpResponse::NotFound().finish(),
}
}
#[derive(serde::Deserialize)]
struct SetChanFreqBody {
freq_hz: u64,
}
#[put("/channels/{rig_id}/{channel_id}/freq")]
pub async fn set_vchan_freq(
path: web::Path<(String, Uuid)>,
body: web::Json<SetChanFreqBody>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.set_channel_freq(&rig_id, channel_id, body.freq_hz) {
Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => {
HttpResponse::NotFound().finish()
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()),
}
}
#[derive(serde::Deserialize)]
struct SetChanModeBody {
mode: String,
}
#[put("/channels/{rig_id}/{channel_id}/mode")]
pub async fn set_vchan_mode(
path: web::Path<(String, Uuid)>,
body: web::Json<SetChanModeBody>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.set_channel_mode(&rig_id, channel_id, &body.mode) {
Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => {
HttpResponse::NotFound().finish()
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()),
}
}
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(index)
.service(status_api)
@@ -1104,6 +1290,14 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(cw_js)
.service(bookmarks_js)
.service(scheduler_js)
.service(vchan_js)
// Virtual channels
.service(list_channels)
.service(allocate_channel)
.service(delete_channel_route)
.service(subscribe_channel)
.service(set_vchan_freq)
.service(set_vchan_mode)
// Auth endpoints
.service(crate::server::auth::login)
.service(crate::server::auth::logout)
@@ -1265,6 +1459,16 @@ async fn scheduler_js() -> impl Responder {
.body(status::SCHEDULER_JS)
}
#[get("/vchan.js")]
async fn vchan_js() -> impl Responder {
HttpResponse::Ok()
.insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::VCHAN_JS)
}
async fn send_command(
rig_tx: &mpsc::Sender<RigRequest>,
cmd: RigCommand,
@@ -437,6 +437,7 @@ impl RouteAccess {
|| path.starts_with("/bookmarks?")
|| path.starts_with("/bookmarks/")
|| path.starts_with("/scheduler/")
|| path.starts_with("/channels/")
{
return Self::Read;
}
@@ -14,6 +14,8 @@ pub mod bookmarks;
pub mod scheduler;
#[path = "status.rs"]
pub mod status;
#[path = "vchan.rs"]
pub mod vchan;
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -36,6 +38,7 @@ use trx_frontend::{FrontendRuntimeContext, FrontendSpawner};
use auth::{AuthConfig, AuthState, SameSite};
use scheduler::{SchedulerStatusMap, SchedulerStore};
use vchan::ClientChannelManager;
/// HTTP frontend implementation.
pub struct HttpFrontend;
@@ -79,7 +82,8 @@ async fn serve(
scheduler_status.clone(),
);
let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status)?;
let vchan_mgr = Arc::new(ClientChannelManager::new(4));
let server = build_server(addr, state_rx, rig_tx, callsign, context, scheduler_store, scheduler_status, vchan_mgr)?;
let handle = server.handle();
tokio::spawn(async move {
let _ = signal::ctrl_c().await;
@@ -99,6 +103,7 @@ fn build_server(
context: Arc<FrontendRuntimeContext>,
scheduler_store: Arc<SchedulerStore>,
scheduler_status: SchedulerStatusMap,
vchan_mgr: Arc<ClientChannelManager>,
) -> Result<Server, actix_web::Error> {
let state_data = web::Data::new(state_rx);
let rig_tx = web::Data::new(rig_tx);
@@ -111,6 +116,7 @@ fn build_server(
let scheduler_store = web::Data::new(scheduler_store);
let scheduler_status = web::Data::new(scheduler_status);
let vchan_mgr = web::Data::new(vchan_mgr);
// Extract auth config values before moving context
let same_site = match context.http_auth_cookie_same_site.as_str() {
@@ -153,6 +159,7 @@ fn build_server(
.app_data(bookmark_store.clone())
.app_data(scheduler_store.clone())
.app_data(scheduler_status.clone())
.app_data(vchan_mgr.clone())
.wrap(Compress::default())
.wrap(
DefaultHeaders::new()
@@ -21,6 +21,7 @@ pub const WSPR_JS: &str = include_str!("../assets/web/plugins/wspr.js");
pub const CW_JS: &str = include_str!("../assets/web/plugins/cw.js");
pub const BOOKMARKS_JS: &str = include_str!("../assets/web/plugins/bookmarks.js");
pub const SCHEDULER_JS: &str = include_str!("../assets/web/plugins/scheduler.js");
pub const VCHAN_JS: &str = include_str!("../assets/web/plugins/vchan.js");
pub fn index_html() -> String {
INDEX_HTML
@@ -0,0 +1,374 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Client-side virtual channel registry.
//!
//! Each rig has a list of virtual channels tracked entirely within the HTTP
//! frontend process. Channel 0 is permanent and mirrors the rig's current
//! dial frequency. Additional channels are allocated by a tab (identified by
//! its SSE session UUID) and freed when that session disconnects or the tab
//! explicitly deletes them.
//!
//! Actual DSP on the server is unaffected by this registry in Phase 1; the
//! registry is the source of truth for metadata (freq/mode per channel) and
//! drives `SetFreq`/`SetMode` commands to the server when a tab selects or
//! tunes a channel.
use std::collections::HashMap;
use std::sync::RwLock;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use uuid::Uuid;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/// HTTP-visible snapshot of one channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientChannel {
pub id: Uuid,
/// Position in the ordered list (0 = primary).
pub index: usize,
pub freq_hz: u64,
pub mode: String,
/// True for channel 0 — cannot be deleted.
pub permanent: bool,
/// Number of SSE sessions currently subscribed to this channel.
pub subscribers: usize,
}
#[derive(Debug, Clone)]
pub enum VChanClientError {
/// Channel cap would be exceeded.
CapReached { max: usize },
/// Channel UUID not found.
NotFound,
/// Tried to delete the permanent primary channel.
Permanent,
}
impl std::fmt::Display for VChanClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
VChanClientError::CapReached { max } => {
write!(f, "channel cap reached (max {})", max)
}
VChanClientError::NotFound => write!(f, "channel not found"),
VChanClientError::Permanent => write!(f, "cannot remove the primary channel"),
}
}
}
// ---------------------------------------------------------------------------
// Internal record
// ---------------------------------------------------------------------------
struct InternalChannel {
id: Uuid,
freq_hz: u64,
mode: String,
permanent: bool,
/// Session UUIDs currently subscribed to this channel.
session_ids: Vec<Uuid>,
}
// ---------------------------------------------------------------------------
// ClientChannelManager
// ---------------------------------------------------------------------------
/// Per-rig channel registry shared across all actix handlers.
pub struct ClientChannelManager {
/// rig_id → ordered channel list.
rigs: RwLock<HashMap<String, Vec<InternalChannel>>>,
/// session_id → (rig_id, channel_id).
sessions: RwLock<HashMap<Uuid, (String, Uuid)>>,
/// Broadcast used to push updated channel lists to SSE streams.
/// Payload: JSON string (serialised `Vec<ClientChannel>`), prefixed by
/// `"<rig_id>:"` so subscribers can filter by rig.
pub change_tx: broadcast::Sender<String>,
pub max_channels: usize,
}
impl ClientChannelManager {
pub fn new(max_channels: usize) -> Self {
let (change_tx, _) = broadcast::channel(64);
Self {
rigs: RwLock::new(HashMap::new()),
sessions: RwLock::new(HashMap::new()),
change_tx,
max_channels: max_channels.max(1),
}
}
// -- helpers --------------------------------------------------------
fn broadcast_change(&self, rig_id: &str, channels: &[InternalChannel]) {
let list: Vec<ClientChannel> = channels
.iter()
.enumerate()
.map(|(idx, c)| ClientChannel {
id: c.id,
index: idx,
freq_hz: c.freq_hz,
mode: c.mode.clone(),
permanent: c.permanent,
subscribers: c.session_ids.len(),
})
.collect();
if let Ok(json) = serde_json::to_string(&list) {
let _ = self.change_tx.send(format!("{}:{}", rig_id, json));
}
}
// -- public API -------------------------------------------------------
/// Ensure channel 0 exists for `rig_id`. Call this when the SSE stream
/// first delivers rig state so the primary channel reflects the current freq.
pub fn init_rig(&self, rig_id: &str, freq_hz: u64, mode: &str) {
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.entry(rig_id.to_string()).or_default();
if channels.is_empty() {
channels.push(InternalChannel {
id: Uuid::new_v4(),
freq_hz,
mode: mode.to_string(),
permanent: true,
session_ids: Vec::new(),
});
}
}
/// Update channel 0's freq/mode when the server pushes a new rig state.
pub fn update_primary(&self, rig_id: &str, freq_hz: u64, mode: &str) {
let mut rigs = self.rigs.write().unwrap();
if let Some(channels) = rigs.get_mut(rig_id) {
if let Some(ch) = channels.first_mut() {
if ch.freq_hz != freq_hz || ch.mode != mode {
ch.freq_hz = freq_hz;
ch.mode = mode.to_string();
self.broadcast_change(rig_id, channels);
}
}
}
}
/// List all channels for a rig (returns empty vec if rig unknown).
pub fn channels(&self, rig_id: &str) -> Vec<ClientChannel> {
let rigs = self.rigs.read().unwrap();
rigs.get(rig_id)
.map(|chs| {
chs.iter()
.enumerate()
.map(|(idx, c)| ClientChannel {
id: c.id,
index: idx,
freq_hz: c.freq_hz,
mode: c.mode.clone(),
permanent: c.permanent,
subscribers: c.session_ids.len(),
})
.collect()
})
.unwrap_or_default()
}
/// Allocate a new virtual channel for `session_id`.
/// If `session_id` already owns a channel on this rig, it is released first.
/// Returns the new `ClientChannel` snapshot.
pub fn allocate(
&self,
session_id: Uuid,
rig_id: &str,
freq_hz: u64,
mode: &str,
) -> Result<ClientChannel, VChanClientError> {
// Release any existing channel owned by this session on this rig.
self.release_session_on_rig(session_id, rig_id);
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.entry(rig_id.to_string()).or_default();
if channels.len() >= self.max_channels {
return Err(VChanClientError::CapReached {
max: self.max_channels,
});
}
let id = Uuid::new_v4();
let idx = channels.len();
channels.push(InternalChannel {
id,
freq_hz,
mode: mode.to_string(),
permanent: false,
session_ids: vec![session_id],
});
let snapshot = ClientChannel {
id,
index: idx,
freq_hz,
mode: mode.to_string(),
permanent: false,
subscribers: 1,
};
self.broadcast_change(rig_id, channels);
// Update session → channel mapping.
drop(rigs);
self.sessions
.write()
.unwrap()
.insert(session_id, (rig_id.to_string(), id));
Ok(snapshot)
}
/// Subscribe an SSE session to a channel (by channel UUID).
/// Idempotent. Returns `None` if channel not found.
pub fn subscribe_session(
&self,
session_id: Uuid,
rig_id: &str,
channel_id: Uuid,
) -> Option<ClientChannel> {
// Release previous subscription on this rig.
self.release_session_on_rig(session_id, rig_id);
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.get_mut(rig_id)?;
let (idx, ch) = channels
.iter_mut()
.enumerate()
.find(|(_, c)| c.id == channel_id)?;
if !ch.session_ids.contains(&session_id) {
ch.session_ids.push(session_id);
}
let snapshot = ClientChannel {
id: ch.id,
index: idx,
freq_hz: ch.freq_hz,
mode: ch.mode.clone(),
permanent: ch.permanent,
subscribers: ch.session_ids.len(),
};
self.broadcast_change(rig_id, channels);
drop(rigs);
self.sessions
.write()
.unwrap()
.insert(session_id, (rig_id.to_string(), channel_id));
Some(snapshot)
}
/// Release all channel subscriptions for `session_id` across all rigs.
/// Auto-removes non-permanent channels that reach 0 subscribers.
pub fn release_session(&self, session_id: Uuid) {
let mapping = {
let mut sessions = self.sessions.write().unwrap();
sessions.remove(&session_id)
};
if let Some((rig_id, _)) = mapping {
self.release_session_on_rig(session_id, &rig_id);
}
}
fn release_session_on_rig(&self, session_id: Uuid, rig_id: &str) {
let mut rigs = self.rigs.write().unwrap();
let Some(channels) = rigs.get_mut(rig_id) else {
return;
};
let mut changed = false;
for ch in channels.iter_mut() {
if let Some(pos) = ch.session_ids.iter().position(|&s| s == session_id) {
ch.session_ids.remove(pos);
changed = true;
}
}
// Remove non-permanent channels with no subscribers.
let before = channels.len();
channels.retain(|c| c.permanent || !c.session_ids.is_empty());
if channels.len() != before {
changed = true;
}
if changed {
self.broadcast_change(rig_id, channels);
}
}
/// Explicitly delete a channel by UUID (any session may do this).
pub fn delete_channel(
&self,
rig_id: &str,
channel_id: Uuid,
) -> Result<(), VChanClientError> {
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
let pos = channels
.iter()
.position(|c| c.id == channel_id)
.ok_or(VChanClientError::NotFound)?;
if channels[pos].permanent {
return Err(VChanClientError::Permanent);
}
// Collect evicted sessions to clean up the session map.
let evicted: Vec<Uuid> = channels[pos].session_ids.clone();
channels.remove(pos);
self.broadcast_change(rig_id, channels);
drop(rigs);
let mut sessions = self.sessions.write().unwrap();
for sid in evicted {
sessions.remove(&sid);
}
Ok(())
}
/// Update freq/mode metadata for a channel.
pub fn set_channel_freq(
&self,
rig_id: &str,
channel_id: Uuid,
freq_hz: u64,
) -> Result<(), VChanClientError> {
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
let ch = channels
.iter_mut()
.find(|c| c.id == channel_id)
.ok_or(VChanClientError::NotFound)?;
ch.freq_hz = freq_hz;
self.broadcast_change(rig_id, channels);
Ok(())
}
pub fn set_channel_mode(
&self,
rig_id: &str,
channel_id: Uuid,
mode: &str,
) -> Result<(), VChanClientError> {
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
let ch = channels
.iter_mut()
.find(|c| c.id == channel_id)
.ok_or(VChanClientError::NotFound)?;
ch.mode = mode.to_string();
self.broadcast_change(rig_id, channels);
Ok(())
}
/// Return the channel a session is currently subscribed to.
pub fn session_channel(&self, session_id: Uuid) -> Option<(String, Uuid)> {
self.sessions.read().unwrap().get(&session_id).cloned()
}
}