[feat](trx-frontend-http): consume server-side APRS/CW decode via SSE

Add /decode SSE endpoint streaming decoded messages from the server.
Add decode channel OnceLock with set/subscribe pattern.

In the browser, connect to /decode EventSource and dispatch to
onServerAprs/onServerCw handlers.  APRS and CW plugins now receive
server-decoded data automatically while keeping browser-side decoding
as a fallback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-08 22:28:56 +01:00
parent 50e1c44722
commit 998f454a3e
6 changed files with 149 additions and 3 deletions
@@ -1122,6 +1122,30 @@ volWheel(txVolSlider, txVolPct, () => txGainNode, "txVol");
document.getElementById("copyright-year").textContent = new Date().getFullYear();
// --- Server-side decode SSE ---
let decodeSource = null;
let decodeConnected = false;
function connectDecode() {
if (decodeSource) { decodeSource.close(); }
decodeSource = new EventSource("/decode");
decodeSource.onopen = () => { decodeConnected = true; };
decodeSource.onmessage = (evt) => {
try {
const msg = JSON.parse(evt.data);
if (msg.type === "aprs" && window.onServerAprs) window.onServerAprs(msg);
if (msg.type === "cw" && window.onServerCw) window.onServerCw(msg);
} catch (e) {
// ignore parse errors
}
};
decodeSource.onerror = () => {
decodeSource.close();
decodeConnected = false;
setTimeout(connectDecode, 5000);
};
}
connectDecode();
// Release PTT on page unload to prevent stuck transmit
window.addEventListener("beforeunload", () => {
if (txActive) {
@@ -643,7 +643,34 @@ for (let i = aprsPacketHistory.length - 1; i >= 0; i--) {
}
}
// Auto-start APRS if it was running before page refresh
// --- Server-side APRS decode handler ---
window.onServerAprs = function(pkt) {
addAprsPacket({
srcCall: pkt.src_call,
destCall: pkt.dest_call,
path: pkt.path,
info: pkt.info,
type: pkt.packet_type,
crcOk: pkt.crc_ok,
lat: pkt.lat,
lon: pkt.lon,
symbolTable: pkt.symbol_table,
symbolCode: pkt.symbol_code,
});
};
// Update status display based on server decode availability
function updateAprsStatus() {
if (typeof decodeConnected !== "undefined" && decodeConnected) {
if (!aprsActive) {
aprsStatus.textContent = "Server decode active";
aprsToggleBtn.textContent = "Start APRS (browser)";
}
}
}
setInterval(updateAprsStatus, 2000);
// Auto-start APRS if it was running before page refresh (browser fallback)
if (loadSetting("aprsRunning", false) && hasWebCodecs) {
startAprs();
}
@@ -457,3 +457,40 @@ cwToggleBtn.addEventListener("click", startCw);
document.getElementById("cw-clear-btn").addEventListener("click", () => {
cwOutputEl.innerHTML = "";
});
// --- Server-side CW decode handler ---
let cwLastAppendTime = 0;
window.onServerCw = function(evt) {
if (evt.text) {
// Append decoded text to output
const now = Date.now();
if (!cwOutputEl.lastElementChild || now - cwLastAppendTime > 10000 || evt.text === "\n") {
const line = document.createElement("div");
line.className = "cw-line";
cwOutputEl.appendChild(line);
}
cwLastAppendTime = now;
const lastLine = cwOutputEl.lastElementChild;
if (lastLine) {
lastLine.textContent += evt.text;
}
while (cwOutputEl.children.length > CW_MAX_LINES) {
cwOutputEl.removeChild(cwOutputEl.firstChild);
}
cwOutputEl.scrollTop = cwOutputEl.scrollHeight;
}
cwSignalIndicator.className = evt.signal_on ? "cw-signal-on" : "cw-signal-off";
cwWpmInput.value = evt.wpm;
cwToneInput.value = evt.tone_hz;
};
// Update status display based on server decode availability
function updateCwStatus() {
if (typeof decodeConnected !== "undefined" && decodeConnected) {
if (!cwActive) {
cwStatusEl.textContent = "Server decode active";
cwToggleBtn.textContent = "Start CW (browser)";
}
}
}
setInterval(updateCwStatus, 2000);
@@ -9,7 +9,7 @@ use actix_web::{get, post, web, HttpResponse, Responder};
use actix_web::{http::header, Error};
use bytes::Bytes;
use futures_util::stream::{once, select, StreamExt};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_stream::wrappers::{IntervalStream, WatchStream};
@@ -95,6 +95,41 @@ pub async fn events(
.streaming(stream))
}
#[get("/decode")]
pub async fn decode_events() -> Result<HttpResponse, Error> {
let Some(decode_rx) = crate::server::audio::subscribe_decode() else {
return Ok(HttpResponse::NotFound().body("decode not enabled"));
};
let decode_stream = futures_util::stream::unfold(decode_rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(msg) => {
if let Ok(json) = serde_json::to_string(&msg) {
return Some((
Ok::<Bytes, Error>(Bytes::from(format!("data: {json}\n\n"))),
rx,
));
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
}
}
});
let pings = IntervalStream::new(time::interval(Duration::from_secs(15)))
.map(|_| Ok::<Bytes, Error>(Bytes::from(": ping\n\n")));
let stream = select(pings, decode_stream);
Ok(HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "text/event-stream"))
.insert_header((header::CACHE_CONTROL, "no-cache"))
.insert_header((header::CONNECTION, "keep-alive"))
.streaming(stream))
}
/// A stream wrapper that calls a callback when dropped.
struct DropStream<I> {
inner: std::pin::Pin<Box<dyn futures_util::Stream<Item = I> + 'static>>,
@@ -231,6 +266,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(index)
.service(status_api)
.service(events)
.service(decode_events)
.service(toggle_power)
.service(toggle_vfo)
.service(lock_panel)
@@ -18,6 +18,7 @@ use tokio::sync::{broadcast, mpsc, watch};
use tracing::warn;
use trx_core::audio::AudioStreamInfo;
use trx_core::decode::DecodedMessage;
struct AudioChannels {
rx: broadcast::Sender<Bytes>,
@@ -43,6 +44,27 @@ pub fn set_audio_channels(
*ch = Some(AudioChannels { rx, tx, info });
}
fn decode_channel() -> &'static Mutex<Option<broadcast::Sender<DecodedMessage>>> {
static CHANNEL: OnceLock<Mutex<Option<broadcast::Sender<DecodedMessage>>>> = OnceLock::new();
CHANNEL.get_or_init(|| Mutex::new(None))
}
/// Set the decode broadcast channel from the client main.
pub fn set_decode_channel(tx: broadcast::Sender<DecodedMessage>) {
let mut ch = decode_channel()
.lock()
.expect("decode channel mutex poisoned");
*ch = Some(tx);
}
/// Subscribe to the decode broadcast channel, if available.
pub fn subscribe_decode() -> Option<broadcast::Receiver<DecodedMessage>> {
let ch = decode_channel()
.lock()
.expect("decode channel mutex poisoned");
ch.as_ref().map(|tx| tx.subscribe())
}
#[get("/audio")]
pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
let channels = audio_channels().lock().expect("audio channels mutex poisoned");
@@ -4,7 +4,7 @@
pub mod server;
pub use server::audio::set_audio_channels;
pub use server::audio::{set_audio_channels, set_decode_channel};
pub fn register_frontend() {
use trx_frontend::FrontendSpawner;