From bceb049e0e1ca973b949bbbf141dc0c0d3e3960f Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Sun, 18 Jan 2026 09:24:06 +0100 Subject: [PATCH] bin: add config loader and remote client mode --- src/trx-bin/src/config.rs | 450 ++++++++++++++ src/trx-bin/src/main.rs | 979 +++++++++++-------------------- src/trx-bin/src/remote_client.rs | 202 +++++++ 3 files changed, 987 insertions(+), 644 deletions(-) create mode 100644 src/trx-bin/src/config.rs create mode 100644 src/trx-bin/src/remote_client.rs diff --git a/src/trx-bin/src/config.rs b/src/trx-bin/src/config.rs new file mode 100644 index 0000000..7022a24 --- /dev/null +++ b/src/trx-bin/src/config.rs @@ -0,0 +1,450 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Configuration file support for trx-bin. +//! +//! Supports loading configuration from TOML files with the following search order: +//! 1. Path specified via `--config` CLI argument +//! 2. `./trx-rs.toml` (current directory) +//! 3. `~/.config/trx-rs/config.toml` (XDG config) +//! 4. `/etc/trx-rs/config.toml` (system-wide) +//! +//! CLI arguments override config file values. + +use std::net::IpAddr; +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; + +use trx_core::rig::state::RigMode; + +/// Top-level configuration structure. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct Config { + /// General settings + pub general: GeneralConfig, + /// Rig backend configuration + pub rig: RigConfig, + /// Frontend configurations + pub frontends: FrontendsConfig, + /// Polling and retry behavior + pub behavior: BehaviorConfig, +} + +/// General application settings. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct GeneralConfig { + /// Callsign or owner label to display in frontends + pub callsign: Option, + /// Log level (trace, debug, info, warn, error) + pub log_level: Option, +} + +/// Rig backend configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct RigConfig { + /// Rig model (e.g., "ft817", "ic7300") + pub model: Option, + /// Initial frequency (Hz) for the rig state before first CAT read + pub initial_freq_hz: u64, + /// Initial mode for the rig state before first CAT read + pub initial_mode: RigMode, + /// Access method configuration + pub access: AccessConfig, +} + +/// Access method configuration for reaching the rig. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct AccessConfig { + /// Access type: "serial" or "tcp" + #[serde(rename = "type")] + pub access_type: Option, + /// Serial port path (for serial access) + pub port: Option, + /// Baud rate (for serial access) + pub baud: Option, + /// Host address (for TCP access) + pub host: Option, + /// TCP port (for TCP access) + pub tcp_port: Option, +} + +impl Default for RigConfig { + fn default() -> Self { + Self { + model: None, + initial_freq_hz: 144_300_000, + initial_mode: RigMode::USB, + access: AccessConfig::default(), + } + } +} + +/// Frontend configurations. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct FrontendsConfig { + /// HTTP frontend settings + pub http: HttpFrontendConfig, + /// rigctl frontend settings + pub rigctl: RigctlFrontendConfig, + /// JSON TCP frontend settings + pub http_json: HttpJsonFrontendConfig, + /// Qt/QML frontend settings + pub qt: QtFrontendConfig, +} + +/// HTTP frontend configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct HttpFrontendConfig { + /// Whether HTTP frontend is enabled + pub enabled: bool, + /// Listen address + pub listen: IpAddr, + /// Listen port + pub port: u16, +} + +impl Default for HttpFrontendConfig { + fn default() -> Self { + Self { + enabled: true, + listen: IpAddr::from([127, 0, 0, 1]), + port: 8080, + } + } +} + +/// rigctl frontend configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct RigctlFrontendConfig { + /// Whether rigctl frontend is enabled + pub enabled: bool, + /// Listen address + pub listen: IpAddr, + /// Listen port + pub port: u16, +} + +/// JSON TCP frontend configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct HttpJsonFrontendConfig { + /// Whether JSON TCP frontend is enabled + pub enabled: bool, + /// Listen address + pub listen: IpAddr, + /// Listen port (0 = ephemeral) + pub port: u16, + /// Authorization settings + pub auth: HttpJsonAuthConfig, +} + +/// Qt/QML frontend configuration. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct QtFrontendConfig { + /// Whether Qt frontend is enabled + pub enabled: bool, + /// Remote connection settings + pub remote: QtRemoteConfig, +} + +/// Authorization settings for JSON TCP frontend. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct HttpJsonAuthConfig { + /// Accepted bearer tokens. + pub tokens: Vec, +} + +/// Remote connection settings for Qt frontend. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct QtRemoteConfig { + /// Enable remote mode (no local rig task). + pub enabled: bool, + /// Remote URL (host:port or tcp://host:port). + pub url: Option, + /// Remote auth settings. + pub auth: QtRemoteAuthConfig, +} + +/// Authentication settings for Qt remote mode. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct QtRemoteAuthConfig { + /// Bearer token to send with JSON commands. + pub token: Option, +} + +impl Default for HttpJsonFrontendConfig { + fn default() -> Self { + Self { + enabled: true, + listen: IpAddr::from([127, 0, 0, 1]), + port: 0, + auth: HttpJsonAuthConfig::default(), + } + } +} + +impl Default for RigctlFrontendConfig { + fn default() -> Self { + Self { + enabled: false, + listen: IpAddr::from([127, 0, 0, 1]), + port: 4532, + } + } +} + +/// Behavior configuration for polling and retries. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct BehaviorConfig { + /// Polling interval in milliseconds when idle + pub poll_interval_ms: u64, + /// Polling interval in milliseconds when transmitting + pub poll_interval_tx_ms: u64, + /// Maximum retry attempts for transient errors + pub max_retries: u32, + /// Base delay for exponential backoff in milliseconds + pub retry_base_delay_ms: u64, +} + +impl Default for BehaviorConfig { + fn default() -> Self { + Self { + poll_interval_ms: 500, + poll_interval_tx_ms: 100, + max_retries: 3, + retry_base_delay_ms: 100, + } + } +} + +impl Config { + /// Load configuration from a specific file path. + pub fn load_from_file(path: &Path) -> Result { + let contents = std::fs::read_to_string(path) + .map_err(|e| ConfigError::ReadError(path.to_path_buf(), e.to_string()))?; + + toml::from_str(&contents) + .map_err(|e| ConfigError::ParseError(path.to_path_buf(), e.to_string())) + } + + /// Load configuration from the default search paths. + /// Returns default config if no config file is found. + pub fn load_from_default_paths() -> Result<(Self, Option), ConfigError> { + let search_paths = Self::default_search_paths(); + + for path in search_paths { + if path.exists() { + let config = Self::load_from_file(&path)?; + return Ok((config, Some(path))); + } + } + + Ok((Self::default(), None)) + } + + /// Get the default search paths for config files. + pub fn default_search_paths() -> Vec { + let mut paths = Vec::new(); + + // Current directory + paths.push(PathBuf::from("trx-rs.toml")); + + // XDG config directory + if let Some(config_dir) = dirs::config_dir() { + paths.push(config_dir.join("trx-rs").join("config.toml")); + } + + // System-wide config + paths.push(PathBuf::from("/etc/trx-rs/config.toml")); + + paths + } + + /// Generate an example configuration as a TOML string. + pub fn example_toml() -> String { + let example = Config { + general: GeneralConfig { + callsign: Some("N0CALL".to_string()), + log_level: Some("info".to_string()), + }, + rig: RigConfig { + model: Some("ft817".to_string()), + initial_freq_hz: 144_300_000, + initial_mode: RigMode::USB, + access: AccessConfig { + access_type: Some("serial".to_string()), + port: Some("/dev/ttyUSB0".to_string()), + baud: Some(9600), + host: None, + tcp_port: None, + }, + }, + frontends: FrontendsConfig { + http: HttpFrontendConfig { + enabled: true, + listen: IpAddr::from([127, 0, 0, 1]), + port: 8080, + }, + rigctl: RigctlFrontendConfig { + enabled: true, + listen: IpAddr::from([127, 0, 0, 1]), + port: 4532, + }, + http_json: HttpJsonFrontendConfig::default(), + qt: QtFrontendConfig::default(), + }, + behavior: BehaviorConfig::default(), + }; + + toml::to_string_pretty(&example).unwrap_or_default() + } +} + +/// Errors that can occur when loading configuration. +#[derive(Debug)] +pub enum ConfigError { + /// Failed to read the config file + ReadError(PathBuf, String), + /// Failed to parse the config file + ParseError(PathBuf, String), +} + +impl std::fmt::Display for ConfigError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ReadError(path, err) => { + write!( + f, + "failed to read config file '{}': {}", + path.display(), + err + ) + } + Self::ParseError(path, err) => { + write!( + f, + "failed to parse config file '{}': {}", + path.display(), + err + ) + } + } + } +} + +impl std::error::Error for ConfigError {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = Config::default(); + assert!(config.frontends.http.enabled); + assert!(!config.frontends.rigctl.enabled); + assert_eq!(config.frontends.http.port, 8080); + assert_eq!(config.frontends.rigctl.port, 4532); + assert_eq!(config.rig.initial_freq_hz, 144_300_000); + assert_eq!(config.rig.initial_mode, RigMode::USB); + assert!(config.frontends.http_json.enabled); + assert_eq!(config.frontends.http_json.port, 0); + assert!(!config.frontends.qt.enabled); + assert!(!config.frontends.qt.remote.enabled); + } + + #[test] + fn test_parse_minimal_toml() { + let toml_str = r#" +[rig] +model = "ft817" + +[rig.access] +type = "serial" +port = "/dev/ttyUSB0" +baud = 9600 +"#; + + let config: Config = toml::from_str(toml_str).unwrap(); + assert_eq!(config.rig.model, Some("ft817".to_string())); + assert_eq!(config.rig.access.port, Some("/dev/ttyUSB0".to_string())); + assert_eq!(config.rig.access.baud, Some(9600)); + } + + #[test] + fn test_parse_full_toml() { + let toml_str = r#" +[general] +callsign = "W1AW" +log_level = "debug" + +[rig] +model = "ft817" +initial_freq_hz = 7100000 +initial_mode = "LSB" + +[rig.access] +type = "serial" +port = "/dev/ttyUSB0" +baud = 9600 + +[frontends.http] +enabled = true +listen = "0.0.0.0" +port = 8080 + +[frontends.rigctl] +enabled = true +listen = "127.0.0.1" +port = 4532 + +[frontends.http_json] +enabled = true +listen = "127.0.0.1" +port = 9000 +auth.tokens = ["demo-token"] + +[frontends.qt] +enabled = true +remote.enabled = true +remote.url = "127.0.0.1:9000" +remote.auth.token = "demo-token" + +[behavior] +poll_interval_ms = 1000 +poll_interval_tx_ms = 200 +max_retries = 5 +retry_base_delay_ms = 50 +"#; + + let config: Config = toml::from_str(toml_str).unwrap(); + assert_eq!(config.general.callsign, Some("W1AW".to_string())); + assert_eq!(config.general.log_level, Some("debug".to_string())); + assert_eq!(config.rig.initial_freq_hz, 7_100_000); + assert_eq!(config.rig.initial_mode, RigMode::LSB); + assert!(config.frontends.http.enabled); + assert!(config.frontends.rigctl.enabled); + assert_eq!(config.behavior.poll_interval_ms, 1000); + assert_eq!(config.behavior.max_retries, 5); + } + + #[test] + fn test_example_toml_parses() { + let example = Config::example_toml(); + let _config: Config = toml::from_str(&example).unwrap(); + } +} diff --git a/src/trx-bin/src/main.rs b/src/trx-bin/src/main.rs index 7e867ff..eb89539 100644 --- a/src/trx-bin/src/main.rs +++ b/src/trx-bin/src/main.rs @@ -2,33 +2,37 @@ // // SPDX-License-Identifier: BSD-2-Clause -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; +use std::path::PathBuf; +use std::time::Duration; use clap::{Parser, ValueEnum}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::{TcpListener, TcpStream}; use tokio::signal; -use tokio::sync::{mpsc, oneshot, watch}; -use tokio::time::{self, Duration, Instant}; -use tracing::{debug, error, info, warn}; +use tokio::sync::{mpsc, watch}; +use tracing::info; +mod config; mod error; +mod plugins; +mod remote_client; +mod rig_task; -use crate::error::is_invalid_bcd_error; -use trx_backend::{build_rig, RigAccess, RigKind}; +use trx_backend::{ + is_backend_registered, register_builtin_backends, registered_backends, RigAccess, +}; use trx_core::radio::freq::Freq; -use trx_core::rig::command::RigCommand; +use trx_core::rig::controller::{AdaptivePolling, ExponentialBackoff}; use trx_core::rig::request::RigRequest; -use trx_core::rig::state::{RigMode, RigSnapshot, RigState}; -use trx_core::rig::{RigCat, RigControl, RigRxStatus, RigStatus, RigTxStatus}; -use trx_core::{ClientCommand, ClientResponse, DynResult, RigError, RigResult}; -use trx_frontend::FrontendSpawner; -use trx_frontend_http::server::HttpFrontend; +use trx_core::rig::state::RigState; +use trx_core::rig::{RigControl, RigRxStatus, RigStatus, RigTxStatus}; +use trx_core::DynResult; +use trx_frontend::{is_frontend_registered, registered_frontends}; +use trx_frontend_http::register_frontend as register_http_frontend; +use trx_frontend_http_json::{register_frontend as register_http_json_frontend, set_auth_tokens}; +use trx_frontend_rigctl::register_frontend as register_rigctl_frontend; -#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] -enum FrontendKind { - Http, -} +#[cfg(feature = "qt-frontend")] +use trx_frontend_qt::register_frontend as register_qt_frontend; const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - ", env!("CARGO_PKG_DESCRIPTION")); const PKG_LONG_ABOUT: &str = concat!( @@ -36,6 +40,9 @@ const PKG_LONG_ABOUT: &str = concat!( "\nHomepage: ", env!("CARGO_PKG_HOMEPAGE") ); +const RIG_TASK_CHANNEL_BUFFER: usize = 32; +const QT_FRONTEND_LISTEN_ADDR: ([u8; 4], u16) = ([127, 0, 0, 1], 0); +const RETRY_MAX_DELAY_SECS: u64 = 2; #[derive(Debug, Parser)] #[command( @@ -45,20 +52,44 @@ const PKG_LONG_ABOUT: &str = concat!( long_about = PKG_LONG_ABOUT )] struct Cli { + /// Path to configuration file (default: search trx-rs.toml, ~/.config/trx-rs/config.toml, /etc/trx-rs/config.toml) + #[arg(long = "config", short = 'C', value_name = "FILE")] + config: Option, + /// Print example configuration and exit + #[arg(long = "print-config")] + print_config: bool, /// Rig backend to use (e.g. ft817) - #[arg(short = 'r', long = "rig", value_enum)] - rig: RigKind, + #[arg(short = 'r', long = "rig")] + rig: Option, /// Access method to reach the rig CAT interface - #[arg(short = 'a', long = "access", value_enum, default_value_t = AccessKind::Serial)] - access: AccessKind, - /// Frontend to expose for control/status (e.g. http) - #[arg(short = 'f', long = "frontend", value_enum, default_value_t = FrontendKind::Http)] - frontend: FrontendKind, + #[arg(short = 'a', long = "access", value_enum)] + access: Option, + /// Frontend(s) to expose for control/status (e.g. http,rigctl) + #[arg(short = 'f', long = "frontend", value_delimiter = ',', num_args = 1..)] + frontends: Option>, + /// HTTP frontend listen address + #[arg(long = "http-listen")] + http_listen: Option, + /// HTTP frontend listen port + #[arg(long = "http-port")] + http_port: Option, + /// rigctl frontend listen address + #[arg(long = "rigctl-listen")] + rigctl_listen: Option, + /// rigctl frontend listen port + #[arg(long = "rigctl-port")] + rigctl_port: Option, + /// JSON TCP frontend listen address + #[arg(long = "http-json-listen")] + http_json_listen: Option, + /// JSON TCP frontend listen port + #[arg(long = "http-json-port")] + http_json_port: Option, /// Rig CAT address: /// when access is serial: ; /// when access is TCP: : #[arg(value_name = "RIG_ADDR")] - rig_addr: String, + rig_addr: Option, /// Optional callsign/owner label to show in the frontend #[arg(short = 'c', long = "callsign")] callsign: Option, @@ -70,6 +101,13 @@ enum AccessKind { Tcp, } +fn normalize_name(name: &str) -> String { + name.to_ascii_lowercase() + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .collect() +} + /// Parse a serial rig address of the form " ". fn parse_serial_addr(addr: &str) -> DynResult<(String, u32)> { let mut parts = addr.split_whitespace(); @@ -88,38 +126,238 @@ fn parse_serial_addr(addr: &str) -> DynResult<(String, u32)> { Ok((path.to_string(), baud)) } +/// Resolved configuration after merging config file and CLI arguments. +struct ResolvedConfig { + rig: String, + access: RigAccess, + frontends: Vec, + http_listen: IpAddr, + http_port: u16, + rigctl_listen: IpAddr, + rigctl_port: u16, + http_json_listen: IpAddr, + http_json_port: u16, + callsign: Option, +} + +impl ResolvedConfig { + /// Build resolved config from CLI args and config file. + fn from_cli_and_config( + cli: &Cli, + cfg: &config::Config, + qt_remote_enabled: bool, + ) -> DynResult { + // Resolve rig model: CLI > config > error + let rig_str = cli.rig.clone().or_else(|| cfg.rig.model.clone()); + let rig = match rig_str.as_deref() { + Some(name) => normalize_name(name), + None if qt_remote_enabled => "remote".to_string(), + None => { + return Err( + "Rig model not specified. Use --rig or set [rig].model in config.".into(), + ) + } + }; + if !qt_remote_enabled && !is_backend_registered(&rig) { + return Err(format!( + "Unknown rig model: {} (available: {})", + rig, + registered_backends().join(", ") + ) + .into()); + } + + let access = if qt_remote_enabled { + RigAccess::Tcp { + addr: "remote".to_string(), + } + } else { + // Resolve access method: CLI > config > default to serial + let access_type = cli + .access + .as_ref() + .map(|a| match a { + AccessKind::Serial => "serial", + AccessKind::Tcp => "tcp", + }) + .or(cfg.rig.access.access_type.as_deref()); + + match access_type { + Some("serial") | None => { + // Try CLI rig_addr first, then config + let (path, baud) = if let Some(ref addr) = cli.rig_addr { + parse_serial_addr(addr)? + } else if let (Some(port), Some(baud)) = + (&cfg.rig.access.port, cfg.rig.access.baud) + { + (port.clone(), baud) + } else { + return Err("Serial access requires port and baud. Use ' ' argument or set [rig.access].port and .baud in config.".into()); + }; + RigAccess::Serial { path, baud } + } + Some("tcp") => { + let addr = if let Some(ref addr) = cli.rig_addr { + addr.clone() + } else if let (Some(host), Some(port)) = + (&cfg.rig.access.host, cfg.rig.access.tcp_port) + { + format!("{}:{}", host, port) + } else { + return Err("TCP access requires host:port. Use argument or set [rig.access].host and .tcp_port in config.".into()); + }; + RigAccess::Tcp { addr } + } + Some(other) => return Err(format!("Unknown access type: {}", other).into()), + } + }; + + // Resolve frontends: CLI > config > default + let frontends = if let Some(ref fes) = cli.frontends { + fes.iter().map(|f| normalize_name(f)).collect() + } else { + let mut fes = Vec::new(); + if cfg.frontends.http.enabled { + fes.push("http".to_string()); + } + if cfg.frontends.rigctl.enabled { + fes.push("rigctl".to_string()); + } + if cfg.frontends.http_json.enabled { + fes.push("httpjson".to_string()); + } + if cfg.frontends.qt.enabled { + fes.push("qt".to_string()); + } + if fes.is_empty() { + fes.push("http".to_string()); // Default + } + fes + }; + for name in &frontends { + if !is_frontend_registered(name) { + return Err(format!( + "Unknown frontend: {} (available: {})", + name, + registered_frontends().join(", ") + ) + .into()); + } + } + + // Resolve HTTP settings: CLI > config + let http_listen = cli.http_listen.unwrap_or(cfg.frontends.http.listen); + let http_port = cli.http_port.unwrap_or(cfg.frontends.http.port); + + // Resolve rigctl settings: CLI > config + let rigctl_listen = cli.rigctl_listen.unwrap_or(cfg.frontends.rigctl.listen); + let rigctl_port = cli.rigctl_port.unwrap_or(cfg.frontends.rigctl.port); + + // Resolve JSON TCP settings: CLI > config + let http_json_listen = cli + .http_json_listen + .unwrap_or(cfg.frontends.http_json.listen); + let http_json_port = cli.http_json_port.unwrap_or(cfg.frontends.http_json.port); + + // Resolve callsign: CLI > config + let callsign = cli + .callsign + .clone() + .or_else(|| cfg.general.callsign.clone()); + + Ok(Self { + rig, + access, + frontends, + http_listen, + http_port, + rigctl_listen, + rigctl_port, + http_json_listen, + http_json_port, + callsign, + }) + } +} + #[tokio::main] async fn main() -> DynResult<()> { init_tracing(); + register_builtin_backends(); + let _plugin_libs = plugins::load_plugins(); + register_http_frontend(); + register_http_json_frontend(); + #[cfg(feature = "qt-frontend")] + register_qt_frontend(); + register_rigctl_frontend(); let cli = Cli::parse(); - let access = match cli.access { - AccessKind::Serial => { - let (path, baud) = parse_serial_addr(&cli.rig_addr)?; - info!( - "Starting trxd (rig: {}, access: serial {} @ {} baud)", - cli.rig, path, baud - ); - RigAccess::Serial { path, baud } - } - AccessKind::Tcp => { - info!( - "Starting trxd (rig: {}, access: tcp {})", - cli.rig, cli.rig_addr - ); - RigAccess::Tcp { - addr: cli.rig_addr.clone(), + // Handle --print-config + if cli.print_config { + println!("{}", config::Config::example_toml()); + return Ok(()); + } + + // Load configuration file + let (cfg, config_path) = if let Some(ref path) = cli.config { + let cfg = config::Config::load_from_file(path)?; + (cfg, Some(path.clone())) + } else { + config::Config::load_from_default_paths()? + }; + + if let Some(ref path) = config_path { + info!("Loaded configuration from {}", path.display()); + } + + set_auth_tokens(cfg.frontends.http_json.auth.tokens.clone()); + + let qt_remote_enabled = cfg.frontends.qt.enabled && cfg.frontends.qt.remote.enabled; + if qt_remote_enabled + && cfg + .frontends + .qt + .remote + .url + .as_deref() + .unwrap_or("") + .is_empty() + { + return Err("Qt remote mode enabled but frontends.qt.remote.url is missing".into()); + } + + // Merge CLI and config + let resolved = ResolvedConfig::from_cli_and_config(&cli, &cfg, qt_remote_enabled)?; + + // Log startup info + if qt_remote_enabled { + info!("Starting trxd in Qt remote client mode"); + } else { + match &resolved.access { + RigAccess::Serial { path, baud } => { + info!( + "Starting trxd (rig: {}, access: serial {} @ {} baud)", + resolved.rig, path, baud + ); + } + RigAccess::Tcp { addr } => { + info!( + "Starting trxd (rig: {}, access: tcp {})", + resolved.rig, addr + ); } } - }; + } // Channel used to communicate with the rig task. - let (tx, rx) = mpsc::channel::(32); + let (tx, rx) = mpsc::channel::(RIG_TASK_CHANNEL_BUFFER); let initial_state = RigState { rig_info: None, status: RigStatus { - freq: Freq { hz: 144_300_000 }, - mode: RigMode::USB, + freq: Freq { + hz: cfg.rig.initial_freq_hz, + }, + mode: cfg.rig.initial_mode.clone(), tx_en: false, vfo: None, tx: Some(RigTxStatus { @@ -144,41 +382,62 @@ async fn main() -> DynResult<()> { }; let (state_tx, state_rx) = watch::channel(initial_state.clone()); - // Spawn the rig task. - let _rig_handle = tokio::spawn(rig_task(cli.rig, access, rx, state_tx, initial_state)); - - // Start TCP listener for clients. - let listen_addr = SocketAddr::from(([127, 0, 0, 1], 0)); - let listener = TcpListener::bind(listen_addr).await?; - let actual_addr = listener.local_addr()?; - info!("TCP listener started on {}", actual_addr); - - // Start simple HTTP status server on 127.0.0.1:8080. - let http_state_rx = state_rx.clone(); - if matches!(cli.frontend, FrontendKind::Http) { - HttpFrontend::spawn_frontend(http_state_rx, tx.clone(), cli.callsign.clone()); + if qt_remote_enabled { + let remote_addr = remote_client::parse_remote_url( + cfg.frontends.qt.remote.url.as_deref().unwrap_or_default(), + ) + .map_err(|e| format!("Invalid Qt remote URL: {}", e))?; + let remote_cfg = remote_client::RemoteClientConfig { + addr: remote_addr, + token: cfg.frontends.qt.remote.auth.token.clone(), + poll_interval: Duration::from_millis(750), + }; + let _remote_handle = + tokio::spawn(remote_client::run_remote_client(remote_cfg, rx, state_tx)); + } else { + // Spawn the rig task (controller-based implementation). + let rig_task_config = rig_task::RigTaskConfig { + rig_model: resolved.rig, + access: resolved.access, + polling: AdaptivePolling::new( + Duration::from_millis(cfg.behavior.poll_interval_ms), + Duration::from_millis(cfg.behavior.poll_interval_tx_ms), + ), + retry: ExponentialBackoff::new( + cfg.behavior.max_retries.max(1), + Duration::from_millis(cfg.behavior.retry_base_delay_ms), + Duration::from_secs(RETRY_MAX_DELAY_SECS), + ), + initial_freq_hz: cfg.rig.initial_freq_hz, + initial_mode: cfg.rig.initial_mode.clone(), + }; + let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx)); } - loop { - tokio::select! { - res = listener.accept() => { - let (socket, addr) = res?; - info!("New client connected: {}", addr); - - let tx_clone = tx.clone(); - tokio::spawn(async move { - if let Err(e) = handle_client(socket, addr, tx_clone).await { - error!("Client {} error: {:?}", addr, e); - } - }); + // Start frontends. + for frontend in &resolved.frontends { + let frontend_state_rx = state_rx.clone(); + let addr = match frontend.as_str() { + "http" => SocketAddr::from((resolved.http_listen, resolved.http_port)), + "rigctl" => SocketAddr::from((resolved.rigctl_listen, resolved.rigctl_port)), + "httpjson" => SocketAddr::from((resolved.http_json_listen, resolved.http_json_port)), + "qt" => SocketAddr::from(QT_FRONTEND_LISTEN_ADDR), + other => { + return Err(format!("Frontend missing listen configuration: {}", other).into()); } - _ = signal::ctrl_c() => { - info!("Ctrl+C received, shutting down"); - break; - } - } + }; + trx_frontend::spawn_frontend( + frontend, + frontend_state_rx, + tx.clone(), + resolved.callsign.clone(), + addr, + )?; } + signal::ctrl_c().await?; + info!("Ctrl+C received, shutting down"); + Ok(()) } @@ -187,571 +446,3 @@ fn init_tracing() { // Uses default formatting and RUST_LOG if available. tracing_subscriber::fmt().with_target(false).init(); } - -/// Task that owns the TRX state and talks to the serial port. -async fn rig_task( - rig_kind: RigKind, - access: RigAccess, - mut rx: mpsc::Receiver, - state_tx: watch::Sender, - mut state: RigState, -) -> DynResult<()> { - info!("Opening rig backend {}", rig_kind); - match &access { - RigAccess::Serial { path, baud } => info!("Serial: {} @ {} baud", path, baud), - RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr), - } - - let mut rig: Box = build_rig(rig_kind, access)?; - info!("Rig backend ready"); - - let mut poll = time::interval(Duration::from_millis(250)); - let mut poll_pause_until: Option = None; - let mut last_power_on: Option = None; - - // Initial bring-up and VFO priming. - let rig_info = rig.info().clone(); - state.rig_info = Some(rig_info); - if let Some(info) = state.rig_info.as_ref() { - info!( - "Rig info: {} {} {}", - info.manufacturer, info.model, info.revision - ); - } - let _ = state_tx.send(state.clone()); - if !state.control.enabled.unwrap_or(false) { - info!("Sending initial PowerOn to wake rig"); - match rig.power_on().await { - Ok(()) => { - state.control.enabled = Some(true); - time::sleep(Duration::from_secs(3)).await; - if let Err(e) = refresh_state_with_retry(&mut rig, &mut state, 2).await { - warn!( - "Initial PowerOn refresh failed: {:?}; retrying once after short delay", - e - ); - time::sleep(Duration::from_millis(500)).await; - if let Err(e2) = refresh_state_with_retry(&mut rig, &mut state, 1).await { - warn!( - "Initial PowerOn second refresh failed (continuing): {:?}", - e2 - ); - } - } - info!("Rig initialized after power on sequence"); - } - Err(e) => warn!("Initial PowerOn failed (continuing): {:?}", e), - } - } - if let Err(e) = prime_vfo_state(&mut rig, &mut state).await { - warn!("VFO priming failed: {:?}", e); - } - state.initialized = true; - let _ = state_tx.send(state.clone()); - - // Single-task loop: handle commands and periodic polling. - loop { - tokio::select! { - _ = poll.tick() => { - if let Some(until) = poll_pause_until { - if Instant::now() < until { - continue; - } else { - poll_pause_until = None; - } - } - if matches!(state.control.enabled, Some(false)) { - continue; - } - match refresh_state_with_retry(&mut rig, &mut state, 2).await { - Ok(()) => { let _ = state_tx.send(state.clone()); } - Err(e) => { - error!("CAT polling error: {:?}", e); - if let Some(last_on) = last_power_on { - if Instant::now().duration_since(last_on) < Duration::from_secs(5) { - poll_pause_until = Some(Instant::now() + Duration::from_millis(800)); - continue; - } - } - } - } - }, - maybe_req = rx.recv() => { - let Some(first_req) = maybe_req else { break; }; - let mut batch = vec![first_req]; - while let Ok(next) = rx.try_recv() { - batch.push(next); - } - while let Some(RigRequest { cmd, respond_to }) = batch.pop() { - let responders = vec![respond_to]; - let cmd_label = format!("{:?}", cmd); - let started = Instant::now(); - - let result: RigResult = { - let not_ready = !state.initialized - && !matches!(cmd, RigCommand::PowerOn | RigCommand::GetSnapshot); - if not_ready { - Err(RigError("rig not initialized yet".into())) - } else { - match cmd { - RigCommand::GetSnapshot => match refresh_state_with_retry(&mut rig, &mut state, 2).await { - Ok(()) => { - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - error!("Failed to read CAT status: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - }, - RigCommand::SetFreq(freq) => { - info!("SetFreq requested: {} Hz", freq.hz); - if state.control.lock.unwrap_or(false) { - warn!("SetFreq blocked: panel lock is active"); - Err(RigError("panel is locked".into())) - } else { - let res = time::timeout(Duration::from_secs(1), rig.set_freq(freq)).await; - match res { - Ok(Ok(())) => { - state.apply_freq(freq); - poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Ok(Err(e)) => { - error!("Failed to send CAT SetFreq: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - Err(elapsed) => { - warn!("CAT SetFreq timed out ({:?}) but proceeding with state update", elapsed); - state.apply_freq(freq); - poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - } - } - } - RigCommand::SetMode(mode) => { - info!("SetMode requested: {:?}", mode); - if state.control.lock.unwrap_or(false) { - warn!("SetMode blocked: panel lock is active"); - Err(RigError("panel is locked".into())) - } else { - let res = time::timeout(Duration::from_secs(1), rig.set_mode(mode.clone())).await; - match res { - Ok(Ok(())) => { - state.apply_mode(mode.clone()); - poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Ok(Err(e)) => { - error!("Failed to send CAT SetMode: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - Err(elapsed) => { - warn!("CAT SetMode timed out ({:?}) but proceeding with state update", elapsed); - state.apply_mode(mode.clone()); - poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - } - } - } - RigCommand::SetPtt(ptt) => { - info!("SetPtt requested: {}", ptt); - if let Err(e) = rig.set_ptt(ptt).await { - error!("Failed to send CAT SetPtt: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } else { - state.status.tx_en = ptt; - if !ptt { - if let Some(tx) = state.status.tx.as_mut() { - tx.power = Some(0); - tx.swr = Some(0.0); - } - } - state.status.lock = state.control.lock; - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - } - RigCommand::PowerOn => { - info!("PowerOn requested"); - if let Err(e) = rig.power_on().await { - error!("Failed to send CAT PowerOn: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } else { - state.control.enabled = Some(true); - time::sleep(Duration::from_secs(3)).await; - let now = Instant::now(); - poll_pause_until = Some(now + Duration::from_secs(3)); - last_power_on = Some(now); - match refresh_state_with_retry(&mut rig, &mut state, 2).await { - Ok(()) => { - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - if is_invalid_bcd_error(e.as_ref()) { - warn!("Transient CAT decode after PowerOn (ignored): {:?}", e); - poll_pause_until = Some(Instant::now() + Duration::from_millis(1500)); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } else { - error!("Failed to refresh after PowerOn: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - } - } - } - } - RigCommand::PowerOff => { - info!("PowerOff requested"); - if let Err(e) = rig.power_off().await { - error!("Failed to send CAT PowerOff: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } else { - state.control.enabled = Some(false); - state.status.tx_en = false; - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - } - RigCommand::ToggleVfo => { - info!("Toggle VFO requested"); - if state.control.lock.unwrap_or(false) { - warn!("ToggleVfo blocked: panel lock is active"); - Err(RigError("panel is locked".into())) - } else if let Err(e) = rig.toggle_vfo().await { - error!("Failed to send CAT ToggleVfo: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } else { - time::sleep(Duration::from_millis(150)).await; - poll_pause_until = Some(Instant::now() + Duration::from_millis(300)); - match refresh_state_with_retry(&mut rig, &mut state, 2).await { - Ok(()) => { - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - error!("Failed to refresh after ToggleVfo: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - } - } - } - RigCommand::GetTxLimit => match rig.get_tx_limit().await { - Ok(limit) => { - state - .status - .tx - .get_or_insert(RigTxStatus { power: None, limit: None, swr: None, alc: None }) - .limit = Some(limit); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - error!("Failed to read TX limit: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - }, - RigCommand::SetTxLimit(limit) => match rig.set_tx_limit(limit).await { - Ok(()) => { - state - .status - .tx - .get_or_insert(RigTxStatus { power: None, limit: None, swr: None, alc: None }) - .limit = Some(limit); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - error!("Failed to set TX limit: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - } - RigCommand::Lock => { - info!("Lock requested"); - match rig.lock().await { - Ok(()) => { - state.control.lock = Some(true); - state.status.lock = Some(true); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - error!("Failed to send CAT Lock: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - } - } - RigCommand::Unlock => { - info!("Unlock requested"); - match rig.unlock().await { - Ok(()) => { - state.control.lock = Some(false); - state.status.lock = Some(false); - let _ = state_tx.send(state.clone()); - snapshot_from(&state) - } - Err(e) => { - error!("Failed to send CAT Unlock: {:?}", e); - Err(RigError(format!("CAT error: {}", e))) - } - } - } - } - } - }; - - for tx in responders { - let _ = tx.send(result.clone()); - } - let elapsed = started.elapsed(); - if elapsed > Duration::from_millis(500) { - warn!("Rig command {} took {:?}", cmd_label, elapsed); - } else { - debug!("Rig command {} completed in {:?}", cmd_label, elapsed); - } - } - }, - } - } - - info!("rig_task shutting down (channel closed)"); - Ok(()) -} - -async fn refresh_state_from_cat(trx: &mut Box, state: &mut RigState) -> DynResult<()> { - let (freq, mode, vfo) = trx.get_status().await?; - state.control.enabled = Some(true); - state.apply_freq(freq); - state.apply_mode(mode); - state.status.vfo = vfo.clone(); - - if state.status.tx_en { - state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(0); - } else if let Ok(meter) = trx.get_signal_strength().await { - let sig = map_signal_strength(&state.status.mode, meter); - state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(sig); - } - if let Ok(limit) = trx.get_tx_limit().await { - state - .status - .tx - .get_or_insert(RigTxStatus { - power: None, - limit: None, - swr: None, - alc: None, - }) - .limit = Some(limit); - } - if state.status.tx_en { - if let Ok(power) = trx.get_tx_power().await { - state - .status - .tx - .get_or_insert(RigTxStatus { - power: None, - limit: None, - swr: None, - alc: None, - }) - .power = Some(power); - } - } - state.status.lock = Some(state.control.lock.unwrap_or(false)); - Ok(()) -} - -async fn refresh_state_with_retry( - trx: &mut Box, - state: &mut RigState, - attempts: usize, -) -> DynResult<()> { - let mut last_err: Option> = None; - for i in 0..attempts { - match refresh_state_from_cat(trx, state).await { - Ok(()) => return Ok(()), - Err(e) => { - let should_retry = is_invalid_bcd_error(e.as_ref()); - last_err = Some(e); - if should_retry && i + 1 < attempts { - warn!( - "Retrying CAT state read after invalid BCD (attempt {} of {})", - i + 1, - attempts - ); - time::sleep(Duration::from_millis(300)).await; - continue; - } else { - break; - } - } - } - } - - Err(last_err.unwrap_or_else(|| "Unknown CAT error".into())) -} - -async fn prime_vfo_state(trx: &mut Box, state: &mut RigState) -> DynResult<()> { - // Ensure panel is unlocked so we can CAT-control safely. - let _ = trx.unlock().await; - time::sleep(Duration::from_millis(100)).await; - - refresh_state_with_retry(trx, state, 2).await?; - time::sleep(Duration::from_millis(150)).await; - - trx.toggle_vfo().await?; - time::sleep(Duration::from_millis(150)).await; - refresh_state_with_retry(trx, state, 2).await?; - - trx.toggle_vfo().await?; - time::sleep(Duration::from_millis(150)).await; - refresh_state_with_retry(trx, state, 2).await?; - - Ok(()) -} - -/// Handle a single TCP client. -async fn handle_client( - socket: TcpStream, - addr: SocketAddr, - tx: mpsc::Sender, -) -> DynResult<()> { - let (reader, mut writer) = socket.into_split(); - let mut reader = BufReader::new(reader); - let mut line = String::new(); - - loop { - line.clear(); - let bytes_read = reader.read_line(&mut line).await?; - if bytes_read == 0 { - info!("Client {} disconnected", addr); - break; - } - - // Simple protocol: one line = one JSON command. - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - - let cmd: ClientCommand = match serde_json::from_str(trimmed) { - Ok(c) => c, - Err(e) => { - error!("Invalid JSON from {}: {} / {:?}", addr, trimmed, e); - let resp = ClientResponse { - success: false, - state: None, - error: Some(format!("Invalid JSON: {}", e)), - }; - let resp_line = serde_json::to_string(&resp)? + "\n"; - writer.write_all(resp_line.as_bytes()).await?; - writer.flush().await?; - continue; - } - }; - - // Map ClientCommand -> RigCommand. - let rig_cmd = match cmd { - ClientCommand::GetState => RigCommand::GetSnapshot, - ClientCommand::SetFreq { freq_hz } => RigCommand::SetFreq(Freq { hz: freq_hz }), - ClientCommand::SetMode { mode } => RigCommand::SetMode(parse_mode(&mode)), - ClientCommand::SetPtt { ptt } => RigCommand::SetPtt(ptt), - ClientCommand::PowerOn => RigCommand::PowerOn, - ClientCommand::PowerOff => RigCommand::PowerOff, - ClientCommand::ToggleVfo => RigCommand::ToggleVfo, - ClientCommand::GetTxLimit => RigCommand::GetTxLimit, - ClientCommand::SetTxLimit { limit } => RigCommand::SetTxLimit(limit), - }; - - let (resp_tx, resp_rx) = oneshot::channel(); - let req = RigRequest { - cmd: rig_cmd, - respond_to: resp_tx, - }; - - if let Err(e) = tx.send(req).await { - error!("Failed to send request to rig_task: {:?}", e); - let resp = ClientResponse { - success: false, - state: None, - error: Some("Internal error: rig task not available".into()), - }; - let resp_line = serde_json::to_string(&resp)? + "\n"; - writer.write_all(resp_line.as_bytes()).await?; - writer.flush().await?; - continue; - } - - match resp_rx.await { - Ok(Ok(snapshot)) => { - let resp = ClientResponse { - success: true, - state: Some(snapshot), - error: None, - }; - let resp_line = serde_json::to_string(&resp)? + "\n"; - writer.write_all(resp_line.as_bytes()).await?; - writer.flush().await?; - } - Ok(Err(err)) => { - let resp = ClientResponse { - success: false, - state: None, - error: Some(err.0), - }; - let resp_line = serde_json::to_string(&resp)? + "\n"; - writer.write_all(resp_line.as_bytes()).await?; - writer.flush().await?; - } - Err(e) => { - error!("Rig response oneshot recv error: {:?}", e); - let resp = ClientResponse { - success: false, - state: None, - error: Some("Internal error waiting for rig response".into()), - }; - let resp_line = serde_json::to_string(&resp)? + "\n"; - writer.write_all(resp_line.as_bytes()).await?; - writer.flush().await?; - } - } - } - - Ok(()) -} - -fn map_signal_strength(mode: &RigMode, raw: u8) -> i32 { - let val = raw as i32; - match mode { - RigMode::FM | RigMode::WFM => val.saturating_sub(128), - _ => val, - } -} - -/// Parse mode string coming from the client into RigMode. -fn parse_mode(s: &str) -> RigMode { - match s.to_uppercase().as_str() { - "LSB" => RigMode::LSB, - "USB" => RigMode::USB, - "CW" => RigMode::CW, - "CWR" => RigMode::CWR, - "AM" => RigMode::AM, - "FM" => RigMode::FM, - "DIG" | "DIGI" => RigMode::DIG, - "PKT" | "PACKET" => RigMode::PKT, - other => RigMode::Other(other.to_string()), - } -} - -fn snapshot_from(state: &RigState) -> RigResult { - state - .snapshot() - .ok_or_else(|| RigError("Rig info unavailable".into())) -} diff --git a/src/trx-bin/src/remote_client.rs b/src/trx-bin/src/remote_client.rs new file mode 100644 index 0000000..07459bd --- /dev/null +++ b/src/trx-bin/src/remote_client.rs @@ -0,0 +1,202 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +use std::time::Duration; + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, watch}; +use tokio::time::{self, Instant}; +use tracing::{info, warn}; + +use trx_core::client::{ClientCommand, ClientEnvelope, ClientResponse}; +use trx_core::rig::request::RigRequest; +use trx_core::rig::state::RigState; +use trx_core::rig::RigControl; +use trx_core::{RigError, RigResult}; + +pub struct RemoteClientConfig { + pub addr: String, + pub token: Option, + pub poll_interval: Duration, +} + +pub async fn run_remote_client( + config: RemoteClientConfig, + mut rx: mpsc::Receiver, + state_tx: watch::Sender, +) -> RigResult<()> { + let mut reconnect_delay = Duration::from_secs(1); + + loop { + info!("Qt remote: connecting to {}", config.addr); + match TcpStream::connect(&config.addr).await { + Ok(stream) => { + if let Err(e) = handle_connection(&config, stream, &mut rx, &state_tx).await { + warn!("Qt remote connection dropped: {}", e); + } + } + Err(e) => { + warn!("Qt remote connect failed: {}", e); + } + } + + time::sleep(reconnect_delay).await; + reconnect_delay = (reconnect_delay * 2).min(Duration::from_secs(10)); + } +} + +async fn handle_connection( + config: &RemoteClientConfig, + stream: TcpStream, + rx: &mut mpsc::Receiver, + state_tx: &watch::Sender, +) -> RigResult<()> { + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut poll_interval = time::interval(config.poll_interval); + let mut last_poll = Instant::now(); + + loop { + tokio::select! { + _ = poll_interval.tick() => { + if last_poll.elapsed() < config.poll_interval { + continue; + } + last_poll = Instant::now(); + if let Err(e) = send_command(config, &mut writer, &mut reader, ClientCommand::GetState, state_tx).await { + warn!("Qt remote poll failed: {}", e); + } + } + req = rx.recv() => { + let Some(req) = req else { + return Ok(()); + }; + let cmd = req.cmd; + let result = { + let client_cmd = map_rig_command(cmd); + send_command(config, &mut writer, &mut reader, client_cmd, state_tx).await + }; + + let _ = req.respond_to.send(result); + } + } + } +} + +async fn send_command( + config: &RemoteClientConfig, + writer: &mut tokio::net::tcp::OwnedWriteHalf, + reader: &mut BufReader, + cmd: ClientCommand, + state_tx: &watch::Sender, +) -> RigResult { + let envelope = ClientEnvelope { + token: config.token.clone(), + cmd, + }; + + let payload = serde_json::to_string(&envelope) + .map_err(|e| RigError::communication(format!("JSON serialize failed: {e}")))?; + + writer + .write_all(format!("{}\n", payload).as_bytes()) + .await + .map_err(|e| RigError::communication(format!("write failed: {e}")))?; + writer + .flush() + .await + .map_err(|e| RigError::communication(format!("flush failed: {e}")))?; + + let mut line = String::new(); + reader + .read_line(&mut line) + .await + .map_err(|e| RigError::communication(format!("read failed: {e}")))?; + + let resp: ClientResponse = serde_json::from_str(line.trim_end()) + .map_err(|e| RigError::communication(format!("invalid response: {e}")))?; + + if resp.success { + if let Some(snapshot) = resp.state { + let _ = state_tx.send(state_from_snapshot(snapshot.clone())); + return Ok(snapshot); + } + return Err(RigError::communication("missing snapshot")); + } + + Err(RigError::communication( + resp.error.unwrap_or_else(|| "remote error".into()), + )) +} + +fn map_rig_command(cmd: trx_core::RigCommand) -> ClientCommand { + match cmd { + trx_core::RigCommand::GetSnapshot => ClientCommand::GetState, + trx_core::RigCommand::SetFreq(freq) => ClientCommand::SetFreq { freq_hz: freq.hz }, + trx_core::RigCommand::SetMode(mode) => ClientCommand::SetMode { + mode: mode_label(&mode), + }, + trx_core::RigCommand::SetPtt(ptt) => ClientCommand::SetPtt { ptt }, + trx_core::RigCommand::PowerOn => ClientCommand::PowerOn, + trx_core::RigCommand::PowerOff => ClientCommand::PowerOff, + trx_core::RigCommand::ToggleVfo => ClientCommand::ToggleVfo, + trx_core::RigCommand::GetTxLimit => ClientCommand::GetTxLimit, + trx_core::RigCommand::SetTxLimit(limit) => ClientCommand::SetTxLimit { limit }, + trx_core::RigCommand::Lock => ClientCommand::Lock, + trx_core::RigCommand::Unlock => ClientCommand::Unlock, + } +} + +fn mode_label(mode: &trx_core::rig::state::RigMode) -> String { + match mode { + trx_core::rig::state::RigMode::LSB => "LSB".to_string(), + trx_core::rig::state::RigMode::USB => "USB".to_string(), + trx_core::rig::state::RigMode::CW => "CW".to_string(), + trx_core::rig::state::RigMode::CWR => "CWR".to_string(), + trx_core::rig::state::RigMode::AM => "AM".to_string(), + trx_core::rig::state::RigMode::WFM => "WFM".to_string(), + trx_core::rig::state::RigMode::FM => "FM".to_string(), + trx_core::rig::state::RigMode::DIG => "DIG".to_string(), + trx_core::rig::state::RigMode::PKT => "PKT".to_string(), + trx_core::rig::state::RigMode::Other(val) => val.clone(), + } +} + +pub fn state_from_snapshot(snapshot: trx_core::RigSnapshot) -> RigState { + let status = snapshot.status; + let lock = status.lock; + RigState { + rig_info: Some(snapshot.info), + status, + initialized: snapshot.initialized, + control: RigControl { + rpt_offset_hz: None, + ctcss_hz: None, + dcs_code: None, + lock, + clar_hz: None, + clar_on: None, + enabled: snapshot.enabled, + }, + } +} + +pub fn parse_remote_url(url: &str) -> Result { + let trimmed = url.trim(); + if trimmed.is_empty() { + return Err("remote url is empty".into()); + } + + let addr = trimmed + .strip_prefix("tcp://") + .or_else(|| trimmed.strip_prefix("http-json://")) + .unwrap_or(trimmed); + + if !addr.contains(':') { + return Err("remote url must be host:port".into()); + } + + Ok(addr.to_string()) +}