[fix](trx-frontend-http): persist decode history and fix ft8 locators

Co-authored-by: OpenAI <assistant@openai.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-09 23:22:35 +01:00
parent 211c3baf16
commit c24d5d0152
3 changed files with 142 additions and 4 deletions
@@ -38,8 +38,8 @@ function renderFt8Message(message) {
while (j < message.length && isAlphaNum(message[j])) j++;
const token = message.slice(i, j);
const grid = token.toUpperCase();
if (/^[A-R]{2}\\d{2}(?:[A-X]{2})?$/.test(grid)) {
out += `<span class="ft8-locator">[${grid}]</span>`;
if (/^[A-R]{2}\d{2}(?:[A-X]{2})?$/.test(grid)) {
out += `<span class="ft8-locator">${grid}</span>`;
} else {
out += escapeHtml(token);
}
@@ -60,7 +60,7 @@ function extractFirstGrid(message) {
while (j < message.length && isAlphaNum(message[j])) j++;
const token = message.slice(i, j);
const grid = token.toUpperCase();
if (/^[A-R]{2}\\d{2}(?:[A-X]{2})?$/.test(grid)) return grid;
if (/^[A-R]{2}\d{2}(?:[A-X]{2})?$/.test(grid)) return grid;
i = j;
} else {
i += 1;
@@ -110,6 +110,11 @@ pub async fn decode_events() -> Result<HttpResponse, Error> {
.into_iter()
.map(trx_core::decode::DecodedMessage::Aprs),
);
out.extend(
crate::server::audio::snapshot_cw_history()
.into_iter()
.map(trx_core::decode::DecodedMessage::Cw),
);
out.extend(
crate::server::audio::snapshot_ft8_history()
.into_iter()
@@ -355,6 +360,7 @@ pub async fn toggle_ft8_decode(
pub async fn clear_ft8_decode(
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> Result<HttpResponse, Error> {
crate::server::audio::clear_ft8_history();
send_command(&rig_tx, RigCommand::ResetFt8Decoder).await
}
@@ -362,6 +368,7 @@ pub async fn clear_ft8_decode(
pub async fn clear_aprs_decode(
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> Result<HttpResponse, Error> {
crate::server::audio::clear_aprs_history();
send_command(&rig_tx, RigCommand::ResetAprsDecoder).await
}
@@ -369,6 +376,7 @@ pub async fn clear_aprs_decode(
pub async fn clear_cw_decode(
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> Result<HttpResponse, Error> {
crate::server::audio::clear_cw_history();
send_command(&rig_tx, RigCommand::ResetCwDecoder).await
}
@@ -9,7 +9,9 @@
//! - Subsequent binary messages: raw Opus packets (RX)
//! - Browser sends binary messages: raw Opus packets (TX)
use std::collections::VecDeque;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_ws::Message;
@@ -18,7 +20,7 @@ use tokio::sync::{broadcast, mpsc, watch};
use tracing::warn;
use trx_core::audio::AudioStreamInfo;
use trx_core::decode::DecodedMessage;
use trx_core::decode::{AprsPacket, CwEvent, DecodedMessage, Ft8Message};
struct AudioChannels {
rx: broadcast::Sender<Bytes>,
@@ -49,12 +51,108 @@ fn decode_channel() -> &'static Mutex<Option<broadcast::Sender<DecodedMessage>>>
CHANNEL.get_or_init(|| Mutex::new(None))
}
const HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
fn aprs_history() -> &'static Mutex<VecDeque<(Instant, AprsPacket)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, AprsPacket)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn cw_history() -> &'static Mutex<VecDeque<(Instant, CwEvent)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, CwEvent)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn ft8_history() -> &'static Mutex<VecDeque<(Instant, Ft8Message)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, Ft8Message)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) {
while let Some((ts, _)) = history.front() {
if ts.elapsed() <= HISTORY_RETENTION {
break;
}
history.pop_front();
}
}
fn prune_cw_history(history: &mut VecDeque<(Instant, CwEvent)>) {
while let Some((ts, _)) = history.front() {
if ts.elapsed() <= HISTORY_RETENTION {
break;
}
history.pop_front();
}
}
fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) {
while let Some((ts, _)) = history.front() {
if ts.elapsed() <= HISTORY_RETENTION {
break;
}
history.pop_front();
}
}
fn record_aprs(pkt: AprsPacket) {
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
history.push_back((Instant::now(), pkt));
prune_aprs_history(&mut history);
}
fn record_cw(event: CwEvent) {
let mut history = cw_history().lock().expect("cw history mutex poisoned");
history.push_back((Instant::now(), event));
prune_cw_history(&mut history);
}
fn record_ft8(msg: Ft8Message) {
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
history.push_back((Instant::now(), msg));
prune_ft8_history(&mut history);
}
pub fn snapshot_aprs_history() -> Vec<AprsPacket> {
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
prune_aprs_history(&mut history);
history.iter().map(|(_, pkt)| pkt.clone()).collect()
}
pub fn snapshot_cw_history() -> Vec<CwEvent> {
let mut history = cw_history().lock().expect("cw history mutex poisoned");
prune_cw_history(&mut history);
history.iter().map(|(_, evt)| evt.clone()).collect()
}
pub fn snapshot_ft8_history() -> Vec<Ft8Message> {
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
prune_ft8_history(&mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_aprs_history() {
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
history.clear();
}
pub fn clear_cw_history() {
let mut history = cw_history().lock().expect("cw history mutex poisoned");
history.clear();
}
pub fn clear_ft8_history() {
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
history.clear();
}
/// Set the decode broadcast channel from the client main.
pub fn set_decode_channel(tx: broadcast::Sender<DecodedMessage>) {
let mut ch = decode_channel()
.lock()
.expect("decode channel mutex poisoned");
*ch = Some(tx);
start_decode_history_collector();
}
/// Subscribe to the decode broadcast channel, if available.
@@ -65,6 +163,38 @@ pub fn subscribe_decode() -> Option<broadcast::Receiver<DecodedMessage>> {
ch.as_ref().map(|tx| tx.subscribe())
}
fn start_decode_history_collector() {
static STARTED: OnceLock<Mutex<bool>> = OnceLock::new();
let started = STARTED.get_or_init(|| Mutex::new(false));
let mut started_guard = started.lock().expect("decode history start mutex poisoned");
if *started_guard {
return;
}
*started_guard = true;
let ch = decode_channel()
.lock()
.expect("decode channel mutex poisoned");
let Some(tx) = ch.as_ref().cloned() else {
return;
};
tokio::spawn(async move {
let mut rx = tx.subscribe();
loop {
match rx.recv().await {
Ok(msg) => match msg {
DecodedMessage::Aprs(pkt) => record_aprs(pkt),
DecodedMessage::Cw(evt) => record_cw(evt),
DecodedMessage::Ft8(msg) => record_ft8(msg),
},
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}
#[get("/audio")]
pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
let channels = audio_channels().lock().expect("audio channels mutex poisoned");