[feat](trx-client): support virtual channel bandwidth control

Add client-side command plumbing, HTTP endpoint handling, and frontend interception so bandwidth changes are applied per active virtual channel and survive reconnects.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-03-11 21:41:32 +01:00
parent 717228a635
commit daa0631b35
7 changed files with 150 additions and 12 deletions
+32 -8
View File
@@ -26,9 +26,9 @@ use trx_core::audio::{
write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE,
AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_RX_FRAME_CH,
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_BW,
AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE,
AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
}; };
use trx_core::decode::DecodedMessage; use trx_core::decode::DecodedMessage;
use trx_frontend::VChanAudioCmd; use trx_frontend::VChanAudioCmd;
@@ -52,8 +52,9 @@ pub async fn run_audio_client(
) { ) {
let mut reconnect_delay = Duration::from_secs(1); let mut reconnect_delay = Duration::from_secs(1);
// Active virtual-channel subscriptions, keyed by UUID. // Active virtual-channel subscriptions, keyed by UUID.
// Re-sent to the server on every audio TCP reconnect. // Tuple: (freq_hz, mode, bandwidth_hz) — re-sent to the server on every audio TCP reconnect.
let mut active_subs: HashMap<Uuid, (u64, String)> = HashMap::new(); // bandwidth_hz == 0 means "use mode default".
let mut active_subs: HashMap<Uuid, (u64, String, u32)> = HashMap::new();
loop { loop {
if *shutdown_rx.borrow() { if *shutdown_rx.borrow() {
@@ -138,7 +139,7 @@ async fn handle_audio_connection(
shutdown_rx: &mut watch::Receiver<bool>, shutdown_rx: &mut watch::Receiver<bool>,
vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>, vchan_audio: &Arc<RwLock<HashMap<Uuid, broadcast::Sender<Bytes>>>>,
vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>, vchan_cmd_rx: &mut mpsc::Receiver<VChanAudioCmd>,
active_subs: &mut HashMap<Uuid, (u64, String)>, active_subs: &mut HashMap<Uuid, (u64, String, u32)>,
vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>, vchan_destroyed_tx: &Option<broadcast::Sender<Uuid>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let (reader, writer) = stream.into_split(); let (reader, writer) = stream.into_split();
@@ -165,7 +166,7 @@ async fn handle_audio_connection(
// Track which UUIDs were pre-sent so we don't duplicate them when the // Track which UUIDs were pre-sent so we don't duplicate them when the
// same Subscribe command arrives from the mpsc queue. // same Subscribe command arrives from the mpsc queue.
let mut resubscribed: HashSet<Uuid> = HashSet::new(); let mut resubscribed: HashSet<Uuid> = HashSet::new();
for (&uuid, &(freq_hz, ref mode)) in active_subs.iter() { for (&uuid, &(freq_hz, ref mode, bandwidth_hz)) in active_subs.iter() {
let json = serde_json::json!({ let json = serde_json::json!({
"uuid": uuid.to_string(), "uuid": uuid.to_string(),
"freq_hz": freq_hz, "freq_hz": freq_hz,
@@ -177,6 +178,16 @@ async fn handle_audio_connection(
return Err(e); return Err(e);
} }
} }
// Re-apply non-default bandwidth after re-subscribing.
if bandwidth_hz > 0 {
let bw_json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&bw_json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await {
warn!("Audio vchan reconnect BW write failed: {}", e);
return Err(e);
}
}
}
resubscribed.insert(uuid); resubscribed.insert(uuid);
} }
@@ -306,7 +317,7 @@ async fn handle_audio_connection(
cmd = vchan_cmd_rx.recv() => { cmd = vchan_cmd_rx.recv() => {
match cmd { match cmd {
Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => { Some(VChanAudioCmd::Subscribe { uuid, freq_hz, mode }) => {
active_subs.insert(uuid, (freq_hz, mode.clone())); active_subs.insert(uuid, (freq_hz, mode.clone(), 0));
// Skip if already re-sent during reconnect initialization. // Skip if already re-sent during reconnect initialization.
if resubscribed.remove(&uuid) { if resubscribed.remove(&uuid) {
// Already sent above; don't duplicate. // Already sent above; don't duplicate.
@@ -359,6 +370,19 @@ async fn handle_audio_connection(
} }
} }
} }
Some(VChanAudioCmd::SetBandwidth { uuid, bandwidth_hz }) => {
// Persist for reconnect.
if let Some(entry) = active_subs.get_mut(&uuid) {
entry.2 = bandwidth_hz;
}
let json = serde_json::json!({ "uuid": uuid.to_string(), "bandwidth_hz": bandwidth_hz });
if let Ok(payload) = serde_json::to_vec(&json) {
if let Err(e) = write_audio_msg(&mut writer, AUDIO_MSG_VCHAN_BW, &payload).await {
warn!("Audio vchan BW write failed: {}", e);
break;
}
}
}
None => {} None => {}
} }
} }
+2
View File
@@ -38,6 +38,8 @@ pub enum VChanAudioCmd {
SetFreq { uuid: Uuid, freq_hz: u64 }, SetFreq { uuid: Uuid, freq_hz: u64 },
/// Update the demodulation mode of an existing virtual channel. /// Update the demodulation mode of an existing virtual channel.
SetMode { uuid: Uuid, mode: String }, SetMode { uuid: Uuid, mode: String },
/// Update the audio filter bandwidth of an existing virtual channel.
SetBandwidth { uuid: Uuid, bandwidth_hz: u32 },
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -3278,6 +3278,7 @@ async function applyBandwidthFromInput() {
syncBandwidthInput(clamped); syncBandwidthInput(clamped);
if (lastSpectrumData) scheduleSpectrumDraw(); if (lastSpectrumData) scheduleSpectrumDraw();
try { try {
if (typeof vchanInterceptBandwidth === "function" && await vchanInterceptBandwidth(clamped)) return;
await postPath(`/set_bandwidth?hz=${clamped}`); await postPath(`/set_bandwidth?hz=${clamped}`);
if (Number.isFinite(lastFreqHz)) { if (Number.isFinite(lastFreqHz)) {
await ensureTunedBandwidthCoverage(lastFreqHz); await ensureTunedBandwidthCoverage(lastFreqHz);
@@ -3352,6 +3353,7 @@ async function applyAutoBandwidth() {
syncBandwidthInput(estimated); syncBandwidthInput(estimated);
if (lastSpectrumData) scheduleSpectrumDraw(); if (lastSpectrumData) scheduleSpectrumDraw();
try { try {
if (typeof vchanInterceptBandwidth === "function" && await vchanInterceptBandwidth(estimated)) return;
await postPath(`/set_bandwidth?hz=${estimated}`); await postPath(`/set_bandwidth?hz=${estimated}`);
if (Number.isFinite(lastFreqHz)) { if (Number.isFinite(lastFreqHz)) {
await ensureTunedBandwidthCoverage(lastFreqHz); await ensureTunedBandwidthCoverage(lastFreqHz);
@@ -7774,10 +7776,13 @@ if (spectrumCanvas || overviewCanvas) {
window.addEventListener("mouseup", async () => { window.addEventListener("mouseup", async () => {
if (_bwDragEdge) { if (_bwDragEdge) {
try { try {
await postPath(`/set_bandwidth?hz=${Math.round(currentBandwidthHz)}`); const bwHz = Math.round(currentBandwidthHz);
if (!(typeof vchanInterceptBandwidth === "function" && await vchanInterceptBandwidth(bwHz))) {
await postPath(`/set_bandwidth?hz=${bwHz}`);
if (Number.isFinite(lastFreqHz)) { if (Number.isFinite(lastFreqHz)) {
await ensureTunedBandwidthCoverage(lastFreqHz, currentBandwidthHz); await ensureTunedBandwidthCoverage(lastFreqHz, currentBandwidthHz);
} }
}
} catch (_) {} } catch (_) {}
_bwDragEdge = null; _bwDragEdge = null;
_bwDragCanvas = null; _bwDragCanvas = null;
@@ -286,10 +286,17 @@ async function bmApply(bm) {
modeEl.value = String(bm.mode || "").toUpperCase(); modeEl.value = String(bm.mode || "").toUpperCase();
} }
if (bm.bandwidth_hz) { if (bm.bandwidth_hz) {
const bwHandledByVchan = typeof vchanInterceptBandwidth === "function"
&& await vchanInterceptBandwidth(bm.bandwidth_hz);
if (!bwHandledByVchan) {
await postPath("/set_bandwidth?hz=" + bm.bandwidth_hz); await postPath("/set_bandwidth?hz=" + bm.bandwidth_hz);
}
if (typeof currentBandwidthHz !== "undefined") { if (typeof currentBandwidthHz !== "undefined") {
currentBandwidthHz = bm.bandwidth_hz; currentBandwidthHz = bm.bandwidth_hz;
} }
if (typeof window !== "undefined") {
window.currentBandwidthHz = bm.bandwidth_hz;
}
if (typeof syncBandwidthInput === "function") { if (typeof syncBandwidthInput === "function") {
syncBandwidthInput(bm.bandwidth_hz); syncBandwidthInput(bm.bandwidth_hz);
} }
@@ -236,6 +236,29 @@ function vchanSyncModeDisplay() {
// When on primary channel, app.js rig-state updates handle the picker. // When on primary channel, app.js rig-state updates handle the picker.
} }
// Sync the BW input to the active virtual channel's bandwidth.
function vchanSyncBwDisplay() {
if (!vchanIsOnVirtual()) return;
const ch = vchanActiveChannel();
if (!ch) return;
const bwEl = document.getElementById("spectrum-bw-input");
if (!bwEl) return;
// bandwidth_hz == 0 means mode-default; derive it from the channel mode.
let bwHz = ch.bandwidth_hz || 0;
if (bwHz === 0 && typeof mwDefaultsForMode === "function") {
bwHz = mwDefaultsForMode(ch.mode)[0] || 0;
}
if (bwHz > 0) {
bwEl.value = (bwHz / 1000).toFixed(3).replace(/\.?0+$/, "");
if (typeof currentBandwidthHz !== "undefined") {
currentBandwidthHz = bwHz;
window.currentBandwidthHz = bwHz;
} else {
window.currentBandwidthHz = bwHz;
}
}
}
// Add / remove the vchan accent class from the freq and BW inputs. // Add / remove the vchan accent class from the freq and BW inputs.
function vchanSyncAccentUI() { function vchanSyncAccentUI() {
const onVirtual = vchanIsOnVirtual(); const onVirtual = vchanIsOnVirtual();
@@ -246,6 +269,7 @@ function vchanSyncAccentUI() {
if (onVirtual) { if (onVirtual) {
vchanUpdateFreqDisplay(); vchanUpdateFreqDisplay();
vchanSyncModeDisplay(); vchanSyncModeDisplay();
vchanSyncBwDisplay();
} else if (typeof _origRefreshFreqDisplay === "function") { } else if (typeof _origRefreshFreqDisplay === "function") {
_origRefreshFreqDisplay(); _origRefreshFreqDisplay();
} }
@@ -286,6 +310,23 @@ async function vchanSetChannelFreq(freqHz) {
} }
} }
async function vchanSetChannelBandwidth(bwHz) {
if (!vchanRigId || !vchanActiveId) return;
try {
const resp = await fetch(
`/channels/${encodeURIComponent(vchanRigId)}/${encodeURIComponent(vchanActiveId)}/bw`,
{
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ bandwidth_hz: Math.round(bwHz) }),
}
);
if (!resp.ok) console.warn("vchan: set bw failed", resp.status);
} catch (e) {
console.error("vchan: set bw error", e);
}
}
async function vchanSetChannelMode(mode) { async function vchanSetChannelMode(mode) {
if (!vchanRigId || !vchanActiveId) return; if (!vchanRigId || !vchanActiveId) return;
try { try {
@@ -312,6 +353,14 @@ window.vchanInterceptMode = async function(mode) {
return true; return true;
}; };
// Called by app.js bandwidth setters before sending /set_bandwidth to the
// server. Returns true if the change was handled by the virtual channel.
window.vchanInterceptBandwidth = async function(bwHz) {
if (!vchanIsOnVirtual()) return false;
await vchanSetChannelBandwidth(bwHz);
return true;
};
// Wrap setRigFrequency (defined in app.js, loaded before this file) so that // Wrap setRigFrequency (defined in app.js, loaded before this file) so that
// frequency changes are redirected to the active virtual channel instead of // frequency changes are redirected to the active virtual channel instead of
// the server when on a non-primary channel. // the server when on a non-primary channel.
@@ -1202,6 +1202,27 @@ pub async fn set_vchan_freq(
} }
} }
#[derive(serde::Deserialize)]
struct SetChanBwBody {
bandwidth_hz: u32,
}
#[put("/channels/{rig_id}/{channel_id}/bw")]
pub async fn set_vchan_bw(
path: web::Path<(String, Uuid)>,
body: web::Json<SetChanBwBody>,
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
) -> impl Responder {
let (rig_id, channel_id) = path.into_inner();
match vchan_mgr.set_channel_bandwidth(&rig_id, channel_id, body.bandwidth_hz) {
Ok(()) => HttpResponse::Ok().finish(),
Err(crate::server::vchan::VChanClientError::NotFound) => {
HttpResponse::NotFound().finish()
}
Err(e) => HttpResponse::BadRequest().body(e.to_string()),
}
}
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct SetChanModeBody { struct SetChanModeBody {
mode: String, mode: String,
@@ -1297,6 +1318,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(delete_channel_route) .service(delete_channel_route)
.service(subscribe_channel) .service(subscribe_channel)
.service(set_vchan_freq) .service(set_vchan_freq)
.service(set_vchan_bw)
.service(set_vchan_mode) .service(set_vchan_mode)
// Auth endpoints // Auth endpoints
.service(crate::server::auth::login) .service(crate::server::auth::login)
@@ -36,6 +36,8 @@ pub struct ClientChannel {
pub index: usize, pub index: usize,
pub freq_hz: u64, pub freq_hz: u64,
pub mode: String, pub mode: String,
/// Audio filter bandwidth in Hz (0 = mode default).
pub bandwidth_hz: u32,
/// True for channel 0 — cannot be deleted. /// True for channel 0 — cannot be deleted.
pub permanent: bool, pub permanent: bool,
/// Number of SSE sessions currently subscribed to this channel. /// Number of SSE sessions currently subscribed to this channel.
@@ -72,6 +74,8 @@ struct InternalChannel {
id: Uuid, id: Uuid,
freq_hz: u64, freq_hz: u64,
mode: String, mode: String,
/// Audio filter bandwidth in Hz (0 = mode default).
bandwidth_hz: u32,
permanent: bool, permanent: bool,
/// Session UUIDs currently subscribed to this channel. /// Session UUIDs currently subscribed to this channel.
session_ids: Vec<Uuid>, session_ids: Vec<Uuid>,
@@ -132,6 +136,7 @@ impl ClientChannelManager {
index: idx, index: idx,
freq_hz: c.freq_hz, freq_hz: c.freq_hz,
mode: c.mode.clone(), mode: c.mode.clone(),
bandwidth_hz: c.bandwidth_hz,
permanent: c.permanent, permanent: c.permanent,
subscribers: c.session_ids.len(), subscribers: c.session_ids.len(),
}) })
@@ -153,6 +158,7 @@ impl ClientChannelManager {
id: Uuid::new_v4(), id: Uuid::new_v4(),
freq_hz, freq_hz,
mode: mode.to_string(), mode: mode.to_string(),
bandwidth_hz: 0,
permanent: true, permanent: true,
session_ids: Vec::new(), session_ids: Vec::new(),
}); });
@@ -185,6 +191,7 @@ impl ClientChannelManager {
index: idx, index: idx,
freq_hz: c.freq_hz, freq_hz: c.freq_hz,
mode: c.mode.clone(), mode: c.mode.clone(),
bandwidth_hz: c.bandwidth_hz,
permanent: c.permanent, permanent: c.permanent,
subscribers: c.session_ids.len(), subscribers: c.session_ids.len(),
}) })
@@ -218,6 +225,7 @@ impl ClientChannelManager {
id, id,
freq_hz, freq_hz,
mode: mode.to_string(), mode: mode.to_string(),
bandwidth_hz: 0,
permanent: false, permanent: false,
session_ids: vec![session_id], session_ids: vec![session_id],
}); });
@@ -227,6 +235,7 @@ impl ClientChannelManager {
index: idx, index: idx,
freq_hz, freq_hz,
mode: mode.to_string(), mode: mode.to_string(),
bandwidth_hz: 0,
permanent: false, permanent: false,
subscribers: 1, subscribers: 1,
}; };
@@ -276,6 +285,7 @@ impl ClientChannelManager {
index: idx, index: idx,
freq_hz: ch.freq_hz, freq_hz: ch.freq_hz,
mode: ch.mode.clone(), mode: ch.mode.clone(),
bandwidth_hz: ch.bandwidth_hz,
permanent: ch.permanent, permanent: ch.permanent,
subscribers: ch.session_ids.len(), subscribers: ch.session_ids.len(),
}; };
@@ -427,6 +437,25 @@ impl ClientChannelManager {
Ok(()) Ok(())
} }
pub fn set_channel_bandwidth(
&self,
rig_id: &str,
channel_id: Uuid,
bandwidth_hz: u32,
) -> Result<(), VChanClientError> {
let mut rigs = self.rigs.write().unwrap();
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
let ch = channels
.iter_mut()
.find(|c| c.id == channel_id)
.ok_or(VChanClientError::NotFound)?;
ch.bandwidth_hz = bandwidth_hz;
self.broadcast_change(rig_id, channels);
drop(rigs);
self.send_audio_cmd(VChanAudioCmd::SetBandwidth { uuid: channel_id, bandwidth_hz });
Ok(())
}
/// Return the channel a session is currently subscribed to. /// Return the channel a session is currently subscribed to.
pub fn session_channel(&self, session_id: Uuid) -> Option<(String, Uuid)> { pub fn session_channel(&self, session_id: Uuid) -> Option<(String, Uuid)> {
self.sessions.read().unwrap().get(&session_id).cloned() self.sessions.read().unwrap().get(&session_id).cloned()