[feat](trx-server): add JSON TCP listener for client connections
Add a JSON-over-TCP listener so trx-client can connect to trx-server. Speaks the ClientEnvelope/ClientResponse protocol from trx-core::client. - New listener.rs module with per-client connection handling - ListenConfig/AuthConfig in config.rs (default: 127.0.0.1:4532) - CLI args --listen and --port for override - Optional token-based authentication - Updated example config with [listen] section Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
Generated
+1
@@ -2036,6 +2036,7 @@ dependencies = [
|
|||||||
"dirs",
|
"dirs",
|
||||||
"libloading",
|
"libloading",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-serial",
|
"tokio-serial",
|
||||||
"toml",
|
"toml",
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ edition = "2021"
|
|||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
tokio-serial = { workspace = true }
|
tokio-serial = { workspace = true }
|
||||||
serde = { workspace = true, features = ["derive"] }
|
serde = { workspace = true, features = ["derive"] }
|
||||||
|
serde_json = { workspace = true }
|
||||||
toml = { workspace = true }
|
toml = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
tracing-subscriber = { workspace = true }
|
tracing-subscriber = { workspace = true }
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
//! 3. `~/.config/trx-rs/server.toml` (XDG config)
|
//! 3. `~/.config/trx-rs/server.toml` (XDG config)
|
||||||
//! 4. `/etc/trx-rs/server.toml` (system-wide)
|
//! 4. `/etc/trx-rs/server.toml` (system-wide)
|
||||||
|
|
||||||
|
use std::net::IpAddr;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -26,6 +27,8 @@ pub struct ServerConfig {
|
|||||||
pub rig: RigConfig,
|
pub rig: RigConfig,
|
||||||
/// Polling and retry behavior
|
/// Polling and retry behavior
|
||||||
pub behavior: BehaviorConfig,
|
pub behavior: BehaviorConfig,
|
||||||
|
/// TCP listener configuration
|
||||||
|
pub listen: ListenConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// General application settings.
|
/// General application settings.
|
||||||
@@ -105,6 +108,39 @@ impl Default for BehaviorConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TCP listener configuration.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub struct ListenConfig {
|
||||||
|
/// Whether the listener is enabled
|
||||||
|
pub enabled: bool,
|
||||||
|
/// IP address to listen on
|
||||||
|
pub listen: IpAddr,
|
||||||
|
/// TCP port to listen on
|
||||||
|
pub port: u16,
|
||||||
|
/// Authentication configuration
|
||||||
|
pub auth: AuthConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ListenConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
enabled: true,
|
||||||
|
listen: IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
|
||||||
|
port: 4532,
|
||||||
|
auth: AuthConfig::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Authentication configuration for the TCP listener.
|
||||||
|
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub struct AuthConfig {
|
||||||
|
/// Valid authentication tokens (empty = no auth required)
|
||||||
|
pub tokens: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
impl ServerConfig {
|
impl ServerConfig {
|
||||||
/// Load configuration from a specific file path.
|
/// Load configuration from a specific file path.
|
||||||
pub fn load_from_file(path: &Path) -> Result<Self, ConfigError> {
|
pub fn load_from_file(path: &Path) -> Result<Self, ConfigError> {
|
||||||
@@ -168,6 +204,7 @@ impl ServerConfig {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
behavior: BehaviorConfig::default(),
|
behavior: BehaviorConfig::default(),
|
||||||
|
listen: ListenConfig::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
toml::to_string_pretty(&example).unwrap_or_default()
|
toml::to_string_pretty(&example).unwrap_or_default()
|
||||||
@@ -219,6 +256,9 @@ mod tests {
|
|||||||
assert_eq!(config.rig.initial_mode, RigMode::USB);
|
assert_eq!(config.rig.initial_mode, RigMode::USB);
|
||||||
assert_eq!(config.behavior.poll_interval_ms, 500);
|
assert_eq!(config.behavior.poll_interval_ms, 500);
|
||||||
assert_eq!(config.behavior.max_retries, 3);
|
assert_eq!(config.behavior.max_retries, 3);
|
||||||
|
assert!(config.listen.enabled);
|
||||||
|
assert_eq!(config.listen.port, 4532);
|
||||||
|
assert!(config.listen.auth.tokens.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -261,6 +301,14 @@ poll_interval_ms = 1000
|
|||||||
poll_interval_tx_ms = 200
|
poll_interval_tx_ms = 200
|
||||||
max_retries = 5
|
max_retries = 5
|
||||||
retry_base_delay_ms = 50
|
retry_base_delay_ms = 50
|
||||||
|
|
||||||
|
[listen]
|
||||||
|
enabled = true
|
||||||
|
listen = "0.0.0.0"
|
||||||
|
port = 5000
|
||||||
|
|
||||||
|
[listen.auth]
|
||||||
|
tokens = ["secret123"]
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
let config: ServerConfig = toml::from_str(toml_str).unwrap();
|
let config: ServerConfig = toml::from_str(toml_str).unwrap();
|
||||||
@@ -270,6 +318,13 @@ retry_base_delay_ms = 50
|
|||||||
assert_eq!(config.rig.initial_mode, RigMode::LSB);
|
assert_eq!(config.rig.initial_mode, RigMode::LSB);
|
||||||
assert_eq!(config.behavior.poll_interval_ms, 1000);
|
assert_eq!(config.behavior.poll_interval_ms, 1000);
|
||||||
assert_eq!(config.behavior.max_retries, 5);
|
assert_eq!(config.behavior.max_retries, 5);
|
||||||
|
assert!(config.listen.enabled);
|
||||||
|
assert_eq!(
|
||||||
|
config.listen.listen,
|
||||||
|
std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0))
|
||||||
|
);
|
||||||
|
assert_eq!(config.listen.port, 5000);
|
||||||
|
assert_eq!(config.listen.auth.tokens, vec!["secret123".to_string()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -0,0 +1,223 @@
|
|||||||
|
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: BSD-2-Clause
|
||||||
|
|
||||||
|
//! JSON-over-TCP listener for trx-server.
|
||||||
|
//!
|
||||||
|
//! Accepts client connections speaking the `ClientEnvelope`/`ClientResponse`
|
||||||
|
//! protocol defined in `trx-core::client`.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
use trx_core::client::ClientEnvelope;
|
||||||
|
use trx_core::radio::freq::Freq;
|
||||||
|
use trx_core::rig::command::RigCommand;
|
||||||
|
use trx_core::rig::request::RigRequest;
|
||||||
|
use trx_core::rig::state::RigMode;
|
||||||
|
use trx_core::{ClientCommand, ClientResponse};
|
||||||
|
|
||||||
|
/// Run the JSON TCP listener, accepting client connections.
|
||||||
|
pub async fn run_listener(
|
||||||
|
addr: SocketAddr,
|
||||||
|
rig_tx: mpsc::Sender<RigRequest>,
|
||||||
|
auth_tokens: HashSet<String>,
|
||||||
|
) -> std::io::Result<()> {
|
||||||
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
info!("Listening on {}", addr);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (socket, peer) = listener.accept().await?;
|
||||||
|
info!("Client connected: {}", peer);
|
||||||
|
|
||||||
|
let tx = rig_tx.clone();
|
||||||
|
let tokens = auth_tokens.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = handle_client(socket, peer, tx, &tokens).await {
|
||||||
|
error!("Client {} error: {:?}", peer, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_client(
|
||||||
|
socket: TcpStream,
|
||||||
|
addr: SocketAddr,
|
||||||
|
tx: mpsc::Sender<RigRequest>,
|
||||||
|
auth_tokens: &HashSet<String>,
|
||||||
|
) -> std::io::Result<()> {
|
||||||
|
let (reader, mut writer) = socket.into_split();
|
||||||
|
let mut reader = BufReader::new(reader);
|
||||||
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
let bytes_read = reader.read_line(&mut line).await?;
|
||||||
|
if bytes_read == 0 {
|
||||||
|
info!("Client {} disconnected", addr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let trimmed = line.trim();
|
||||||
|
if trimmed.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let envelope = match parse_envelope(trimmed) {
|
||||||
|
Ok(envelope) => envelope,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Invalid JSON from {}: {} / {:?}", addr, trimmed, e);
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: false,
|
||||||
|
state: None,
|
||||||
|
error: Some(format!("Invalid JSON: {}", e)),
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = authorize(&envelope.token, auth_tokens) {
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: false,
|
||||||
|
state: None,
|
||||||
|
error: Some(err),
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let rig_cmd = map_command(envelope.cmd);
|
||||||
|
|
||||||
|
let (resp_tx, resp_rx) = oneshot::channel();
|
||||||
|
let req = RigRequest {
|
||||||
|
cmd: rig_cmd,
|
||||||
|
respond_to: resp_tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = tx.send(req).await {
|
||||||
|
error!("Failed to send request to rig_task: {:?}", e);
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: false,
|
||||||
|
state: None,
|
||||||
|
error: Some("Internal error: rig task not available".into()),
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match resp_rx.await {
|
||||||
|
Ok(Ok(snapshot)) => {
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: true,
|
||||||
|
state: Some(snapshot),
|
||||||
|
error: None,
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
}
|
||||||
|
Ok(Err(err)) => {
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: false,
|
||||||
|
state: None,
|
||||||
|
error: Some(err.message),
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Rig response oneshot recv error: {:?}", e);
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: false,
|
||||||
|
state: None,
|
||||||
|
error: Some("Internal error waiting for rig response".into()),
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_command(cmd: ClientCommand) -> RigCommand {
|
||||||
|
match cmd {
|
||||||
|
ClientCommand::GetState => RigCommand::GetSnapshot,
|
||||||
|
ClientCommand::SetFreq { freq_hz } => RigCommand::SetFreq(Freq { hz: freq_hz }),
|
||||||
|
ClientCommand::SetMode { mode } => RigCommand::SetMode(parse_mode(&mode)),
|
||||||
|
ClientCommand::SetPtt { ptt } => RigCommand::SetPtt(ptt),
|
||||||
|
ClientCommand::PowerOn => RigCommand::PowerOn,
|
||||||
|
ClientCommand::PowerOff => RigCommand::PowerOff,
|
||||||
|
ClientCommand::ToggleVfo => RigCommand::ToggleVfo,
|
||||||
|
ClientCommand::Lock => RigCommand::Lock,
|
||||||
|
ClientCommand::Unlock => RigCommand::Unlock,
|
||||||
|
ClientCommand::GetTxLimit => RigCommand::GetTxLimit,
|
||||||
|
ClientCommand::SetTxLimit { limit } => RigCommand::SetTxLimit(limit),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_mode(s: &str) -> RigMode {
|
||||||
|
match s.to_uppercase().as_str() {
|
||||||
|
"LSB" => RigMode::LSB,
|
||||||
|
"USB" => RigMode::USB,
|
||||||
|
"CW" => RigMode::CW,
|
||||||
|
"CWR" => RigMode::CWR,
|
||||||
|
"AM" => RigMode::AM,
|
||||||
|
"FM" => RigMode::FM,
|
||||||
|
"DIG" | "DIGI" => RigMode::DIG,
|
||||||
|
"PKT" | "PACKET" => RigMode::PKT,
|
||||||
|
other => RigMode::Other(other.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_envelope(input: &str) -> Result<ClientEnvelope, serde_json::Error> {
|
||||||
|
match serde_json::from_str::<ClientEnvelope>(input) {
|
||||||
|
Ok(envelope) => Ok(envelope),
|
||||||
|
Err(_) => {
|
||||||
|
let cmd = serde_json::from_str::<ClientCommand>(input)?;
|
||||||
|
Ok(ClientEnvelope { token: None, cmd })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn authorize(token: &Option<String>, valid_tokens: &HashSet<String>) -> Result<(), String> {
|
||||||
|
if valid_tokens.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(token) = token.as_ref() else {
|
||||||
|
return Err("missing authorization token".into());
|
||||||
|
};
|
||||||
|
|
||||||
|
let candidate = strip_bearer(token);
|
||||||
|
if valid_tokens.contains(candidate) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
Err("invalid authorization token".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn strip_bearer(value: &str) -> &str {
|
||||||
|
let trimmed = value.trim();
|
||||||
|
let prefix = "bearer ";
|
||||||
|
if trimmed.len() >= prefix.len() && trimmed[..prefix.len()].eq_ignore_ascii_case(prefix) {
|
||||||
|
&trimmed[prefix.len()..]
|
||||||
|
} else {
|
||||||
|
trimmed
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,16 +4,19 @@
|
|||||||
|
|
||||||
mod config;
|
mod config;
|
||||||
mod error;
|
mod error;
|
||||||
|
mod listener;
|
||||||
mod plugins;
|
mod plugins;
|
||||||
mod rig_task;
|
mod rig_task;
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use clap::{Parser, ValueEnum};
|
use clap::{Parser, ValueEnum};
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tracing::info;
|
use tracing::{error, info};
|
||||||
|
|
||||||
use trx_backend::{is_backend_registered, register_builtin_backends, registered_backends, RigAccess};
|
use trx_backend::{is_backend_registered, register_builtin_backends, registered_backends, RigAccess};
|
||||||
use trx_core::radio::freq::Freq;
|
use trx_core::radio::freq::Freq;
|
||||||
@@ -56,6 +59,12 @@ struct Cli {
|
|||||||
/// Optional callsign/owner label
|
/// Optional callsign/owner label
|
||||||
#[arg(short = 'c', long = "callsign")]
|
#[arg(short = 'c', long = "callsign")]
|
||||||
callsign: Option<String>,
|
callsign: Option<String>,
|
||||||
|
/// IP address for the JSON TCP listener
|
||||||
|
#[arg(short = 'l', long = "listen")]
|
||||||
|
listen: Option<IpAddr>,
|
||||||
|
/// Port for the JSON TCP listener
|
||||||
|
#[arg(short = 'p', long = "port")]
|
||||||
|
port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
|
||||||
@@ -271,11 +280,32 @@ async fn main() -> DynResult<()> {
|
|||||||
let (state_tx, state_rx) = watch::channel(initial_state);
|
let (state_tx, state_rx) = watch::channel(initial_state);
|
||||||
// Keep receivers alive so channels don't close prematurely
|
// Keep receivers alive so channels don't close prematurely
|
||||||
let _state_rx = state_rx;
|
let _state_rx = state_rx;
|
||||||
let _tx = tx;
|
|
||||||
|
|
||||||
let rig_task_config = build_rig_task_config(&resolved, &cfg);
|
let rig_task_config = build_rig_task_config(&resolved, &cfg);
|
||||||
let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx));
|
let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx));
|
||||||
|
|
||||||
|
if cfg.listen.enabled {
|
||||||
|
let listen_ip = cli.listen.unwrap_or(cfg.listen.listen);
|
||||||
|
let listen_port = cli.port.unwrap_or(cfg.listen.port);
|
||||||
|
let listen_addr = SocketAddr::from((listen_ip, listen_port));
|
||||||
|
let auth_tokens: HashSet<String> = cfg
|
||||||
|
.listen
|
||||||
|
.auth
|
||||||
|
.tokens
|
||||||
|
.iter()
|
||||||
|
.filter(|t| !t.is_empty())
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
let rig_tx = tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = listener::run_listener(listen_addr, rig_tx, auth_tokens).await {
|
||||||
|
error!("Listener error: {:?}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let _tx = tx;
|
||||||
|
|
||||||
signal::ctrl_c().await?;
|
signal::ctrl_c().await?;
|
||||||
info!("Ctrl+C received, shutting down");
|
info!("Ctrl+C received, shutting down");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -48,3 +48,17 @@ max_retries = 3
|
|||||||
|
|
||||||
# Base delay for exponential backoff (milliseconds)
|
# Base delay for exponential backoff (milliseconds)
|
||||||
retry_base_delay_ms = 100
|
retry_base_delay_ms = 100
|
||||||
|
|
||||||
|
[listen]
|
||||||
|
# Enable the JSON TCP listener for client connections
|
||||||
|
enabled = true
|
||||||
|
|
||||||
|
# IP address to listen on (use "0.0.0.0" for all interfaces)
|
||||||
|
listen = "127.0.0.1"
|
||||||
|
|
||||||
|
# TCP port to listen on
|
||||||
|
port = 4532
|
||||||
|
|
||||||
|
[listen.auth]
|
||||||
|
# Authentication tokens (empty = no auth required)
|
||||||
|
tokens = []
|
||||||
|
|||||||
Reference in New Issue
Block a user