From 998f454a3eb896c59f71b622f1897d3434b6423a Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Sun, 8 Feb 2026 22:28:56 +0100 Subject: [PATCH] [feat](trx-frontend-http): consume server-side APRS/CW decode via SSE Add /decode SSE endpoint streaming decoded messages from the server. Add decode channel OnceLock with set/subscribe pattern. In the browser, connect to /decode EventSource and dispatch to onServerAprs/onServerCw handlers. APRS and CW plugins now receive server-decoded data automatically while keeping browser-side decoding as a fallback. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Stanislaw Grams --- .../trx-frontend-http/assets/web/app.js | 24 ++++++++++++ .../assets/web/plugins/aprs.js | 29 +++++++++++++- .../assets/web/plugins/cw.js | 37 ++++++++++++++++++ .../trx-frontend/trx-frontend-http/src/api.rs | 38 ++++++++++++++++++- .../trx-frontend-http/src/audio.rs | 22 +++++++++++ .../trx-frontend/trx-frontend-http/src/lib.rs | 2 +- 6 files changed, 149 insertions(+), 3 deletions(-) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 8e6547d..6234ef3 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -1122,6 +1122,30 @@ volWheel(txVolSlider, txVolPct, () => txGainNode, "txVol"); document.getElementById("copyright-year").textContent = new Date().getFullYear(); +// --- Server-side decode SSE --- +let decodeSource = null; +let decodeConnected = false; +function connectDecode() { + if (decodeSource) { decodeSource.close(); } + decodeSource = new EventSource("/decode"); + decodeSource.onopen = () => { decodeConnected = true; }; + decodeSource.onmessage = (evt) => { + try { + const msg = JSON.parse(evt.data); + if (msg.type === "aprs" && window.onServerAprs) window.onServerAprs(msg); + if (msg.type === "cw" && window.onServerCw) window.onServerCw(msg); + } catch (e) { + // ignore parse errors + } + }; + decodeSource.onerror = () => { + decodeSource.close(); + decodeConnected = false; + setTimeout(connectDecode, 5000); + }; +} +connectDecode(); + // Release PTT on page unload to prevent stuck transmit window.addEventListener("beforeunload", () => { if (txActive) { diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js index adeda69..a7ee0ba 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/aprs.js @@ -643,7 +643,34 @@ for (let i = aprsPacketHistory.length - 1; i >= 0; i--) { } } -// Auto-start APRS if it was running before page refresh +// --- Server-side APRS decode handler --- +window.onServerAprs = function(pkt) { + addAprsPacket({ + srcCall: pkt.src_call, + destCall: pkt.dest_call, + path: pkt.path, + info: pkt.info, + type: pkt.packet_type, + crcOk: pkt.crc_ok, + lat: pkt.lat, + lon: pkt.lon, + symbolTable: pkt.symbol_table, + symbolCode: pkt.symbol_code, + }); +}; + +// Update status display based on server decode availability +function updateAprsStatus() { + if (typeof decodeConnected !== "undefined" && decodeConnected) { + if (!aprsActive) { + aprsStatus.textContent = "Server decode active"; + aprsToggleBtn.textContent = "Start APRS (browser)"; + } + } +} +setInterval(updateAprsStatus, 2000); + +// Auto-start APRS if it was running before page refresh (browser fallback) if (loadSetting("aprsRunning", false) && hasWebCodecs) { startAprs(); } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js index bba38fe..ad64099 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/plugins/cw.js @@ -457,3 +457,40 @@ cwToggleBtn.addEventListener("click", startCw); document.getElementById("cw-clear-btn").addEventListener("click", () => { cwOutputEl.innerHTML = ""; }); + +// --- Server-side CW decode handler --- +let cwLastAppendTime = 0; +window.onServerCw = function(evt) { + if (evt.text) { + // Append decoded text to output + const now = Date.now(); + if (!cwOutputEl.lastElementChild || now - cwLastAppendTime > 10000 || evt.text === "\n") { + const line = document.createElement("div"); + line.className = "cw-line"; + cwOutputEl.appendChild(line); + } + cwLastAppendTime = now; + const lastLine = cwOutputEl.lastElementChild; + if (lastLine) { + lastLine.textContent += evt.text; + } + while (cwOutputEl.children.length > CW_MAX_LINES) { + cwOutputEl.removeChild(cwOutputEl.firstChild); + } + cwOutputEl.scrollTop = cwOutputEl.scrollHeight; + } + cwSignalIndicator.className = evt.signal_on ? "cw-signal-on" : "cw-signal-off"; + cwWpmInput.value = evt.wpm; + cwToneInput.value = evt.tone_hz; +}; + +// Update status display based on server decode availability +function updateCwStatus() { + if (typeof decodeConnected !== "undefined" && decodeConnected) { + if (!cwActive) { + cwStatusEl.textContent = "Server decode active"; + cwToggleBtn.textContent = "Start CW (browser)"; + } + } +} +setInterval(updateCwStatus, 2000); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index e02bbc0..683775b 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -9,7 +9,7 @@ use actix_web::{get, post, web, HttpResponse, Responder}; use actix_web::{http::header, Error}; use bytes::Bytes; use futures_util::stream::{once, select, StreamExt}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{broadcast, mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_stream::wrappers::{IntervalStream, WatchStream}; @@ -95,6 +95,41 @@ pub async fn events( .streaming(stream)) } +#[get("/decode")] +pub async fn decode_events() -> Result { + let Some(decode_rx) = crate::server::audio::subscribe_decode() else { + return Ok(HttpResponse::NotFound().body("decode not enabled")); + }; + + let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move { + loop { + match rx.recv().await { + Ok(msg) => { + if let Ok(json) = serde_json::to_string(&msg) { + return Some(( + Ok::(Bytes::from(format!("data: {json}\n\n"))), + rx, + )); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => return None, + } + } + }); + + let pings = IntervalStream::new(time::interval(Duration::from_secs(15))) + .map(|_| Ok::(Bytes::from(": ping\n\n"))); + + let stream = select(pings, decode_stream); + + Ok(HttpResponse::Ok() + .insert_header((header::CONTENT_TYPE, "text/event-stream")) + .insert_header((header::CACHE_CONTROL, "no-cache")) + .insert_header((header::CONNECTION, "keep-alive")) + .streaming(stream)) +} + /// A stream wrapper that calls a callback when dropped. struct DropStream { inner: std::pin::Pin + 'static>>, @@ -231,6 +266,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(index) .service(status_api) .service(events) + .service(decode_events) .service(toggle_power) .service(toggle_vfo) .service(lock_panel) diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs index 6c1d493..a65b3c7 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs @@ -18,6 +18,7 @@ use tokio::sync::{broadcast, mpsc, watch}; use tracing::warn; use trx_core::audio::AudioStreamInfo; +use trx_core::decode::DecodedMessage; struct AudioChannels { rx: broadcast::Sender, @@ -43,6 +44,27 @@ pub fn set_audio_channels( *ch = Some(AudioChannels { rx, tx, info }); } +fn decode_channel() -> &'static Mutex>> { + static CHANNEL: OnceLock>>> = OnceLock::new(); + CHANNEL.get_or_init(|| Mutex::new(None)) +} + +/// Set the decode broadcast channel from the client main. +pub fn set_decode_channel(tx: broadcast::Sender) { + let mut ch = decode_channel() + .lock() + .expect("decode channel mutex poisoned"); + *ch = Some(tx); +} + +/// Subscribe to the decode broadcast channel, if available. +pub fn subscribe_decode() -> Option> { + let ch = decode_channel() + .lock() + .expect("decode channel mutex poisoned"); + ch.as_ref().map(|tx| tx.subscribe()) +} + #[get("/audio")] pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result { let channels = audio_channels().lock().expect("audio channels mutex poisoned"); diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs index 27b715c..5f4d10f 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs @@ -4,7 +4,7 @@ pub mod server; -pub use server::audio::set_audio_channels; +pub use server::audio::{set_audio_channels, set_decode_channel}; pub fn register_frontend() { use trx_frontend::FrontendSpawner;