diff --git a/Cargo.lock b/Cargo.lock index f9fdb87..04465b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2036,6 +2036,7 @@ dependencies = [ "dirs", "libloading", "serde", + "serde_json", "tokio", "tokio-serial", "toml", diff --git a/src/trx-server/Cargo.toml b/src/trx-server/Cargo.toml index 36fe4fc..179a6ca 100644 --- a/src/trx-server/Cargo.toml +++ b/src/trx-server/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" tokio = { workspace = true, features = ["full"] } tokio-serial = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } toml = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/src/trx-server/src/config.rs b/src/trx-server/src/config.rs index fc14a55..f5dd3be 100644 --- a/src/trx-server/src/config.rs +++ b/src/trx-server/src/config.rs @@ -10,6 +10,7 @@ //! 3. `~/.config/trx-rs/server.toml` (XDG config) //! 4. `/etc/trx-rs/server.toml` (system-wide) +use std::net::IpAddr; use std::path::{Path, PathBuf}; use serde::{Deserialize, Serialize}; @@ -26,6 +27,8 @@ pub struct ServerConfig { pub rig: RigConfig, /// Polling and retry behavior pub behavior: BehaviorConfig, + /// TCP listener configuration + pub listen: ListenConfig, } /// General application settings. @@ -105,6 +108,39 @@ impl Default for BehaviorConfig { } } +/// TCP listener configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct ListenConfig { + /// Whether the listener is enabled + pub enabled: bool, + /// IP address to listen on + pub listen: IpAddr, + /// TCP port to listen on + pub port: u16, + /// Authentication configuration + pub auth: AuthConfig, +} + +impl Default for ListenConfig { + fn default() -> Self { + Self { + enabled: true, + listen: IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), + port: 4532, + auth: AuthConfig::default(), + } + } +} + +/// Authentication configuration for the TCP listener. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct AuthConfig { + /// Valid authentication tokens (empty = no auth required) + pub tokens: Vec, +} + impl ServerConfig { /// Load configuration from a specific file path. pub fn load_from_file(path: &Path) -> Result { @@ -168,6 +204,7 @@ impl ServerConfig { }, }, behavior: BehaviorConfig::default(), + listen: ListenConfig::default(), }; toml::to_string_pretty(&example).unwrap_or_default() @@ -219,6 +256,9 @@ mod tests { assert_eq!(config.rig.initial_mode, RigMode::USB); assert_eq!(config.behavior.poll_interval_ms, 500); assert_eq!(config.behavior.max_retries, 3); + assert!(config.listen.enabled); + assert_eq!(config.listen.port, 4532); + assert!(config.listen.auth.tokens.is_empty()); } #[test] @@ -261,6 +301,14 @@ poll_interval_ms = 1000 poll_interval_tx_ms = 200 max_retries = 5 retry_base_delay_ms = 50 + +[listen] +enabled = true +listen = "0.0.0.0" +port = 5000 + +[listen.auth] +tokens = ["secret123"] "#; let config: ServerConfig = toml::from_str(toml_str).unwrap(); @@ -270,6 +318,13 @@ retry_base_delay_ms = 50 assert_eq!(config.rig.initial_mode, RigMode::LSB); assert_eq!(config.behavior.poll_interval_ms, 1000); assert_eq!(config.behavior.max_retries, 5); + assert!(config.listen.enabled); + assert_eq!( + config.listen.listen, + std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)) + ); + assert_eq!(config.listen.port, 5000); + assert_eq!(config.listen.auth.tokens, vec!["secret123".to_string()]); } #[test] diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs new file mode 100644 index 0000000..a950085 --- /dev/null +++ b/src/trx-server/src/listener.rs @@ -0,0 +1,223 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! JSON-over-TCP listener for trx-server. +//! +//! Accepts client connections speaking the `ClientEnvelope`/`ClientResponse` +//! protocol defined in `trx-core::client`. + +use std::collections::HashSet; +use std::net::SocketAddr; + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{error, info}; + +use trx_core::client::ClientEnvelope; +use trx_core::radio::freq::Freq; +use trx_core::rig::command::RigCommand; +use trx_core::rig::request::RigRequest; +use trx_core::rig::state::RigMode; +use trx_core::{ClientCommand, ClientResponse}; + +/// Run the JSON TCP listener, accepting client connections. +pub async fn run_listener( + addr: SocketAddr, + rig_tx: mpsc::Sender, + auth_tokens: HashSet, +) -> std::io::Result<()> { + let listener = TcpListener::bind(addr).await?; + info!("Listening on {}", addr); + + loop { + let (socket, peer) = listener.accept().await?; + info!("Client connected: {}", peer); + + let tx = rig_tx.clone(); + let tokens = auth_tokens.clone(); + tokio::spawn(async move { + if let Err(e) = handle_client(socket, peer, tx, &tokens).await { + error!("Client {} error: {:?}", peer, e); + } + }); + } +} + +async fn handle_client( + socket: TcpStream, + addr: SocketAddr, + tx: mpsc::Sender, + auth_tokens: &HashSet, +) -> std::io::Result<()> { + let (reader, mut writer) = socket.into_split(); + let mut reader = BufReader::new(reader); + let mut line = String::new(); + + loop { + line.clear(); + let bytes_read = reader.read_line(&mut line).await?; + if bytes_read == 0 { + info!("Client {} disconnected", addr); + break; + } + + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let envelope = match parse_envelope(trimmed) { + Ok(envelope) => envelope, + Err(e) => { + error!("Invalid JSON from {}: {} / {:?}", addr, trimmed, e); + let resp = ClientResponse { + success: false, + state: None, + error: Some(format!("Invalid JSON: {}", e)), + }; + let resp_line = serde_json::to_string(&resp)? + "\n"; + writer.write_all(resp_line.as_bytes()).await?; + writer.flush().await?; + continue; + } + }; + + if let Err(err) = authorize(&envelope.token, auth_tokens) { + let resp = ClientResponse { + success: false, + state: None, + error: Some(err), + }; + let resp_line = serde_json::to_string(&resp)? + "\n"; + writer.write_all(resp_line.as_bytes()).await?; + writer.flush().await?; + continue; + } + + let rig_cmd = map_command(envelope.cmd); + + let (resp_tx, resp_rx) = oneshot::channel(); + let req = RigRequest { + cmd: rig_cmd, + respond_to: resp_tx, + }; + + if let Err(e) = tx.send(req).await { + error!("Failed to send request to rig_task: {:?}", e); + let resp = ClientResponse { + success: false, + state: None, + error: Some("Internal error: rig task not available".into()), + }; + let resp_line = serde_json::to_string(&resp)? + "\n"; + writer.write_all(resp_line.as_bytes()).await?; + writer.flush().await?; + continue; + } + + match resp_rx.await { + Ok(Ok(snapshot)) => { + let resp = ClientResponse { + success: true, + state: Some(snapshot), + error: None, + }; + let resp_line = serde_json::to_string(&resp)? + "\n"; + writer.write_all(resp_line.as_bytes()).await?; + writer.flush().await?; + } + Ok(Err(err)) => { + let resp = ClientResponse { + success: false, + state: None, + error: Some(err.message), + }; + let resp_line = serde_json::to_string(&resp)? + "\n"; + writer.write_all(resp_line.as_bytes()).await?; + writer.flush().await?; + } + Err(e) => { + error!("Rig response oneshot recv error: {:?}", e); + let resp = ClientResponse { + success: false, + state: None, + error: Some("Internal error waiting for rig response".into()), + }; + let resp_line = serde_json::to_string(&resp)? + "\n"; + writer.write_all(resp_line.as_bytes()).await?; + writer.flush().await?; + } + } + } + + Ok(()) +} + +fn map_command(cmd: ClientCommand) -> RigCommand { + match cmd { + ClientCommand::GetState => RigCommand::GetSnapshot, + ClientCommand::SetFreq { freq_hz } => RigCommand::SetFreq(Freq { hz: freq_hz }), + ClientCommand::SetMode { mode } => RigCommand::SetMode(parse_mode(&mode)), + ClientCommand::SetPtt { ptt } => RigCommand::SetPtt(ptt), + ClientCommand::PowerOn => RigCommand::PowerOn, + ClientCommand::PowerOff => RigCommand::PowerOff, + ClientCommand::ToggleVfo => RigCommand::ToggleVfo, + ClientCommand::Lock => RigCommand::Lock, + ClientCommand::Unlock => RigCommand::Unlock, + ClientCommand::GetTxLimit => RigCommand::GetTxLimit, + ClientCommand::SetTxLimit { limit } => RigCommand::SetTxLimit(limit), + } +} + +fn parse_mode(s: &str) -> RigMode { + match s.to_uppercase().as_str() { + "LSB" => RigMode::LSB, + "USB" => RigMode::USB, + "CW" => RigMode::CW, + "CWR" => RigMode::CWR, + "AM" => RigMode::AM, + "FM" => RigMode::FM, + "DIG" | "DIGI" => RigMode::DIG, + "PKT" | "PACKET" => RigMode::PKT, + other => RigMode::Other(other.to_string()), + } +} + +fn parse_envelope(input: &str) -> Result { + match serde_json::from_str::(input) { + Ok(envelope) => Ok(envelope), + Err(_) => { + let cmd = serde_json::from_str::(input)?; + Ok(ClientEnvelope { token: None, cmd }) + } + } +} + +fn authorize(token: &Option, valid_tokens: &HashSet) -> Result<(), String> { + if valid_tokens.is_empty() { + return Ok(()); + } + + let Some(token) = token.as_ref() else { + return Err("missing authorization token".into()); + }; + + let candidate = strip_bearer(token); + if valid_tokens.contains(candidate) { + return Ok(()); + } + + Err("invalid authorization token".into()) +} + +fn strip_bearer(value: &str) -> &str { + let trimmed = value.trim(); + let prefix = "bearer "; + if trimmed.len() >= prefix.len() && trimmed[..prefix.len()].eq_ignore_ascii_case(prefix) { + &trimmed[prefix.len()..] + } else { + trimmed + } +} diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index 90b5271..8b1360a 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -4,16 +4,19 @@ mod config; mod error; +mod listener; mod plugins; mod rig_task; +use std::collections::HashSet; +use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::time::Duration; use clap::{Parser, ValueEnum}; use tokio::signal; use tokio::sync::{mpsc, watch}; -use tracing::info; +use tracing::{error, info}; use trx_backend::{is_backend_registered, register_builtin_backends, registered_backends, RigAccess}; use trx_core::radio::freq::Freq; @@ -56,6 +59,12 @@ struct Cli { /// Optional callsign/owner label #[arg(short = 'c', long = "callsign")] callsign: Option, + /// IP address for the JSON TCP listener + #[arg(short = 'l', long = "listen")] + listen: Option, + /// Port for the JSON TCP listener + #[arg(short = 'p', long = "port")] + port: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] @@ -271,11 +280,32 @@ async fn main() -> DynResult<()> { let (state_tx, state_rx) = watch::channel(initial_state); // Keep receivers alive so channels don't close prematurely let _state_rx = state_rx; - let _tx = tx; let rig_task_config = build_rig_task_config(&resolved, &cfg); let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx)); + if cfg.listen.enabled { + let listen_ip = cli.listen.unwrap_or(cfg.listen.listen); + let listen_port = cli.port.unwrap_or(cfg.listen.port); + let listen_addr = SocketAddr::from((listen_ip, listen_port)); + let auth_tokens: HashSet = cfg + .listen + .auth + .tokens + .iter() + .filter(|t| !t.is_empty()) + .cloned() + .collect(); + let rig_tx = tx.clone(); + tokio::spawn(async move { + if let Err(e) = listener::run_listener(listen_addr, rig_tx, auth_tokens).await { + error!("Listener error: {:?}", e); + } + }); + } + + let _tx = tx; + signal::ctrl_c().await?; info!("Ctrl+C received, shutting down"); Ok(()) diff --git a/trx-server.toml.example b/trx-server.toml.example index 0a67fe2..5b6e077 100644 --- a/trx-server.toml.example +++ b/trx-server.toml.example @@ -48,3 +48,17 @@ max_retries = 3 # Base delay for exponential backoff (milliseconds) retry_base_delay_ms = 100 + +[listen] +# Enable the JSON TCP listener for client connections +enabled = true + +# IP address to listen on (use "0.0.0.0" for all interfaces) +listen = "127.0.0.1" + +# TCP port to listen on +port = 4532 + +[listen.auth] +# Authentication tokens (empty = no auth required) +tokens = []