[refactor](workspace): complete remaining architecture phases

Bundle all pending repository updates, including plugin context de-globalization, runtime hardening, config validation, boundary tests, and supporting docs/scripts.

Co-authored-by: OpenAI Codex <codex@openai.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-12 22:27:36 +01:00
parent 144afbae8e
commit 4b34a39745
27 changed files with 684 additions and 210 deletions
+5 -6
View File
@@ -8,6 +8,7 @@ mod remote_client;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::ptr::NonNull;
use std::time::Duration;
use bytes::Bytes;
@@ -17,16 +18,14 @@ use tokio::sync::{broadcast, mpsc, watch};
use tokio::task::JoinHandle;
use tracing::{error, info};
use trx_app::{init_logging, load_plugins, normalize_name};
use trx_app::{init_logging, load_frontend_plugins, normalize_name};
use trx_core::audio::AudioStreamInfo;
use trx_core::decode::DecodedMessage;
use trx_core::rig::request::RigRequest;
use trx_core::rig::state::RigState;
use trx_core::DynResult;
use trx_frontend::{
snapshot_bootstrap_context, FrontendRegistrationContext, FrontendRuntimeContext,
};
use trx_frontend::{FrontendRegistrationContext, FrontendRuntimeContext};
use trx_frontend_http::register_frontend_on as register_http_frontend;
use trx_frontend_http_json::register_frontend_on as register_http_json_frontend;
use trx_frontend_rigctl::register_frontend_on as register_rigctl_frontend;
@@ -142,8 +141,8 @@ async fn async_init() -> DynResult<AppState> {
init_logging(cfg.general.log_level.as_deref());
let _plugin_libs = load_plugins();
frontend_reg_ctx.extend_from(&snapshot_bootstrap_context());
let frontend_ctx_ptr = NonNull::from(&mut frontend_reg_ctx).cast();
let _plugin_libs = load_frontend_plugins(frontend_ctx_ptr);
if let Some(ref path) = config_path {
info!("Loaded configuration from {}", path.display());
+139 -1
View File
@@ -309,7 +309,18 @@ fn parse_port(port_str: &str) -> Result<u16, String> {
#[cfg(test)]
mod tests {
use super::{parse_remote_url, RemoteEndpoint};
use super::{parse_remote_url, RemoteClientConfig, RemoteEndpoint};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::{mpsc, watch};
use trx_core::radio::freq::{Band, Freq};
use trx_core::rig::state::RigSnapshot;
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo, RigStatus, RigTxStatus};
use trx_core::{RigMode, RigState};
use trx_protocol::ClientResponse;
#[test]
fn parse_host_default_port() {
@@ -352,4 +363,131 @@ mod tests {
let err = parse_remote_url("::1:7000").expect_err("must fail");
assert!(err.contains("must be bracketed"));
}
fn sample_snapshot() -> RigSnapshot {
RigSnapshot {
info: RigInfo {
manufacturer: "Test".to_string(),
model: "Dummy".to_string(),
revision: "1".to_string(),
capabilities: RigCapabilities {
supported_bands: vec![Band {
low_hz: 7_000_000,
high_hz: 7_200_000,
tx_allowed: true,
}],
supported_modes: vec![RigMode::USB],
num_vfos: 1,
lock: false,
lockable: true,
attenuator: false,
preamp: false,
rit: false,
rpt: false,
split: false,
},
access: RigAccessMethod::Tcp {
addr: "127.0.0.1:1234".to_string(),
},
},
status: RigStatus {
freq: Freq { hz: 7_100_000 },
mode: RigMode::USB,
tx_en: false,
vfo: None,
tx: Some(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
}),
rx: None,
lock: Some(false),
},
band: None,
enabled: Some(true),
initialized: true,
server_callsign: Some("N0CALL".to_string()),
server_version: Some("test".to_string()),
server_latitude: None,
server_longitude: None,
aprs_decode_enabled: false,
cw_decode_enabled: false,
ft8_decode_enabled: false,
cw_auto: true,
cw_wpm: 15,
cw_tone_hz: 700,
}
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn reconnects_and_updates_state_after_drop() {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");
let response = serde_json::to_string(&ClientResponse {
success: true,
state: Some(sample_snapshot()),
error: None,
})
.expect("serialize response")
+ "\n";
let server = tokio::spawn(async move {
let (first, _) = listener.accept().await.expect("accept first");
let (first_reader, _) = first.into_split();
let mut first_reader = BufReader::new(first_reader);
let mut buf = String::new();
let _ = first_reader.read_line(&mut buf).await.expect("read first");
let (second, _) = listener.accept().await.expect("accept second");
let (second_reader, mut second_writer) = second.into_split();
let mut second_reader = BufReader::new(second_reader);
buf.clear();
let _ = second_reader
.read_line(&mut buf)
.await
.expect("read second");
second_writer
.write_all(response.as_bytes())
.await
.expect("write response");
second_writer.flush().await.expect("flush");
});
let (_req_tx, req_rx) = mpsc::channel(8);
let (state_tx, mut state_rx) = watch::channel(RigState::new_uninitialized());
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let client = tokio::spawn(super::run_remote_client(
RemoteClientConfig {
addr: addr.to_string(),
token: None,
poll_interval: Duration::from_millis(100),
},
req_rx,
state_tx,
shutdown_rx,
));
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if state_rx.borrow().initialized {
break;
}
state_rx.changed().await.expect("state channel");
}
})
.await
.expect("state update timeout");
assert_eq!(state_rx.borrow().status.freq.hz, 7_100_000);
let _ = shutdown_tx.send(true);
tokio::time::timeout(Duration::from_secs(2), async {
let _ = client.await;
})
.await
.expect("client shutdown timeout");
let _ = server.await;
}
}
+2 -51
View File
@@ -2,10 +2,10 @@
//
// SPDX-License-Identifier: BSD-2-Clause
use std::collections::{HashMap, VecDeque, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, OnceLock};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use bytes::Bytes;
@@ -151,52 +151,3 @@ fn normalize_name(name: &str) -> String {
.filter(|c| c.is_ascii_alphanumeric())
.collect()
}
/// Phase 3D: Plugin compatibility adapter - delegates to bootstrap context.
fn bootstrap_context() -> &'static Arc<Mutex<FrontendRegistrationContext>> {
static BOOTSTRAP_CONTEXT: OnceLock<Arc<Mutex<FrontendRegistrationContext>>> = OnceLock::new();
BOOTSTRAP_CONTEXT.get_or_init(|| Arc::new(Mutex::new(FrontendRegistrationContext::new())))
}
/// Snapshot current plugin/bootstrap registrations into an owned context.
pub fn snapshot_bootstrap_context() -> FrontendRegistrationContext {
let ctx = bootstrap_context()
.lock()
.expect("frontend context mutex poisoned");
ctx.clone()
}
/// Register a frontend spawner under a stable name (e.g. "http").
/// Plugin compatibility: delegates to bootstrap context.
pub fn register_frontend(name: &str, spawner: FrontendSpawnFn) {
let mut ctx = bootstrap_context().lock().expect("frontend context mutex poisoned");
ctx.register_frontend(name, spawner);
}
/// Check whether a frontend name is registered.
/// Plugin compatibility: reads from bootstrap context.
pub fn is_frontend_registered(name: &str) -> bool {
let ctx = bootstrap_context().lock().expect("frontend context mutex poisoned");
ctx.is_frontend_registered(name)
}
/// List registered frontend names.
/// Plugin compatibility: reads from bootstrap context.
pub fn registered_frontends() -> Vec<String> {
let ctx = bootstrap_context().lock().expect("frontend context mutex poisoned");
ctx.registered_frontends()
}
/// Spawn a registered frontend by name with runtime context.
/// Plugin compatibility: reads from bootstrap context.
pub fn spawn_frontend(
name: &str,
state_rx: watch::Receiver<RigState>,
rig_tx: mpsc::Sender<RigRequest>,
callsign: Option<String>,
listen_addr: SocketAddr,
context: Arc<FrontendRuntimeContext>,
) -> DynResult<JoinHandle<()>> {
let ctx = bootstrap_context().lock().expect("frontend context mutex poisoned");
ctx.spawn_frontend(name, state_rx, rig_tx, callsign, listen_addr, context)
}
@@ -8,8 +8,3 @@ pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationCont
use trx_frontend::FrontendSpawner;
context.register_frontend("http-json", server::HttpJsonFrontend::spawn_frontend);
}
pub fn register_frontend() {
use trx_frontend::FrontendSpawner;
trx_frontend::register_frontend("http-json", server::HttpJsonFrontend::spawn_frontend);
}
@@ -266,3 +266,148 @@ fn authorize(token: &Option<String>, context: &FrontendRuntimeContext) -> Result
let validator = SimpleTokenValidator::new(context.auth_tokens.clone());
validator.validate(token)
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::net::Ipv4Addr;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use trx_core::radio::freq::{Band, Freq};
use trx_core::rig::state::RigSnapshot;
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo, RigStatus, RigTxStatus};
use trx_core::RigMode;
fn loopback_addr() -> SocketAddr {
let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind");
let addr = listener.local_addr().expect("local_addr");
drop(listener);
addr
}
fn sample_snapshot() -> RigSnapshot {
RigSnapshot {
info: RigInfo {
manufacturer: "Test".to_string(),
model: "Dummy".to_string(),
revision: "1".to_string(),
capabilities: RigCapabilities {
supported_bands: vec![Band {
low_hz: 14_000_000,
high_hz: 14_350_000,
tx_allowed: true,
}],
supported_modes: vec![RigMode::USB],
num_vfos: 1,
lock: false,
lockable: true,
attenuator: false,
preamp: false,
rit: false,
rpt: false,
split: false,
},
access: RigAccessMethod::Tcp {
addr: "127.0.0.1:1234".to_string(),
},
},
status: RigStatus {
freq: Freq { hz: 14_074_000 },
mode: RigMode::USB,
tx_en: false,
vfo: None,
tx: Some(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
}),
rx: None,
lock: Some(false),
},
band: None,
enabled: Some(true),
initialized: true,
server_callsign: Some("N0CALL".to_string()),
server_version: Some("test".to_string()),
server_latitude: None,
server_longitude: None,
aprs_decode_enabled: false,
cw_decode_enabled: false,
ft8_decode_enabled: false,
cw_auto: true,
cw_wpm: 15,
cw_tone_hz: 700,
}
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn rejects_missing_token() {
let addr = loopback_addr();
let (rig_tx, _rig_rx) = mpsc::channel::<RigRequest>(8);
let mut runtime = FrontendRuntimeContext::new();
runtime.auth_tokens = HashSet::from(["secret".to_string()]);
let ctx = Arc::new(runtime);
let handle = tokio::spawn(serve(addr, rig_tx, ctx));
let stream = TcpStream::connect(addr).await.expect("connect");
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
writer
.write_all(br#"{"cmd":"get_state"}"#)
.await
.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
assert!(!resp.success);
assert_eq!(resp.error.as_deref(), Some("missing authorization token"));
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn forwards_command_and_returns_snapshot() {
let addr = loopback_addr();
let (rig_tx, mut rig_rx) = mpsc::channel::<RigRequest>(8);
let ctx = Arc::new(FrontendRuntimeContext::new());
let rig_worker = tokio::spawn(async move {
if let Some(req) = rig_rx.recv().await {
let _ = req.respond_to.send(Ok(sample_snapshot()));
}
});
let handle = tokio::spawn(serve(addr, rig_tx, ctx));
let stream = TcpStream::connect(addr).await.expect("connect");
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
writer
.write_all(br#"{"cmd":"get_state"}"#)
.await
.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
assert!(resp.success);
assert_eq!(resp.state.expect("snapshot").status.freq.hz, 14_074_000);
let _ = rig_worker.await;
handle.abort();
let _ = handle.await;
}
}
@@ -13,11 +13,11 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::time::{self, Duration};
use tokio_stream::wrappers::{IntervalStream, WatchStream};
use trx_frontend::FrontendRuntimeContext;
use trx_protocol::{ClientResponse, parse_mode};
use trx_core::radio::freq::Freq;
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo};
use trx_core::{RigCommand, RigRequest, RigSnapshot, RigState};
use trx_frontend::FrontendRuntimeContext;
use trx_protocol::{parse_mode, ClientResponse};
use crate::server::status;
@@ -450,28 +450,40 @@ async fn style_css() -> impl Responder {
#[get("/app.js")]
async fn app_js() -> impl Responder {
HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8"))
.insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::APP_JS)
}
#[get("/aprs.js")]
async fn aprs_js() -> impl Responder {
HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8"))
.insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::APRS_JS)
}
#[get("/ft8.js")]
async fn ft8_js() -> impl Responder {
HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8"))
.insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::FT8_JS)
}
#[get("/cw.js")]
async fn cw_js() -> impl Responder {
HttpResponse::Ok()
.insert_header((header::CONTENT_TYPE, "application/javascript; charset=utf-8"))
.insert_header((
header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
))
.body(status::CW_JS)
}
@@ -10,11 +10,11 @@
//! - Browser sends binary messages: raw Opus packets (TX)
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix_web::{Error, HttpRequest, HttpResponse, get, web};
use actix_web::{get, web, Error, HttpRequest, HttpResponse};
use actix_ws::Message;
use bytes::Bytes;
use tokio::sync::broadcast;
@@ -62,7 +62,10 @@ fn record_aprs(context: &FrontendRuntimeContext, pkt: AprsPacket) {
}
fn record_cw(context: &FrontendRuntimeContext, event: CwEvent) {
let mut history = context.cw_history.lock().expect("cw history mutex poisoned");
let mut history = context
.cw_history
.lock()
.expect("cw history mutex poisoned");
history.push_back((Instant::now(), event));
prune_cw_history(&mut history);
}
@@ -86,7 +89,10 @@ pub fn snapshot_aprs_history(context: &FrontendRuntimeContext) -> Vec<AprsPacket
}
pub fn snapshot_cw_history(context: &FrontendRuntimeContext) -> Vec<CwEvent> {
let mut history = context.cw_history.lock().expect("cw history mutex poisoned");
let mut history = context
.cw_history
.lock()
.expect("cw history mutex poisoned");
prune_cw_history(&mut history);
history.iter().map(|(_, evt)| evt.clone()).collect()
}
@@ -109,7 +115,10 @@ pub fn clear_aprs_history(context: &FrontendRuntimeContext) {
}
pub fn clear_cw_history(context: &FrontendRuntimeContext) {
let mut history = context.cw_history.lock().expect("cw history mutex poisoned");
let mut history = context
.cw_history
.lock()
.expect("cw history mutex poisoned");
history.clear();
}
@@ -128,7 +137,10 @@ pub fn subscribe_decode(
}
pub fn start_decode_history_collector(context: Arc<FrontendRuntimeContext>) {
if context.decode_collector_started.swap(true, Ordering::AcqRel) {
if context
.decode_collector_started
.swap(true, Ordering::AcqRel)
{
return;
}
@@ -8,8 +8,3 @@ pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationCont
use trx_frontend::FrontendSpawner;
context.register_frontend("http", server::HttpFrontend::spawn_frontend);
}
pub fn register_frontend() {
use trx_frontend::FrontendSpawner;
trx_frontend::register_frontend("http", server::HttpFrontend::spawn_frontend);
}
@@ -22,7 +22,7 @@ use tracing::{error, info};
use trx_core::RigRequest;
use trx_core::RigState;
use trx_frontend::{FrontendSpawner, FrontendRuntimeContext};
use trx_frontend::{FrontendRuntimeContext, FrontendSpawner};
/// HTTP frontend implementation.
pub struct HttpFrontend;
@@ -8,8 +8,3 @@ pub fn register_frontend_on(context: &mut trx_frontend::FrontendRegistrationCont
use trx_frontend::FrontendSpawner;
context.register_frontend("rigctl", server::RigctlFrontend::spawn_frontend);
}
pub fn register_frontend() {
use trx_frontend::FrontendSpawner;
trx_frontend::register_frontend("rigctl", server::RigctlFrontend::spawn_frontend);
}