From b7fb9adef71a0991090177348ffcd9e21cfcdc32 Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Thu, 12 Feb 2026 21:18:42 +0100 Subject: [PATCH] [refactor](trx-rs): inject runtime contexts for io paths Phase 3: replace frontend/backend hot-path globals with explicit runtime/registration context wiring while keeping plugin compatibility adapters. Co-authored-by: Codex , Signed-off-by: Stanislaw Grams --- src/trx-client/src/main.rs | 53 +++-- src/trx-client/trx-frontend/src/lib.rs | 20 ++ .../trx-frontend-http-json/src/lib.rs | 9 +- .../trx-frontend-http-json/src/server.rs | 68 ++----- .../trx-frontend/trx-frontend-http/src/api.rs | 40 ++-- .../trx-frontend-http/src/audio.rs | 190 +++++++----------- .../trx-frontend/trx-frontend-http/src/lib.rs | 5 +- .../trx-frontend-http/src/server.rs | 8 +- .../trx-frontend-rigctl/src/lib.rs | 5 + src/trx-server/src/main.rs | 33 +-- src/trx-server/src/rig_task.rs | 9 +- src/trx-server/trx-backend/src/lib.rs | 16 ++ 12 files changed, 220 insertions(+), 236 deletions(-) diff --git a/src/trx-client/src/main.rs b/src/trx-client/src/main.rs index fc8974b..fd1f465 100644 --- a/src/trx-client/src/main.rs +++ b/src/trx-client/src/main.rs @@ -4,7 +4,6 @@ mod audio_client; mod config; -mod plugins; mod remote_client; use std::net::{IpAddr, SocketAddr}; @@ -17,17 +16,17 @@ use tokio::signal; use tokio::sync::{broadcast, mpsc, watch}; use tracing::info; -use trx_app::normalize_name; +use trx_app::{init_logging, load_plugins, normalize_name}; use trx_core::audio::AudioStreamInfo; use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; use trx_core::DynResult; -use trx_frontend::{is_frontend_registered, registered_frontends, FrontendRegistrationContext, FrontendRuntimeContext}; +use trx_frontend::{snapshot_bootstrap_context, FrontendRegistrationContext, FrontendRuntimeContext}; use trx_core::decode::DecodedMessage; -use trx_frontend_http::{register_frontend as register_http_frontend, set_audio_channels, set_decode_channel}; -use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens}; -use trx_frontend_rigctl::register_frontend as register_rigctl_frontend; +use trx_frontend_http::register_frontend_on as register_http_frontend; +use trx_frontend_http_json::register_frontend_on as register_http_json_frontend; +use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend; use config::ClientConfig; use remote_client::{parse_remote_url, RemoteClientConfig}; @@ -100,17 +99,14 @@ struct AppState; async fn async_init() -> DynResult { use std::sync::Arc; - tracing_subscriber::fmt().with_target(false).init(); - // Phase 3: Create bootstrap context for explicit initialization. // This replaces reliance on global mutable state by threading context through spawn_frontend. - let mut _frontend_reg_ctx = FrontendRegistrationContext::new(); - let frontend_runtime_ctx = Arc::new(FrontendRuntimeContext::new()); + let mut frontend_reg_ctx = FrontendRegistrationContext::new(); + let mut frontend_runtime = FrontendRuntimeContext::new(); - register_http_frontend(); - register_http_json_frontend(); - register_rigctl_frontend(); - let _plugin_libs = plugins::load_plugins(); + register_http_frontend(&mut frontend_reg_ctx); + register_http_json_frontend(&mut frontend_reg_ctx); + register_rigctl_frontend(&mut frontend_reg_ctx); let cli = Cli::parse(); @@ -126,11 +122,24 @@ async fn async_init() -> DynResult { ClientConfig::load_from_default_paths()? }; + init_logging(cfg.general.log_level.as_deref()); + + let _plugin_libs = load_plugins(); + frontend_reg_ctx.extend_from(&snapshot_bootstrap_context()); + if let Some(ref path) = config_path { info!("Loaded configuration from {}", path.display()); } - set_auth_tokens(cfg.frontends.http_json.auth.tokens.clone()); + frontend_runtime.auth_tokens = cfg + .frontends + .http_json + .auth + .tokens + .iter() + .filter(|t| !t.is_empty()) + .cloned() + .collect(); // Resolve remote URL: CLI > config [remote] section > error let remote_url = cli @@ -171,11 +180,11 @@ async fn async_init() -> DynResult { fes }; for name in &frontends { - if !is_frontend_registered(name) { + if !frontend_reg_ctx.is_frontend_registered(name) { return Err(format!( "Unknown frontend: {} (available: {})", name, - registered_frontends().join(", ") + frontend_reg_ctx.registered_frontends().join(", ") ) .into()); } @@ -229,8 +238,10 @@ async fn async_init() -> DynResult { let audio_addr = format!("{}:{}", remote_host, cfg.frontends.audio.server_port); - set_audio_channels(rx_audio_tx.clone(), tx_audio_tx, stream_info_rx); - set_decode_channel(decode_tx.clone()); + frontend_runtime.audio_rx = Some(rx_audio_tx.clone()); + frontend_runtime.audio_tx = Some(tx_audio_tx); + frontend_runtime.audio_info = Some(stream_info_rx); + frontend_runtime.decode_rx = Some(decode_tx.clone()); info!( "Audio enabled: connecting to {}, decode channel set", @@ -248,6 +259,8 @@ async fn async_init() -> DynResult { info!("Audio disabled in config, decode will not be available"); } + let frontend_runtime_ctx = Arc::new(frontend_runtime); + // Spawn frontends with runtime context for frontend in &frontends { let frontend_state_rx = state_rx.clone(); @@ -259,7 +272,7 @@ async fn async_init() -> DynResult { return Err(format!("Frontend missing listen configuration: {}", other).into()); } }; - trx_frontend::spawn_frontend( + frontend_reg_ctx.spawn_frontend( frontend, frontend_state_rx, tx.clone(), diff --git a/src/trx-client/trx-frontend/src/lib.rs b/src/trx-client/trx-frontend/src/lib.rs index cbf7b3c..a7370fe 100644 --- a/src/trx-client/trx-frontend/src/lib.rs +++ b/src/trx-client/trx-frontend/src/lib.rs @@ -4,6 +4,7 @@ use std::collections::{HashMap, VecDeque, HashSet}; use std::net::SocketAddr; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, OnceLock}; use std::time::Instant; @@ -35,6 +36,7 @@ pub type FrontendSpawnFn = fn( ) -> JoinHandle<()>; /// Context for registering and spawning frontends. +#[derive(Clone)] pub struct FrontendRegistrationContext { spawners: HashMap, } @@ -83,6 +85,13 @@ impl FrontendRegistrationContext { .ok_or_else(|| format!("Unknown frontend: {}", name))?; Ok(spawner(state_rx, rig_tx, callsign, listen_addr, context)) } + + /// Merge another registration context into this one. + pub fn extend_from(&mut self, other: &FrontendRegistrationContext) { + for (name, spawner) in &other.spawners { + self.spawners.insert(name.clone(), *spawner); + } + } } impl Default for FrontendRegistrationContext { @@ -109,6 +118,8 @@ pub struct FrontendRuntimeContext { pub ft8_history: Arc>>, /// Authentication tokens for HTTP-JSON frontend pub auth_tokens: HashSet, + /// Guard to avoid spawning duplicate decode collectors. + pub decode_collector_started: AtomicBool, } impl FrontendRuntimeContext { @@ -123,6 +134,7 @@ impl FrontendRuntimeContext { cw_history: Arc::new(Mutex::new(VecDeque::new())), ft8_history: Arc::new(Mutex::new(VecDeque::new())), auth_tokens: HashSet::new(), + decode_collector_started: AtomicBool::new(false), } } } @@ -146,6 +158,14 @@ fn bootstrap_context() -> &'static Arc> { BOOTSTRAP_CONTEXT.get_or_init(|| Arc::new(Mutex::new(FrontendRegistrationContext::new()))) } +/// Snapshot current plugin/bootstrap registrations into an owned context. +pub fn snapshot_bootstrap_context() -> FrontendRegistrationContext { + let ctx = bootstrap_context() + .lock() + .expect("frontend context mutex poisoned"); + ctx.clone() +} + /// Register a frontend spawner under a stable name (e.g. "http"). /// Plugin compatibility: delegates to bootstrap context. pub fn register_frontend(name: &str, spawner: FrontendSpawnFn) { diff --git a/src/trx-client/trx-frontend/trx-frontend-http-json/src/lib.rs b/src/trx-client/trx-frontend/trx-frontend-http-json/src/lib.rs index 69d0ac8..df798e4 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http-json/src/lib.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http-json/src/lib.rs @@ -4,11 +4,12 @@ pub mod server; +pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationContext) { + use trx_frontend::FrontendSpawner; + context.register_frontend("http-json", server::HttpJsonFrontend::spawn_frontend); +} + pub fn register_frontend() { use trx_frontend::FrontendSpawner; trx_frontend::register_frontend("http-json", server::HttpJsonFrontend::spawn_frontend); } - -pub fn set_auth_tokens(tokens: Vec) { - server::set_auth_tokens(tokens); -} diff --git a/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs index c218238..0e33968 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http-json/src/server.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: BSD-2-Clause use std::net::SocketAddr; +use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; @@ -10,70 +11,24 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio::task::JoinHandle; use tracing::{error, info}; -use std::collections::HashSet; -use std::sync::{Mutex, OnceLock}; - use trx_core::rig::request::RigRequest; use trx_core::rig::state::RigState; -use trx_core::{ClientResponse}; -use trx_frontend::FrontendSpawner; +use trx_frontend::{FrontendSpawner, FrontendRuntimeContext}; +use trx_protocol::auth::{SimpleTokenValidator, TokenValidator}; use trx_protocol::codec::parse_envelope; -use trx_protocol::auth::TokenValidator; use trx_protocol::mapping; +use trx_protocol::ClientResponse; /// JSON-over-TCP frontend for control and status. pub struct HttpJsonFrontend; -struct AuthConfig { - tokens: HashSet, -} - -fn auth_registry() -> &'static Mutex { - static REGISTRY: OnceLock> = OnceLock::new(); - REGISTRY.get_or_init(|| { - Mutex::new(AuthConfig { - tokens: HashSet::new(), - }) - }) -} - -pub fn set_auth_tokens(tokens: Vec) { - let mut reg = auth_registry() - .lock() - .expect("http-json auth mutex poisoned"); - reg.tokens = tokens.into_iter().filter(|t| !t.is_empty()).collect(); -} - -/// Token validator that uses the global auth registry. -struct RegistryTokenValidator; - -impl TokenValidator for RegistryTokenValidator { - fn validate(&self, token: &Option) -> Result<(), String> { - let reg = auth_registry() - .lock() - .expect("http-json auth mutex poisoned"); - if reg.tokens.is_empty() { - return Ok(()); - } - let Some(token) = token else { - return Err("missing authorization token".into()); - }; - let candidate = trx_protocol::auth::strip_bearer(token); - if reg.tokens.contains(candidate) { - Ok(()) - } else { - Err("invalid authorization token".into()) - } - } -} - impl FrontendSpawner for HttpJsonFrontend { fn spawn_frontend( _state_rx: watch::Receiver, rig_tx: mpsc::Sender, _callsign: Option, listen_addr: SocketAddr, - context: std::sync::Arc, + context: Arc, ) -> JoinHandle<()> { tokio::spawn(async move { if let Err(e) = serve(listen_addr, rig_tx, context).await { @@ -86,7 +41,7 @@ impl FrontendSpawner for HttpJsonFrontend { async fn serve( listen_addr: SocketAddr, rig_tx: mpsc::Sender, - _context: std::sync::Arc, + context: Arc, ) -> std::io::Result<()> { let listener = TcpListener::bind(listen_addr).await?; info!("json tcp frontend listening on {}", listen_addr); @@ -96,8 +51,9 @@ async fn serve( info!("json tcp client connected: {}", addr); let tx_clone = rig_tx.clone(); + let context = context.clone(); tokio::spawn(async move { - if let Err(e) = handle_client(socket, addr, tx_clone).await { + if let Err(e) = handle_client(socket, addr, tx_clone, context).await { error!("json tcp client {} error: {:?}", addr, e); } }); @@ -108,6 +64,7 @@ async fn handle_client( socket: TcpStream, addr: SocketAddr, tx: mpsc::Sender, + context: Arc, ) -> std::io::Result<()> { let (reader, mut writer) = socket.into_split(); let mut reader = BufReader::new(reader); @@ -143,7 +100,7 @@ async fn handle_client( } }; - if let Err(err) = authorize(&envelope.token) { + if let Err(err) = authorize(&envelope.token, &context) { let resp = ClientResponse { success: false, state: None, @@ -215,6 +172,7 @@ async fn handle_client( Ok(()) } -fn authorize(token: &Option) -> Result<(), String> { - RegistryTokenValidator.validate(token) +fn authorize(token: &Option, context: &FrontendRuntimeContext) -> Result<(), String> { + let validator = SimpleTokenValidator::new(context.auth_tokens.clone()); + validator.validate(token) } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 5bc4239..6d251a0 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -13,9 +13,11 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch}; use tokio::time::{self, Duration}; use tokio_stream::wrappers::{IntervalStream, WatchStream}; +use trx_frontend::FrontendRuntimeContext; +use trx_protocol::{ClientResponse, parse_mode}; use trx_core::radio::freq::Freq; use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo}; -use trx_core::{ClientResponse, RigCommand, RigMode, RigRequest, RigSnapshot, RigState}; +use trx_core::{RigCommand, RigRequest, RigSnapshot, RigState}; use crate::server::status; @@ -96,8 +98,10 @@ pub async fn events( } #[get("/decode")] -pub async fn decode_events() -> Result { - let Some(decode_rx) = crate::server::audio::subscribe_decode() else { +pub async fn decode_events( + context: web::Data>, +) -> Result { + let Some(decode_rx) = crate::server::audio::subscribe_decode(context.get_ref()) else { tracing::warn!("/decode requested but decode channel not set (audio disabled?)"); return Ok(HttpResponse::NotFound().body("decode not enabled")); }; @@ -106,17 +110,17 @@ pub async fn decode_events() -> Result { let history = { let mut out = Vec::new(); out.extend( - crate::server::audio::snapshot_aprs_history() + crate::server::audio::snapshot_aprs_history(context.get_ref()) .into_iter() .map(trx_core::decode::DecodedMessage::Aprs), ); out.extend( - crate::server::audio::snapshot_cw_history() + crate::server::audio::snapshot_cw_history(context.get_ref()) .into_iter() .map(trx_core::decode::DecodedMessage::Cw), ); out.extend( - crate::server::audio::snapshot_ft8_history() + crate::server::audio::snapshot_ft8_history(context.get_ref()) .into_iter() .map(trx_core::decode::DecodedMessage::Ft8), ); @@ -358,25 +362,28 @@ pub async fn toggle_ft8_decode( #[post("/clear_ft8_decode")] pub async fn clear_ft8_decode( + context: web::Data>, rig_tx: web::Data>, ) -> Result { - crate::server::audio::clear_ft8_history(); + crate::server::audio::clear_ft8_history(context.get_ref()); send_command(&rig_tx, RigCommand::ResetFt8Decoder).await } #[post("/clear_aprs_decode")] pub async fn clear_aprs_decode( + context: web::Data>, rig_tx: web::Data>, ) -> Result { - crate::server::audio::clear_aprs_history(); + crate::server::audio::clear_aprs_history(context.get_ref()); send_command(&rig_tx, RigCommand::ResetAprsDecoder).await } #[post("/clear_cw_decode")] pub async fn clear_cw_decode( + context: web::Data>, rig_tx: web::Data>, ) -> Result { - crate::server::audio::clear_cw_history(); + crate::server::audio::clear_cw_history(context.get_ref()); send_command(&rig_tx, RigCommand::ResetCwDecoder).await } @@ -575,18 +582,3 @@ impl From for RigInfo { } } } - -fn parse_mode(s: &str) -> RigMode { - match s.to_ascii_uppercase().as_str() { - "LSB" => RigMode::LSB, - "USB" => RigMode::USB, - "CW" => RigMode::CW, - "CWR" => RigMode::CWR, - "AM" => RigMode::AM, - "FM" => RigMode::FM, - "WFM" => RigMode::WFM, - "DIG" | "DIGI" => RigMode::DIG, - "PKT" | "PACKET" => RigMode::PKT, - other => RigMode::Other(other.to_string()), - } -} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs index 0bf47d9..2987fbe 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/audio.rs @@ -10,64 +10,21 @@ //! - Browser sends binary messages: raw Opus packets (TX) use std::collections::VecDeque; -use std::sync::{Mutex, OnceLock}; +use std::sync::Arc; +use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; -use actix_web::{get, web, Error, HttpRequest, HttpResponse}; +use actix_web::{Error, HttpRequest, HttpResponse, get, web}; use actix_ws::Message; use bytes::Bytes; -use tokio::sync::{broadcast, mpsc, watch}; +use tokio::sync::broadcast; use tracing::warn; -use trx_core::audio::AudioStreamInfo; use trx_core::decode::{AprsPacket, CwEvent, DecodedMessage, Ft8Message}; - -struct AudioChannels { - rx: broadcast::Sender, - tx: mpsc::Sender, - info: watch::Receiver>, -} - -fn audio_channels() -> &'static Mutex> { - static CHANNELS: OnceLock>> = OnceLock::new(); - CHANNELS.get_or_init(|| Mutex::new(None)) -} - -/// Set the audio channels from the client main. Must be called before the -/// HTTP server starts if audio is enabled. -pub fn set_audio_channels( - rx: broadcast::Sender, - tx: mpsc::Sender, - info: watch::Receiver>, -) { - let mut ch = audio_channels() - .lock() - .expect("audio channels mutex poisoned"); - *ch = Some(AudioChannels { rx, tx, info }); -} - -fn decode_channel() -> &'static Mutex>> { - static CHANNEL: OnceLock>>> = OnceLock::new(); - CHANNEL.get_or_init(|| Mutex::new(None)) -} +use trx_frontend::FrontendRuntimeContext; const HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60); -fn aprs_history() -> &'static Mutex> { - static HISTORY: OnceLock>> = OnceLock::new(); - HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) -} - -fn cw_history() -> &'static Mutex> { - static HISTORY: OnceLock>> = OnceLock::new(); - HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) -} - -fn ft8_history() -> &'static Mutex> { - static HISTORY: OnceLock>> = OnceLock::new(); - HISTORY.get_or_init(|| Mutex::new(VecDeque::new())) -} - fn prune_aprs_history(history: &mut VecDeque<(Instant, AprsPacket)>) { while let Some((ts, _)) = history.front() { if ts.elapsed() <= HISTORY_RETENTION { @@ -95,93 +52,98 @@ fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) { } } -fn record_aprs(pkt: AprsPacket) { - let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); +fn record_aprs(context: &FrontendRuntimeContext, pkt: AprsPacket) { + let mut history = context + .aprs_history + .lock() + .expect("aprs history mutex poisoned"); history.push_back((Instant::now(), pkt)); prune_aprs_history(&mut history); } -fn record_cw(event: CwEvent) { - let mut history = cw_history().lock().expect("cw history mutex poisoned"); +fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) { + let mut history = context.cw_history.lock().expect("cw history mutex poisoned"); history.push_back((Instant::now(), event)); prune_cw_history(&mut history); } -fn record_ft8(msg: Ft8Message) { - let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); +fn record_ft8(context: &FrontendRuntimeContext, msg: Ft8Message) { + let mut history = context + .ft8_history + .lock() + .expect("ft8 history mutex poisoned"); history.push_back((Instant::now(), msg)); prune_ft8_history(&mut history); } -pub fn snapshot_aprs_history() -> Vec { - let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); +pub fn snapshot_aprs_history(context: &FrontendRuntimeContext) -> Vec { + let mut history = context + .aprs_history + .lock() + .expect("aprs history mutex poisoned"); prune_aprs_history(&mut history); history.iter().map(|(_, pkt)| pkt.clone()).collect() } -pub fn snapshot_cw_history() -> Vec { - let mut history = cw_history().lock().expect("cw history mutex poisoned"); +pub fn snapshot_cw_history(context: &FrontendRuntimeContext) -> Vec { + let mut history = context.cw_history.lock().expect("cw history mutex poisoned"); prune_cw_history(&mut history); history.iter().map(|(_, evt)| evt.clone()).collect() } -pub fn snapshot_ft8_history() -> Vec { - let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); +pub fn snapshot_ft8_history(context: &FrontendRuntimeContext) -> Vec { + let mut history = context + .ft8_history + .lock() + .expect("ft8 history mutex poisoned"); prune_ft8_history(&mut history); history.iter().map(|(_, msg)| msg.clone()).collect() } -pub fn clear_aprs_history() { - let mut history = aprs_history().lock().expect("aprs history mutex poisoned"); - history.clear(); -} - -pub fn clear_cw_history() { - let mut history = cw_history().lock().expect("cw history mutex poisoned"); - history.clear(); -} - -pub fn clear_ft8_history() { - let mut history = ft8_history().lock().expect("ft8 history mutex poisoned"); - history.clear(); -} - -/// Set the decode broadcast channel from the client main. -pub fn set_decode_channel(tx: broadcast::Sender) { - { - let mut ch = decode_channel() - .lock() - .expect("decode channel mutex poisoned"); - *ch = Some(tx.clone()); - } - start_decode_history_collector(tx); -} - -/// Subscribe to the decode broadcast channel, if available. -pub fn subscribe_decode() -> Option> { - let ch = decode_channel() +pub fn clear_aprs_history(context: &FrontendRuntimeContext) { + let mut history = context + .aprs_history .lock() - .expect("decode channel mutex poisoned"); - ch.as_ref().map(|tx| tx.subscribe()) + .expect("aprs history mutex poisoned"); + history.clear(); } -fn start_decode_history_collector(tx: broadcast::Sender) { - static STARTED: OnceLock> = OnceLock::new(); - let started = STARTED.get_or_init(|| Mutex::new(false)); - let mut started_guard = started.lock().expect("decode history start mutex poisoned"); - if *started_guard { +pub fn clear_cw_history(context: &FrontendRuntimeContext) { + let mut history = context.cw_history.lock().expect("cw history mutex poisoned"); + history.clear(); +} + +pub fn clear_ft8_history(context: &FrontendRuntimeContext) { + let mut history = context + .ft8_history + .lock() + .expect("ft8 history mutex poisoned"); + history.clear(); +} + +pub fn subscribe_decode( + context: &FrontendRuntimeContext, +) -> Option> { + context.decode_rx.as_ref().map(|tx| tx.subscribe()) +} + +pub fn start_decode_history_collector(context: Arc) { + if context.decode_collector_started.swap(true, Ordering::AcqRel) { return; } - *started_guard = true; + + let Some(tx) = context.decode_rx.as_ref().cloned() else { + return; + }; tokio::spawn(async move { let mut rx = tx.subscribe(); loop { match rx.recv().await { Ok(msg) => match msg { - DecodedMessage::Aprs(pkt) => record_aprs(pkt), - DecodedMessage::Cw(evt) => record_cw(evt), - DecodedMessage::Ft8(msg) => record_ft8(msg), + DecodedMessage::Aprs(pkt) => record_aprs(&context, pkt), + DecodedMessage::Cw(evt) => record_cw(&context, evt), + DecodedMessage::Ft8(msg) => record_ft8(&context, msg), }, Err(broadcast::error::RecvError::Lagged(_)) => continue, Err(broadcast::error::RecvError::Closed) => break, @@ -191,27 +153,31 @@ fn start_decode_history_collector(tx: broadcast::Sender) { } #[get("/audio")] -pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result { - let channels = audio_channels().lock().expect("audio channels mutex poisoned"); - let Some(ref ch) = *channels else { +pub async fn audio_ws( + req: HttpRequest, + body: web::Payload, + context: web::Data>, +) -> Result { + let Some(rx) = context.audio_rx.as_ref() else { + return Ok(HttpResponse::NotFound().body("audio not enabled")); + }; + let Some(tx_sender) = context.audio_tx.as_ref().cloned() else { + return Ok(HttpResponse::NotFound().body("audio not enabled")); + }; + let Some(mut info_rx) = context.audio_info.as_ref().cloned() else { return Ok(HttpResponse::NotFound().body("audio not enabled")); }; - // Plain GET probe (no WebSocket upgrade) — return 204 to signal audio is available + // Plain GET probe (no WebSocket upgrade) - return 204 to signal audio is available. if !req.headers().contains_key("upgrade") { return Ok(HttpResponse::NoContent().finish()); } - let mut rx_sub = ch.rx.subscribe(); - let tx_sender = ch.tx.clone(); - let mut info_rx = ch.info.clone(); - drop(channels); + let mut rx_sub = rx.subscribe(); let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; - // Spawn the WebSocket handler actix_web::rt::spawn(async move { - // Wait for stream info and send as first text message let info = loop { if let Some(info) = info_rx.borrow().clone() { break info; @@ -233,7 +199,6 @@ pub async fn audio_ws(req: HttpRequest, body: web::Payload) -> Result Result { - let _ = tx_sender.send(data).await; + let _ = tx_sender.send(Bytes::from(data.to_vec())).await; } Message::Close(_) => break, _ => {} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs index 5f4d10f..77cd9fc 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/lib.rs @@ -4,7 +4,10 @@ pub mod server; -pub use server::audio::{set_audio_channels, set_decode_channel}; +pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationContext) { + use trx_frontend::FrontendSpawner; + context.register_frontend("http", server::HttpFrontend::spawn_frontend); +} pub fn register_frontend() { use trx_frontend::FrontendSpawner; diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs index cef6e64..10e299d 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/server.rs @@ -48,9 +48,10 @@ async fn serve( state_rx: watch::Receiver, rig_tx: mpsc::Sender, callsign: Option, - _context: Arc, + context: Arc, ) -> Result<(), actix_web::Error> { - let server = build_server(addr, state_rx, rig_tx, callsign)?; + audio::start_decode_history_collector(context.clone()); + let server = build_server(addr, state_rx, rig_tx, callsign, context)?; let handle = server.handle(); tokio::spawn(async move { let _ = signal::ctrl_c().await; @@ -67,16 +68,19 @@ fn build_server( state_rx: watch::Receiver, rig_tx: mpsc::Sender, _callsign: Option, + context: Arc, ) -> Result { let state_data = web::Data::new(state_rx); let rig_tx = web::Data::new(rig_tx); let clients = web::Data::new(Arc::new(AtomicUsize::new(0))); + let context_data = web::Data::new(context); let server = HttpServer::new(move || { App::new() .app_data(state_data.clone()) .app_data(rig_tx.clone()) .app_data(clients.clone()) + .app_data(context_data.clone()) .configure(api::configure) }) .shutdown_timeout(1) diff --git a/src/trx-client/trx-frontend/trx-frontend-rigctl/src/lib.rs b/src/trx-client/trx-frontend/trx-frontend-rigctl/src/lib.rs index cadb289..62744c8 100644 --- a/src/trx-client/trx-frontend/trx-frontend-rigctl/src/lib.rs +++ b/src/trx-client/trx-frontend/trx-frontend-rigctl/src/lib.rs @@ -4,6 +4,11 @@ pub mod server; +pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationContext) { + use trx_frontend::FrontendSpawner; + context.register_frontend("rigctl", server::RigctlFrontend::spawn_frontend); +} + pub fn register_frontend() { use trx_frontend::FrontendSpawner; trx_frontend::register_frontend("rigctl", server::RigctlFrontend::spawn_frontend); diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 2dc579a..18047d1 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -7,7 +7,6 @@ mod config; mod decode; mod error; mod listener; -mod plugins; mod rig_task; use std::collections::HashSet; @@ -23,10 +22,9 @@ use tracing::{error, info}; use trx_core::audio::AudioStreamInfo; -use trx_app::normalize_name; +use trx_app::{init_logging, load_plugins, normalize_name}; use trx_backend::{ - is_backend_registered, register_builtin_backends, register_builtin_backends_on, - registered_backends, RegistrationContext, RigAccess, + register_builtin_backends_on, snapshot_bootstrap_context, RegistrationContext, RigAccess, }; use trx_core::rig::controller::{AdaptivePolling, ExponentialBackoff}; use trx_core::rig::request::RigRequest; @@ -107,7 +105,11 @@ struct ResolvedConfig { longitude: Option, } -fn resolve_config(cli: &Cli, cfg: &ServerConfig) -> DynResult { +fn resolve_config( + cli: &Cli, + cfg: &ServerConfig, + registry: &RegistrationContext, +) -> DynResult { let rig_str = cli.rig.clone().or_else(|| cfg.rig.model.clone()); let rig = match rig_str.as_deref() { Some(name) => normalize_name(name), @@ -117,11 +119,11 @@ fn resolve_config(cli: &Cli, cfg: &ServerConfig) -> DynResult { ) } }; - if !is_backend_registered(&rig) { + if !registry.is_backend_registered(&rig) { return Err(format!( "Unknown rig model: {} (available: {})", rig, - registered_backends().join(", ") + registry.registered_backends().join(", ") ) .into()); } @@ -186,8 +188,10 @@ fn resolve_config(cli: &Cli, cfg: &ServerConfig) -> DynResult { fn build_rig_task_config( resolved: &ResolvedConfig, cfg: &ServerConfig, + registry: std::sync::Arc, ) -> rig_task::RigTaskConfig { rig_task::RigTaskConfig { + registry, rig_model: resolved.rig.clone(), access: resolved.access.clone(), polling: AdaptivePolling::new( @@ -210,18 +214,12 @@ fn build_rig_task_config( #[tokio::main] async fn main() -> DynResult<()> { - tracing_subscriber::fmt().with_target(false).init(); - // Phase 3B: Create bootstrap context for explicit initialization. // This replaces reliance on global mutable state, though currently // built-in backends still register on globals for plugin compatibility. // Full de-globalization would require threading context through rig_task and listener. let mut bootstrap_ctx = RegistrationContext::new(); register_builtin_backends_on(&mut bootstrap_ctx); - info!("Bootstrap context initialized with {} backends", bootstrap_ctx.registered_backends().len()); - - register_builtin_backends(); - let _plugin_libs = plugins::load_plugins(); let cli = Cli::parse(); @@ -237,11 +235,16 @@ async fn main() -> DynResult<()> { ServerConfig::load_from_default_paths()? }; + init_logging(cfg.general.log_level.as_deref()); + + let _plugin_libs = load_plugins(); + bootstrap_ctx.extend_from(&snapshot_bootstrap_context()); + if let Some(ref path) = config_path { info!("Loaded configuration from {}", path.display()); } - let resolved = resolve_config(&cli, &cfg)?; + let resolved = resolve_config(&cli, &cfg, &bootstrap_ctx)?; match &resolved.access { RigAccess::Serial { path, baud } => { @@ -275,7 +278,7 @@ async fn main() -> DynResult<()> { // Keep receivers alive so channels don't close prematurely let _state_rx = state_rx; - let rig_task_config = build_rig_task_config(&resolved, &cfg); + let rig_task_config = build_rig_task_config(&resolved, &cfg, std::sync::Arc::new(bootstrap_ctx)); let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx)); if cfg.listen.enabled { diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index ab1b3fe..3b20143 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -5,12 +5,13 @@ //! Rig task implementation using controller components. use std::time::Duration; +use std::sync::Arc; use tokio::sync::{mpsc, watch}; use tokio::time::{self, Instant}; use tracing::{debug, error, info, warn}; -use trx_backend::{build_rig, RigAccess}; +use trx_backend::{RegistrationContext, RigAccess}; use trx_core::radio::freq::Freq; use trx_core::rig::command::RigCommand; use trx_core::rig::controller::{ @@ -28,6 +29,7 @@ use crate::error::is_invalid_bcd_error; /// Configuration for the rig task. pub struct RigTaskConfig { + pub registry: Arc, pub rig_model: String, pub access: RigAccess, pub polling: AdaptivePolling, @@ -42,7 +44,10 @@ pub struct RigTaskConfig { impl Default for RigTaskConfig { fn default() -> Self { + let mut registry = RegistrationContext::new(); + trx_backend::register_builtin_backends_on(&mut registry); Self { + registry: Arc::new(registry), rig_model: "ft817".to_string(), access: RigAccess::Serial { path: "/dev/ttyUSB0".to_string(), @@ -83,7 +88,7 @@ pub async fn run_rig_task( RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr), } - let mut rig: Box = build_rig(&config.rig_model, config.access)?; + let mut rig: Box = config.registry.build_rig(&config.rig_model, config.access)?; info!("Rig backend ready"); // Initialize state machine and state diff --git a/src/trx-server/trx-backend/src/lib.rs b/src/trx-server/trx-backend/src/lib.rs index 2931871..b21df51 100644 --- a/src/trx-server/trx-backend/src/lib.rs +++ b/src/trx-server/trx-backend/src/lib.rs @@ -25,6 +25,7 @@ pub enum RigAccess { pub type BackendFactory = fn(RigAccess) -> DynResult>; /// Context for registering and instantiating rig backends. +#[derive(Clone)] pub struct RegistrationContext { factories: HashMap, } @@ -65,6 +66,13 @@ impl RegistrationContext { .ok_or_else(|| format!("Unknown rig backend: {}", name))?; factory(access) } + + /// Merge another registration context into this one. + pub fn extend_from(&mut self, other: &RegistrationContext) { + for (name, factory) in &other.factories { + self.factories.insert(name.clone(), *factory); + } + } } impl Default for RegistrationContext { @@ -86,6 +94,14 @@ fn bootstrap_context() -> &'static Arc> { BOOTSTRAP_CONTEXT.get_or_init(|| Arc::new(Mutex::new(RegistrationContext::new()))) } +/// Snapshot current plugin/bootstrap registrations into an owned context. +pub fn snapshot_bootstrap_context() -> RegistrationContext { + let ctx = bootstrap_context() + .lock() + .expect("backend context mutex poisoned"); + ctx.clone() +} + /// Register a backend factory under a stable name (e.g. "ft817"). /// Plugin compatibility: delegates to bootstrap context. pub fn register_backend(name: &str, factory: BackendFactory) {