[refactor](trx-rs): inject runtime contexts for io paths

Phase 3: replace frontend/backend hot-path globals with explicit runtime/registration context wiring while keeping plugin compatibility adapters.

Co-authored-by: Codex <codex@openai.com>,
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-12 21:18:42 +01:00
parent 410fc89185
commit b7fb9adef7
12 changed files with 220 additions and 236 deletions
@@ -13,9 +13,11 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_stream::wrappers::{IntervalStream, WatchStream};
use trx_frontend::FrontendRuntimeContext;
use trx_protocol::{ClientResponse, parse_mode};
use trx_core::radio::freq::Freq;
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo};
use trx_core::{ClientResponse, RigCommand, RigMode, RigRequest, RigSnapshot, RigState};
use trx_core::{RigCommand, RigRequest, RigSnapshot, RigState};
use crate::server::status;
@@ -96,8 +98,10 @@ pub async fn events(
}
#[get("/decode")]
pub async fn decode_events() -> Result<HttpResponse, Error> {
let Some(decode_rx) = crate::server::audio::subscribe_decode() else {
pub async fn decode_events(
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
let Some(decode_rx) = crate::server::audio::subscribe_decode(context.get_ref()) else {
tracing::warn!("/decode requested but decode channel not set (audio disabled?)");
return Ok(HttpResponse::NotFound().body("decode not enabled"));
};
@@ -106,17 +110,17 @@ pub async fn decode_events() -> Result<HttpResponse, Error> {
let history = {
let mut out = Vec::new();
out.extend(
crate::server::audio::snapshot_aprs_history()
crate::server::audio::snapshot_aprs_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Aprs),
);
out.extend(
crate::server::audio::snapshot_cw_history()
crate::server::audio::snapshot_cw_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Cw),
);
out.extend(
crate::server::audio::snapshot_ft8_history()
crate::server::audio::snapshot_ft8_history(context.get_ref())
.into_iter()
.map(trx_core::decode::DecodedMessage::Ft8),
);
@@ -358,25 +362,28 @@ pub async fn toggle_ft8_decode(
#[post("/clear_ft8_decode")]
pub async fn clear_ft8_decode(
context: web::Data<Arc<FrontendRuntimeContext>>,
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> Result<HttpResponse, Error> {
crate::server::audio::clear_ft8_history();
crate::server::audio::clear_ft8_history(context.get_ref());
send_command(&rig_tx, RigCommand::ResetFt8Decoder).await
}
#[post("/clear_aprs_decode")]
pub async fn clear_aprs_decode(
context: web::Data<Arc<FrontendRuntimeContext>>,
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> Result<HttpResponse, Error> {
crate::server::audio::clear_aprs_history();
crate::server::audio::clear_aprs_history(context.get_ref());
send_command(&rig_tx, RigCommand::ResetAprsDecoder).await
}
#[post("/clear_cw_decode")]
pub async fn clear_cw_decode(
context: web::Data<Arc<FrontendRuntimeContext>>,
rig_tx: web::Data<mpsc::Sender<RigRequest>>,
) -> Result<HttpResponse, Error> {
crate::server::audio::clear_cw_history();
crate::server::audio::clear_cw_history(context.get_ref());
send_command(&rig_tx, RigCommand::ResetCwDecoder).await
}
@@ -575,18 +582,3 @@ impl From<RigInfoPlaceholder> for RigInfo {
}
}
}
fn parse_mode(s: &str) -> RigMode {
match s.to_ascii_uppercase().as_str() {
"LSB" => RigMode::LSB,
"USB" => RigMode::USB,
"CW" => RigMode::CW,
"CWR" => RigMode::CWR,
"AM" => RigMode::AM,
"FM" => RigMode::FM,
"WFM" => RigMode::WFM,
"DIG" | "DIGI" => RigMode::DIG,
"PKT" | "PACKET" => RigMode::PKT,
other => RigMode::Other(other.to_string()),
}
}
@@ -10,64 +10,21 @@
//! - Browser sends binary messages: raw Opus packets (TX)
use std::collections::VecDeque;
use std::sync::{Mutex, OnceLock};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_web::{Error, HttpRequest, HttpResponse, get, web};
use actix_ws::Message;
use bytes::Bytes;
use tokio::sync::{broadcast, mpsc, watch};
use tokio::sync::broadcast;
use tracing::warn;
use trx_core::audio::AudioStreamInfo;
use trx_core::decode::{AprsPacket, CwEvent, DecodedMessage, Ft8Message};
struct AudioChannels {
rx: broadcast::Sender<Bytes>,
tx: mpsc::Sender<Bytes>,
info: watch::Receiver<Option<AudioStreamInfo>>,
}
fn audio_channels() -> &'static Mutex<Option<AudioChannels>> {
static CHANNELS: OnceLock<Mutex<Option<AudioChannels>>> = OnceLock::new();
CHANNELS.get_or_init(|| Mutex::new(None))
}
/// Set the audio channels from the client main. Must be called before the
/// HTTP server starts if audio is enabled.
pub fn set_audio_channels(
rx: broadcast::Sender<Bytes>,
tx: mpsc::Sender<Bytes>,
info: watch::Receiver<Option<AudioStreamInfo>>,
) {
let mut ch = audio_channels()
.lock()
.expect("audio channels mutex poisoned");
*ch = Some(AudioChannels { rx, tx, info });
}
fn decode_channel() -> &'static Mutex<Option<broadcast::Sender<DecodedMessage>>> {
static CHANNEL: OnceLock<Mutex<Option<broadcast::Sender<DecodedMessage>>>> = OnceLock::new();
CHANNEL.get_or_init(|| Mutex::new(None))
}
use trx_frontend::FrontendRuntimeContext;
const HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
fn aprs_history() -> &'static Mutex<VecDeque<(Instant, AprsPacket)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, AprsPacket)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn cw_history() -> &'static Mutex<VecDeque<(Instant, CwEvent)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, CwEvent)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn ft8_history() -> &'static Mutex<VecDeque<(Instant, Ft8Message)>> {
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, Ft8Message)>>> = OnceLock::new();
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
}
fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) {
while let Some((ts, _)) = history.front() {
if ts.elapsed() <= HISTORY_RETENTION {
@@ -95,93 +52,98 @@ fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) {
}
}
fn record_aprs(pkt: AprsPacket) {
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
fn record_aprs(context: &FrontendRuntimeContext, pkt: AprsPacket) {
let mut history = context
.aprs_history
.lock()
.expect("aprs history mutex poisoned");
history.push_back((Instant::now(), pkt));
prune_aprs_history(&mut history);
}
fn record_cw(event: CwEvent) {
let mut history = cw_history().lock().expect("cw history mutex poisoned");
fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) {
let mut history = context.cw_history.lock().expect("cw history mutex poisoned");
history.push_back((Instant::now(), event));
prune_cw_history(&mut history);
}
fn record_ft8(msg: Ft8Message) {
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) {
let mut history = context
.ft8_history
.lock()
.expect("ft8 history mutex poisoned");
history.push_back((Instant::now(), msg));
prune_ft8_history(&mut history);
}
pub fn snapshot_aprs_history() -> Vec<AprsPacket> {
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
pub fn snapshot_aprs_history(context: &FrontendRuntimeContext) -> Vec<AprsPacket> {
let mut history = context
.aprs_history
.lock()
.expect("aprs history mutex poisoned");
prune_aprs_history(&mut history);
history.iter().map(|(_, pkt)| pkt.clone()).collect()
}
pub fn snapshot_cw_history() -> Vec<CwEvent> {
let mut history = cw_history().lock().expect("cw history mutex poisoned");
pub fn snapshot_cw_history(context: &FrontendRuntimeContext) -> Vec<CwEvent> {
let mut history = context.cw_history.lock().expect("cw history mutex poisoned");
prune_cw_history(&mut history);
history.iter().map(|(_, evt)| evt.clone()).collect()
}
pub fn snapshot_ft8_history() -> Vec<Ft8Message> {
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
pub fn snapshot_ft8_history(context: &FrontendRuntimeContext) -> Vec<Ft8Message> {
let mut history = context
.ft8_history
.lock()
.expect("ft8 history mutex poisoned");
prune_ft8_history(&mut history);
history.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_aprs_history() {
let mut history = aprs_history().lock().expect("aprs history mutex poisoned");
history.clear();
}
pub fn clear_cw_history() {
let mut history = cw_history().lock().expect("cw history mutex poisoned");
history.clear();
}
pub fn clear_ft8_history() {
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
history.clear();
}
/// Set the decode broadcast channel from the client main.
pub fn set_decode_channel(tx: broadcast::Sender<DecodedMessage>) {
{
let mut ch = decode_channel()
.lock()
.expect("decode channel mutex poisoned");
*ch = Some(tx.clone());
}
start_decode_history_collector(tx);
}
/// Subscribe to the decode broadcast channel, if available.
pub fn subscribe_decode() -> Option<broadcast::Receiver<DecodedMessage>> {
let ch = decode_channel()
pub fn clear_aprs_history(context: &FrontendRuntimeContext) {
let mut history = context
.aprs_history
.lock()
.expect("decode channel mutex poisoned");
ch.as_ref().map(|tx| tx.subscribe())
.expect("aprs history mutex poisoned");
history.clear();
}
fn start_decode_history_collector(tx: broadcast::Sender<DecodedMessage>) {
static STARTED: OnceLock<Mutex<bool>> = OnceLock::new();
let started = STARTED.get_or_init(|| Mutex::new(false));
let mut started_guard = started.lock().expect("decode history start mutex poisoned");
if *started_guard {
pub fn clear_cw_history(context: &FrontendRuntimeContext) {
let mut history = context.cw_history.lock().expect("cw history mutex poisoned");
history.clear();
}
pub fn clear_ft8_history(context: &FrontendRuntimeContext) {
let mut history = context
.ft8_history
.lock()
.expect("ft8 history mutex poisoned");
history.clear();
}
pub fn subscribe_decode(
context: &FrontendRuntimeContext,
) -> Option<broadcast::Receiver<DecodedMessage>> {
context.decode_rx.as_ref().map(|tx| tx.subscribe())
}
pub fn start_decode_history_collector(context: Arc<FrontendRuntimeContext>) {
if context.decode_collector_started.swap(true, Ordering::AcqRel) {
return;
}
*started_guard = true;
let Some(tx) = context.decode_rx.as_ref().cloned() else {
return;
};
tokio::spawn(async move {
let mut rx = tx.subscribe();
loop {
match rx.recv().await {
Ok(msg) => match msg {
DecodedMessage::Aprs(pkt) => record_aprs(pkt),
DecodedMessage::Cw(evt) => record_cw(evt),
DecodedMessage::Ft8(msg) => record_ft8(msg),
DecodedMessage::Aprs(pkt) => record_aprs(&context, pkt),
DecodedMessage::Cw(evt) => record_cw(&context, evt),
DecodedMessage::Ft8(msg) => record_ft8(&context, msg),
},
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
@@ -191,27 +153,31 @@ fn start_decode_history_collector(tx: broadcast::Sender<DecodedMessage>) {
}
#[get("/audio")]
pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
let channels = audio_channels().lock().expect("audio channels mutex poisoned");
let Some(ref ch) = *channels else {
pub async fn audio_ws(
req: HttpRequest,
body: web::Payload,
context: web::Data<Arc<FrontendRuntimeContext>>,
) -> Result<HttpResponse, Error> {
let Some(rx) = context.audio_rx.as_ref() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
let Some(tx_sender) = context.audio_tx.as_ref().cloned() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
let Some(mut info_rx) = context.audio_info.as_ref().cloned() else {
return Ok(HttpResponse::NotFound().body("audio not enabled"));
};
// Plain GET probe (no WebSocket upgrade) return 204 to signal audio is available
// Plain GET probe (no WebSocket upgrade) - return 204 to signal audio is available.
if !req.headers().contains_key("upgrade") {
return Ok(HttpResponse::NoContent().finish());
}
let mut rx_sub = ch.rx.subscribe();
let tx_sender = ch.tx.clone();
let mut info_rx = ch.info.clone();
drop(channels);
let mut rx_sub = rx.subscribe();
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
// Spawn the WebSocket handler
actix_web::rt::spawn(async move {
// Wait for stream info and send as first text message
let info = loop {
if let Some(info) = info_rx.borrow().clone() {
break info;
@@ -233,7 +199,6 @@ pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result<HttpRespon
return;
}
// Spawn RX forwarding task
let mut rx_session = session.clone();
let rx_handle = actix_web::rt::spawn(async move {
loop {
@@ -251,11 +216,10 @@ pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result<HttpRespon
}
});
// Read TX frames from browser
while let Some(Ok(msg)) = msg_stream.recv().await {
match msg {
Message::Binary(data) => {
let _ = tx_sender.send(data).await;
let _ = tx_sender.send(Bytes::from(data.to_vec())).await;
}
Message::Close(_) => break,
_ => {}
@@ -4,7 +4,10 @@
pub mod server;
pub use server::audio::{set_audio_channels, set_decode_channel};
pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationContext) {
use trx_frontend::FrontendSpawner;
context.register_frontend("http", server::HttpFrontend::spawn_frontend);
}
pub fn register_frontend() {
use trx_frontend::FrontendSpawner;
@@ -48,9 +48,10 @@ async fn serve(
state_rx: watch::Receiver<RigState>,
rig_tx: mpsc::Sender<RigRequest>,
callsign: Option<String>,
_context: Arc<FrontendRuntimeContext>,
context: Arc<FrontendRuntimeContext>,
) -> Result<(), actix_web::Error> {
let server = build_server(addr, state_rx, rig_tx, callsign)?;
audio::start_decode_history_collector(context.clone());
let server = build_server(addr, state_rx, rig_tx, callsign, context)?;
let handle = server.handle();
tokio::spawn(async move {
let _ = signal::ctrl_c().await;
@@ -67,16 +68,19 @@ fn build_server(
state_rx: watch::Receiver<RigState>,
rig_tx: mpsc::Sender<RigRequest>,
_callsign: Option<String>,
context: Arc<FrontendRuntimeContext>,
) -> Result<Server, actix_web::Error> {
let state_data = web::Data::new(state_rx);
let rig_tx = web::Data::new(rig_tx);
let clients = web::Data::new(Arc::new(AtomicUsize::new(0)));
let context_data = web::Data::new(context);
let server = HttpServer::new(move || {
App::new()
.app_data(state_data.clone())
.app_data(rig_tx.clone())
.app_data(clients.clone())
.app_data(context_data.clone())
.configure(api::configure)
})
.shutdown_timeout(1)