refactor: split into independent trx-server and trx-client binaries

Delete trx-bin (all-in-one) and trx-bin-common (shared lib). Each binary
now has its own config, plugins, and helper modules inlined.

- trx-server: backend-only daemon with ServerConfig (general, rig, behavior)
  no frontend dependencies
- trx-client: remote client with ClientConfig (general, remote, frontends)
  includes all frontend support (http, rigctl, http-json, qt)
- Dedicated config files: trx-server.toml / trx-client.toml

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-06 22:44:04 +01:00
parent ee25271275
commit 4e9f1d2be6
14 changed files with 1061 additions and 656 deletions
+21
View File
@@ -0,0 +1,21 @@
# SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
#
# SPDX-License-Identifier: BSD-2-Clause
[package]
name = "trx-server"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { workspace = true, features = ["full"] }
tokio-serial = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
clap = { workspace = true, features = ["derive"] }
dirs = "6"
libloading = "0.8"
trx-backend = { path = "../trx-backend" }
trx-core = { path = "../trx-core" }
+280
View File
@@ -0,0 +1,280 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Configuration file support for trx-server.
//!
//! Supports loading configuration from TOML files with the following search order:
//! 1. Path specified via `--config` CLI argument
//! 2. `./trx-server.toml` (current directory)
//! 3. `~/.config/trx-rs/server.toml` (XDG config)
//! 4. `/etc/trx-rs/server.toml` (system-wide)
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use trx_core::rig::state::RigMode;
/// Top-level server configuration structure.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
/// General settings
pub general: GeneralConfig,
/// Rig backend configuration
pub rig: RigConfig,
/// Polling and retry behavior
pub behavior: BehaviorConfig,
}
/// General application settings.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct GeneralConfig {
/// Callsign or owner label
pub callsign: Option<String>,
/// Log level (trace, debug, info, warn, error)
pub log_level: Option<String>,
}
/// Rig backend configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RigConfig {
/// Rig model (e.g., "ft817", "ic7300")
pub model: Option<String>,
/// Initial frequency (Hz) for the rig state before first CAT read
pub initial_freq_hz: u64,
/// Initial mode for the rig state before first CAT read
pub initial_mode: RigMode,
/// Access method configuration
pub access: AccessConfig,
}
/// Access method configuration for reaching the rig.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct AccessConfig {
/// Access type: "serial" or "tcp"
#[serde(rename = "type")]
pub access_type: Option<String>,
/// Serial port path (for serial access)
pub port: Option<String>,
/// Baud rate (for serial access)
pub baud: Option<u32>,
/// Host address (for TCP access)
pub host: Option<String>,
/// TCP port (for TCP access)
pub tcp_port: Option<u16>,
}
impl Default for RigConfig {
fn default() -> Self {
Self {
model: None,
initial_freq_hz: 144_300_000,
initial_mode: RigMode::USB,
access: AccessConfig::default(),
}
}
}
/// Behavior configuration for polling and retries.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BehaviorConfig {
/// Polling interval in milliseconds when idle
pub poll_interval_ms: u64,
/// Polling interval in milliseconds when transmitting
pub poll_interval_tx_ms: u64,
/// Maximum retry attempts for transient errors
pub max_retries: u32,
/// Base delay for exponential backoff in milliseconds
pub retry_base_delay_ms: u64,
}
impl Default for BehaviorConfig {
fn default() -> Self {
Self {
poll_interval_ms: 500,
poll_interval_tx_ms: 100,
max_retries: 3,
retry_base_delay_ms: 100,
}
}
}
impl ServerConfig {
/// Load configuration from a specific file path.
pub fn load_from_file(path: &Path) -> Result<Self, ConfigError> {
let contents = std::fs::read_to_string(path)
.map_err(|e| ConfigError::ReadError(path.to_path_buf(), e.to_string()))?;
toml::from_str(&contents)
.map_err(|e| ConfigError::ParseError(path.to_path_buf(), e.to_string()))
}
/// Load configuration from the default search paths.
/// Returns default config if no config file is found.
pub fn load_from_default_paths() -> Result<(Self, Option<PathBuf>), ConfigError> {
let search_paths = Self::default_search_paths();
for path in search_paths {
if path.exists() {
let config = Self::load_from_file(&path)?;
return Ok((config, Some(path)));
}
}
Ok((Self::default(), None))
}
/// Get the default search paths for config files.
pub fn default_search_paths() -> Vec<PathBuf> {
let mut paths = Vec::new();
// Current directory
paths.push(PathBuf::from("trx-server.toml"));
// XDG config directory
if let Some(config_dir) = dirs::config_dir() {
paths.push(config_dir.join("trx-rs").join("server.toml"));
}
// System-wide config
paths.push(PathBuf::from("/etc/trx-rs/server.toml"));
paths
}
/// Generate an example configuration as a TOML string.
pub fn example_toml() -> String {
let example = ServerConfig {
general: GeneralConfig {
callsign: Some("N0CALL".to_string()),
log_level: Some("info".to_string()),
},
rig: RigConfig {
model: Some("ft817".to_string()),
initial_freq_hz: 144_300_000,
initial_mode: RigMode::USB,
access: AccessConfig {
access_type: Some("serial".to_string()),
port: Some("/dev/ttyUSB0".to_string()),
baud: Some(9600),
host: None,
tcp_port: None,
},
},
behavior: BehaviorConfig::default(),
};
toml::to_string_pretty(&example).unwrap_or_default()
}
}
/// Errors that can occur when loading configuration.
#[derive(Debug)]
pub enum ConfigError {
/// Failed to read the config file
ReadError(PathBuf, String),
/// Failed to parse the config file
ParseError(PathBuf, String),
}
impl std::fmt::Display for ConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ReadError(path, err) => {
write!(
f,
"failed to read config file '{}': {}",
path.display(),
err
)
}
Self::ParseError(path, err) => {
write!(
f,
"failed to parse config file '{}': {}",
path.display(),
err
)
}
}
}
}
impl std::error::Error for ConfigError {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = ServerConfig::default();
assert_eq!(config.rig.initial_freq_hz, 144_300_000);
assert_eq!(config.rig.initial_mode, RigMode::USB);
assert_eq!(config.behavior.poll_interval_ms, 500);
assert_eq!(config.behavior.max_retries, 3);
}
#[test]
fn test_parse_minimal_toml() {
let toml_str = r#"
[rig]
model = "ft817"
[rig.access]
type = "serial"
port = "/dev/ttyUSB0"
baud = 9600
"#;
let config: ServerConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.rig.model, Some("ft817".to_string()));
assert_eq!(config.rig.access.port, Some("/dev/ttyUSB0".to_string()));
assert_eq!(config.rig.access.baud, Some(9600));
}
#[test]
fn test_parse_full_toml() {
let toml_str = r#"
[general]
callsign = "W1AW"
log_level = "debug"
[rig]
model = "ft817"
initial_freq_hz = 7100000
initial_mode = "LSB"
[rig.access]
type = "serial"
port = "/dev/ttyUSB0"
baud = 9600
[behavior]
poll_interval_ms = 1000
poll_interval_tx_ms = 200
max_retries = 5
retry_base_delay_ms = 50
"#;
let config: ServerConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.general.callsign, Some("W1AW".to_string()));
assert_eq!(config.general.log_level, Some("debug".to_string()));
assert_eq!(config.rig.initial_freq_hz, 7_100_000);
assert_eq!(config.rig.initial_mode, RigMode::LSB);
assert_eq!(config.behavior.poll_interval_ms, 1000);
assert_eq!(config.behavior.max_retries, 5);
}
#[test]
fn test_example_toml_parses() {
let example = ServerConfig::example_toml();
let _config: ServerConfig = toml::from_str(&example).unwrap();
}
}
+13
View File
@@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use std::error::Error;
/// Detect the specific CAT decode error for invalid BCD digits.
pub fn is_invalid_bcd_error(err: &(dyn Error + 'static)) -> bool {
if err.to_string().contains("invalid BCD digit in frequency") {
return true;
}
err.source().map(is_invalid_bcd_error).unwrap_or(false)
}
+282
View File
@@ -0,0 +1,282 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
mod config;
mod error;
mod plugins;
mod rig_task;
use std::path::PathBuf;
use std::time::Duration;
use clap::{Parser, ValueEnum};
use tokio::signal;
use tokio::sync::{mpsc, watch};
use tracing::info;
use trx_backend::{is_backend_registered, register_builtin_backends, registered_backends, RigAccess};
use trx_core::radio::freq::Freq;
use trx_core::rig::controller::{AdaptivePolling, ExponentialBackoff};
use trx_core::rig::request::RigRequest;
use trx_core::rig::state::RigState;
use trx_core::rig::{RigControl, RigRxStatus, RigStatus, RigTxStatus};
use trx_core::DynResult;
use config::ServerConfig;
const PKG_DESCRIPTION: &str = concat!(env!("CARGO_PKG_NAME"), " - rig server daemon");
const RIG_TASK_CHANNEL_BUFFER: usize = 32;
const RETRY_MAX_DELAY_SECS: u64 = 2;
#[derive(Debug, Parser)]
#[command(
author = env!("CARGO_PKG_AUTHORS"),
version = env!("CARGO_PKG_VERSION"),
about = PKG_DESCRIPTION,
)]
struct Cli {
/// Path to configuration file
#[arg(long = "config", short = 'C', value_name = "FILE")]
config: Option<PathBuf>,
/// Print example configuration and exit
#[arg(long = "print-config")]
print_config: bool,
/// Rig backend to use (e.g. ft817)
#[arg(short = 'r', long = "rig")]
rig: Option<String>,
/// Access method to reach the rig CAT interface
#[arg(short = 'a', long = "access", value_enum)]
access: Option<AccessKind>,
/// Rig CAT address:
/// when access is serial: <path> <baud>;
/// when access is TCP: <host>:<port>
#[arg(value_name = "RIG_ADDR")]
rig_addr: Option<String>,
/// Optional callsign/owner label
#[arg(short = 'c', long = "callsign")]
callsign: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum AccessKind {
Serial,
Tcp,
}
/// Normalize a rig name to lowercase alphanumeric.
fn normalize_name(name: &str) -> String {
name.to_ascii_lowercase()
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.collect()
}
/// Parse a serial rig address of the form "<path> <baud>".
fn parse_serial_addr(addr: &str) -> DynResult<(String, u32)> {
let mut parts = addr.split_whitespace();
let path = parts
.next()
.ok_or("Serial rig address must be '<path> <baud>'")?;
let baud_str = parts
.next()
.ok_or("Serial rig address must be '<path> <baud>'")?;
if parts.next().is_some() {
return Err("Serial rig address must be '<path> <baud>' (got extra data)".into());
}
let baud: u32 = baud_str
.parse()
.map_err(|e| format!("Invalid baud '{}': {}", baud_str, e))?;
Ok((path.to_string(), baud))
}
/// Resolved configuration after merging config file and CLI arguments.
struct ResolvedConfig {
rig: String,
access: RigAccess,
callsign: Option<String>,
}
fn resolve_config(cli: &Cli, cfg: &ServerConfig) -> DynResult<ResolvedConfig> {
let rig_str = cli.rig.clone().or_else(|| cfg.rig.model.clone());
let rig = match rig_str.as_deref() {
Some(name) => normalize_name(name),
None => {
return Err(
"Rig model not specified. Use --rig or set [rig].model in config.".into(),
)
}
};
if !is_backend_registered(&rig) {
return Err(format!(
"Unknown rig model: {} (available: {})",
rig,
registered_backends().join(", ")
)
.into());
}
let access = {
let access_type = cli
.access
.as_ref()
.map(|a| match a {
AccessKind::Serial => "serial",
AccessKind::Tcp => "tcp",
})
.or(cfg.rig.access.access_type.as_deref());
match access_type {
Some("serial") | None => {
let (path, baud) = if let Some(ref addr) = cli.rig_addr {
parse_serial_addr(addr)?
} else if let (Some(port), Some(baud)) =
(&cfg.rig.access.port, cfg.rig.access.baud)
{
(port.clone(), baud)
} else {
return Err("Serial access requires port and baud. Use '<path> <baud>' argument or set [rig.access].port and .baud in config.".into());
};
RigAccess::Serial { path, baud }
}
Some("tcp") => {
let addr = if let Some(ref addr) = cli.rig_addr {
addr.clone()
} else if let (Some(host), Some(port)) =
(&cfg.rig.access.host, cfg.rig.access.tcp_port)
{
format!("{}:{}", host, port)
} else {
return Err("TCP access requires host:port. Use argument or set [rig.access].host and .tcp_port in config.".into());
};
RigAccess::Tcp { addr }
}
Some(other) => return Err(format!("Unknown access type: {}", other).into()),
}
};
let callsign = cli
.callsign
.clone()
.or_else(|| cfg.general.callsign.clone());
Ok(ResolvedConfig {
rig,
access,
callsign,
})
}
fn build_initial_state(cfg: &ServerConfig) -> RigState {
RigState {
rig_info: None,
status: RigStatus {
freq: Freq {
hz: cfg.rig.initial_freq_hz,
},
mode: cfg.rig.initial_mode.clone(),
tx_en: false,
vfo: None,
tx: Some(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
}),
rx: Some(RigRxStatus { sig: None }),
lock: Some(false),
},
initialized: false,
control: RigControl {
rpt_offset_hz: None,
ctcss_hz: None,
dcs_code: None,
lock: Some(false),
clar_hz: None,
clar_on: None,
enabled: Some(false),
},
}
}
fn build_rig_task_config(
resolved: &ResolvedConfig,
cfg: &ServerConfig,
) -> rig_task::RigTaskConfig {
rig_task::RigTaskConfig {
rig_model: resolved.rig.clone(),
access: resolved.access.clone(),
polling: AdaptivePolling::new(
Duration::from_millis(cfg.behavior.poll_interval_ms),
Duration::from_millis(cfg.behavior.poll_interval_tx_ms),
),
retry: ExponentialBackoff::new(
cfg.behavior.max_retries.max(1),
Duration::from_millis(cfg.behavior.retry_base_delay_ms),
Duration::from_secs(RETRY_MAX_DELAY_SECS),
),
initial_freq_hz: cfg.rig.initial_freq_hz,
initial_mode: cfg.rig.initial_mode.clone(),
}
}
#[tokio::main]
async fn main() -> DynResult<()> {
tracing_subscriber::fmt().with_target(false).init();
register_builtin_backends();
let _plugin_libs = plugins::load_plugins();
let cli = Cli::parse();
if cli.print_config {
println!("{}", ServerConfig::example_toml());
return Ok(());
}
let (cfg, config_path) = if let Some(ref path) = cli.config {
let cfg = ServerConfig::load_from_file(path)?;
(cfg, Some(path.clone()))
} else {
ServerConfig::load_from_default_paths()?
};
if let Some(ref path) = config_path {
info!("Loaded configuration from {}", path.display());
}
let resolved = resolve_config(&cli, &cfg)?;
match &resolved.access {
RigAccess::Serial { path, baud } => {
info!(
"Starting trx-server (rig: {}, access: serial {} @ {} baud)",
resolved.rig, path, baud
);
}
RigAccess::Tcp { addr } => {
info!(
"Starting trx-server (rig: {}, access: tcp {})",
resolved.rig, addr
);
}
}
if let Some(ref cs) = resolved.callsign {
info!("Callsign: {}", cs);
}
let (tx, rx) = mpsc::channel::<RigRequest>(RIG_TASK_CHANNEL_BUFFER);
let initial_state = build_initial_state(&cfg);
let (state_tx, state_rx) = watch::channel(initial_state);
// Keep receivers alive so channels don't close prematurely
let _state_rx = state_rx;
let _tx = tx;
let rig_task_config = build_rig_task_config(&resolved, &cfg);
let _rig_handle = tokio::spawn(rig_task::run_rig_task(rig_task_config, rx, state_tx));
signal::ctrl_c().await?;
info!("Ctrl+C received, shutting down");
Ok(())
}
+115
View File
@@ -0,0 +1,115 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use libloading::{Library, Symbol};
use tracing::{info, warn};
const PLUGIN_ENV: &str = "TRX_PLUGIN_DIRS";
const PLUGIN_ENTRYPOINT: &str = "trx_register";
#[cfg(windows)]
const PATH_SEPARATOR: char = ';';
#[cfg(not(windows))]
const PATH_SEPARATOR: char = ':';
#[cfg(windows)]
const PLUGIN_EXTENSIONS: &[&str] = &["dll"];
#[cfg(target_os = "macos")]
const PLUGIN_EXTENSIONS: &[&str] = &["dylib"];
#[cfg(all(unix, not(target_os = "macos")))]
const PLUGIN_EXTENSIONS: &[&str] = &["so"];
pub fn load_plugins() -> Vec<Library> {
let mut libraries = Vec::new();
let search_paths = plugin_search_paths();
if search_paths.is_empty() {
return libraries;
}
info!("Plugin search paths: {:?}", search_paths);
for path in search_paths {
if let Err(err) = load_plugins_from_dir(&path, &mut libraries) {
warn!("Plugin scan failed for {:?}: {}", path, err);
}
}
libraries
}
fn load_plugins_from_dir(path: &Path, libraries: &mut Vec<Library>) -> std::io::Result<()> {
if !path.exists() {
return Ok(());
}
for entry in std::fs::read_dir(path)? {
let entry = entry?;
let path = entry.path();
if !path.is_file() {
continue;
}
if !is_plugin_file(&path) {
continue;
}
unsafe {
match Library::new(&path) {
Ok(lib) => {
if let Err(err) = register_library(&lib, &path) {
warn!("Plugin {:?} failed to register: {}", path, err);
continue;
}
info!("Loaded plugin {:?}", path);
libraries.push(lib);
}
Err(err) => {
warn!("Failed to load plugin {:?}: {}", path, err);
}
}
}
}
Ok(())
}
unsafe fn register_library(lib: &Library, path: &Path) -> Result<(), String> {
let entry: Symbol<unsafe extern "C" fn()> = lib
.get(PLUGIN_ENTRYPOINT.as_bytes())
.map_err(|e| format!("missing entrypoint {}: {}", PLUGIN_ENTRYPOINT, e))?;
entry();
info!("Registered plugin {:?}", path);
Ok(())
}
fn plugin_search_paths() -> Vec<PathBuf> {
let mut paths = Vec::new();
if let Ok(env_paths) = std::env::var(PLUGIN_ENV) {
for raw in env_paths.split(PATH_SEPARATOR) {
if raw.trim().is_empty() {
continue;
}
paths.push(PathBuf::from(raw));
}
}
paths.push(PathBuf::from("plugins"));
if let Some(config_dir) = dirs::config_dir() {
paths.push(config_dir.join("trx-rs").join("plugins"));
}
paths
}
fn is_plugin_file(path: &Path) -> bool {
path.extension()
.and_then(OsStr::to_str)
.map(|ext| PLUGIN_EXTENSIONS.iter().any(|e| ext.eq_ignore_ascii_case(e)))
.unwrap_or(false)
}
+730
View File
@@ -0,0 +1,730 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Rig task implementation using controller components.
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::time::{self, Instant};
use tracing::{debug, error, info, warn};
use trx_backend::{build_rig, RigAccess};
use trx_core::radio::freq::Freq;
use trx_core::rig::command::RigCommand;
use trx_core::rig::controller::{
command_from_rig_command, AdaptivePolling, CommandContext, CommandResult, ExponentialBackoff,
PollingPolicy, ReadyStateData, RetryPolicy, RigCatExecutor, RigEventEmitter, RigMachineState,
RigStateMachine, TransmittingStateData, ValidationResult,
};
use trx_core::rig::request::RigRequest;
use trx_core::rig::state::{RigMode, RigSnapshot, RigState};
use trx_core::rig::{RigCat, RigControl, RigRxStatus, RigStatus, RigTxStatus};
use trx_core::{DynResult, RigError, RigResult};
use crate::error::is_invalid_bcd_error;
/// Configuration for the rig task.
pub struct RigTaskConfig {
pub rig_model: String,
pub access: RigAccess,
pub polling: AdaptivePolling,
pub retry: ExponentialBackoff,
pub initial_freq_hz: u64,
pub initial_mode: RigMode,
}
impl Default for RigTaskConfig {
fn default() -> Self {
Self {
rig_model: "ft817".to_string(),
access: RigAccess::Serial {
path: "/dev/ttyUSB0".to_string(),
baud: 9600,
},
polling: AdaptivePolling::default(),
retry: ExponentialBackoff::default(),
initial_freq_hz: 144_300_000,
initial_mode: RigMode::USB,
}
}
}
/// Command context implementation for validation.
struct TaskCommandContext<'a> {
machine: &'a RigStateMachine,
}
impl<'a> CommandContext for TaskCommandContext<'a> {
fn state(&self) -> &RigMachineState {
self.machine.state()
}
}
/// Run the rig task with the new controller-based implementation.
pub async fn run_rig_task(
config: RigTaskConfig,
mut rx: mpsc::Receiver<RigRequest>,
state_tx: watch::Sender<RigState>,
) -> DynResult<()> {
info!("Opening rig backend {}", config.rig_model);
match &config.access {
RigAccess::Serial { path, baud } => info!("Serial: {} @ {} baud", path, baud),
RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr),
}
let mut rig: Box<dyn RigCat> = build_rig(&config.rig_model, config.access)?;
info!("Rig backend ready");
// Initialize state machine and state
let mut machine = RigStateMachine::new();
let emitter = RigEventEmitter::new();
let mut state = RigState {
rig_info: None,
status: RigStatus {
freq: Freq { hz: 144_300_000 },
mode: RigMode::USB,
tx_en: false,
vfo: None,
tx: Some(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
}),
rx: Some(RigRxStatus { sig: None }),
lock: Some(false),
},
initialized: false,
control: RigControl {
rpt_offset_hz: None,
ctcss_hz: None,
dcs_code: None,
lock: Some(false),
clar_hz: None,
clar_on: None,
enabled: Some(false),
},
};
// Polling configuration
let polling = &config.polling;
let retry = &config.retry;
let mut poll_pause_until: Option<Instant> = None;
let mut last_power_on: Option<Instant> = None;
let mut initial_status_read = false;
// Initial setup: get rig info
let rig_info = rig.info().clone();
state.rig_info = Some(rig_info);
if let Some(info) = state.rig_info.as_ref() {
info!(
"Rig info: {} {} {}",
info.manufacturer, info.model, info.revision
);
}
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&state,
&state,
&old_machine_state,
&new_machine_state,
);
let _ = state_tx.send(state.clone());
// Initial power-on sequence
if !state.control.enabled.unwrap_or(false) {
info!("Sending initial PowerOn to wake rig");
match rig.power_on().await {
Ok(()) => {
state.control.enabled = Some(true);
time::sleep(Duration::from_secs(3)).await;
if let Err(e) = refresh_state_with_retry(&mut rig, &mut state, retry).await {
warn!(
"Initial PowerOn refresh failed: {:?}; retrying once after short delay",
e
);
time::sleep(Duration::from_millis(500)).await;
if let Err(e2) = refresh_state_with_retry(&mut rig, &mut state, retry).await {
warn!(
"Initial PowerOn second refresh failed (continuing): {:?}",
e2
);
}
} else {
initial_status_read = true;
}
info!("Rig initialized after power on sequence");
}
Err(e) => warn!("Initial PowerOn failed (continuing): {:?}", e),
}
}
// Prime VFO state
if let Err(e) = prime_vfo_state(&mut rig, &mut state, retry).await {
warn!("VFO priming failed: {:?}", e);
} else {
initial_status_read = true;
}
if initial_status_read {
let old_state = state.clone();
if let Err(e) = apply_initial_tune(
&mut rig,
&mut state,
retry,
config.initial_freq_hz,
&config.initial_mode,
)
.await
{
warn!("Initial tune failed (continuing): {:?}", e);
} else {
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&old_state,
&state,
&old_machine_state,
&new_machine_state,
);
}
}
state.initialized = true;
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&state,
&state,
&old_machine_state,
&new_machine_state,
);
let _ = state_tx.send(state.clone());
// Main task loop
loop {
let poll_duration = polling.interval(state.status.tx_en);
let mut poll_interval = time::interval(poll_duration);
tokio::select! {
_ = poll_interval.tick() => {
// Check if polling is paused
if let Some(until) = poll_pause_until {
if Instant::now() < until {
continue;
} else {
poll_pause_until = None;
}
}
// Skip polling if rig is powered off
if matches!(state.control.enabled, Some(false)) {
continue;
}
// Poll rig state
let old_state = state.clone();
match refresh_state_with_retry(&mut rig, &mut state, retry).await {
Ok(()) => {
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&old_state,
&state,
&old_machine_state,
&new_machine_state,
);
let _ = state_tx.send(state.clone());
}
Err(e) => {
error!("CAT polling error: {:?}", e);
// Grace period after power on
if let Some(last_on) = last_power_on {
if Instant::now().duration_since(last_on) < Duration::from_secs(5) {
poll_pause_until = Some(Instant::now() + Duration::from_millis(800));
continue;
}
}
}
}
},
maybe_req = rx.recv() => {
let Some(first_req) = maybe_req else { break; };
// Batch up any pending requests
let mut batch = vec![first_req];
while let Ok(next) = rx.try_recv() {
batch.push(next);
}
// Process each request
while let Some(RigRequest { cmd, respond_to }) = batch.pop() {
let cmd_label = format!("{:?}", cmd);
let started = Instant::now();
let mut cmd_ctx = CommandExecContext {
rig: &mut rig,
state: &mut state,
machine: &mut machine,
emitter: &emitter,
poll_pause_until: &mut poll_pause_until,
last_power_on: &mut last_power_on,
state_tx: &state_tx,
retry,
};
let result = process_command(cmd, &mut cmd_ctx).await;
let _ = respond_to.send(result);
let elapsed = started.elapsed();
if elapsed > Duration::from_millis(500) {
warn!("Rig command {} took {:?}", cmd_label, elapsed);
} else {
debug!("Rig command {} completed in {:?}", cmd_label, elapsed);
}
}
},
}
}
info!("rig_task shutting down (channel closed)");
Ok(())
}
/// Process a single rig command using command handlers.
struct CommandExecContext<'a> {
rig: &'a mut Box<dyn RigCat>,
state: &'a mut RigState,
machine: &'a mut RigStateMachine,
emitter: &'a RigEventEmitter,
poll_pause_until: &'a mut Option<Instant>,
last_power_on: &'a mut Option<Instant>,
state_tx: &'a watch::Sender<RigState>,
retry: &'a ExponentialBackoff,
}
async fn process_command(
cmd: RigCommand,
ctx: &mut CommandExecContext<'_>,
) -> RigResult<RigSnapshot> {
sync_machine_state(ctx.machine, ctx.state);
// Check if rig is ready for commands
let not_ready =
!ctx.state.initialized && !matches!(cmd, RigCommand::PowerOn | RigCommand::GetSnapshot);
if not_ready {
return Err(RigError::invalid_state("rig not initialized yet"));
}
// Get command handler and validate
let handler = command_from_rig_command(cmd.clone());
let ctx_view = TaskCommandContext {
machine: ctx.machine,
};
match handler.can_execute(&ctx_view) {
ValidationResult::Ok => {}
ValidationResult::Locked => {
warn!("{} blocked: panel lock is active", handler.name());
return Err(RigError::invalid_state("panel is locked"));
}
ValidationResult::InvalidState(msg) => {
warn!("{} blocked: {}", handler.name(), msg);
return Err(RigError::invalid_state(msg));
}
ValidationResult::InvalidParams(msg) => {
warn!("{} invalid params: {}", handler.name(), msg);
return Err(RigError::invalid_state(msg));
}
}
// Execute command
let old_state = ctx.state.clone();
let mut executor = RigCatExecutor::new(ctx.rig.as_mut());
let result = handler.execute(&mut executor).await;
match result {
Ok(cmd_result) => {
// Apply state updates based on command result
match cmd_result {
CommandResult::FreqUpdated(freq) => {
ctx.state.apply_freq(freq);
*ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200));
}
CommandResult::ModeUpdated(mode) => {
ctx.state.apply_mode(mode);
*ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200));
}
CommandResult::PttUpdated(ptt) => {
ctx.state.apply_ptt(ptt);
}
CommandResult::PowerUpdated(on) => {
ctx.state.control.enabled = Some(on);
if on {
time::sleep(Duration::from_secs(3)).await;
let now = Instant::now();
*ctx.poll_pause_until = Some(now + Duration::from_secs(3));
*ctx.last_power_on = Some(now);
// Refresh state after power on
if let Err(e) =
refresh_state_with_retry(ctx.rig, ctx.state, ctx.retry).await
{
if is_invalid_bcd_error(e.as_ref()) {
warn!("Transient CAT decode after PowerOn (ignored): {:?}", e);
*ctx.poll_pause_until =
Some(Instant::now() + Duration::from_millis(1500));
} else {
error!("Failed to refresh after PowerOn: {:?}", e);
return Err(RigError::communication(format!("CAT error: {}", e)));
}
}
} else {
ctx.state.status.tx_en = false;
}
}
CommandResult::LockUpdated(locked) => {
ctx.state.control.lock = Some(locked);
ctx.state.status.lock = Some(locked);
}
CommandResult::TxLimitUpdated(limit) => {
ctx.state
.status
.tx
.get_or_insert(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
})
.limit = Some(limit);
}
CommandResult::RefreshRequired => {
// For commands like ToggleVfo, GetSnapshot
if matches!(cmd, RigCommand::ToggleVfo) {
time::sleep(Duration::from_millis(150)).await;
*ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(300));
}
if let Err(e) = refresh_state_with_retry(ctx.rig, ctx.state, ctx.retry).await {
error!("Failed to refresh state: {:?}", e);
return Err(RigError::communication(format!("CAT error: {}", e)));
}
}
CommandResult::Ok => {}
}
let old_machine_state = ctx.machine.state().clone();
sync_machine_state(ctx.machine, ctx.state);
let new_machine_state = ctx.machine.state().clone();
emit_state_changes(
ctx.emitter,
&old_state,
ctx.state,
&old_machine_state,
&new_machine_state,
);
let _ = ctx.state_tx.send(ctx.state.clone());
snapshot_from(ctx.state)
}
Err(e) => {
error!("Command {} failed: {:?}", handler.name(), e);
Err(RigError::communication(format!("CAT error: {}", e)))
}
}
}
/// Refresh state from CAT with retry logic using the retry policy.
async fn refresh_state_with_retry(
rig: &mut Box<dyn RigCat>,
state: &mut RigState,
retry: &ExponentialBackoff,
) -> DynResult<()> {
let mut last_err: Option<Box<dyn std::error::Error + Send + Sync>> = None;
let max = retry.max_attempts() as usize;
for attempt in 0..max {
match refresh_state_from_cat(rig, state).await {
Ok(()) => return Ok(()),
Err(e) => {
let rig_err = RigError::communication(e.to_string());
if retry.should_retry(attempt as u32, &rig_err) && attempt + 1 < max {
let delay = retry.delay(attempt as u32);
warn!(
"Retrying CAT state read (attempt {} of {}, delay {:?})",
attempt + 1,
max,
delay
);
time::sleep(delay).await;
last_err = Some(e);
continue;
} else {
return Err(e);
}
}
}
}
Err(last_err.unwrap_or_else(|| "Unknown CAT error".into()))
}
/// Read current state from the rig via CAT.
async fn refresh_state_from_cat(rig: &mut Box<dyn RigCat>, state: &mut RigState) -> DynResult<()> {
let (freq, mode, vfo) = rig.get_status().await?;
state.control.enabled = Some(true);
state.apply_freq(freq);
state.apply_mode(mode);
state.status.vfo = vfo;
if state.status.tx_en {
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(0);
} else if let Ok(meter) = rig.get_signal_strength().await {
let sig = map_signal_strength(&state.status.mode, meter);
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(sig);
}
if let Ok(limit) = rig.get_tx_limit().await {
state
.status
.tx
.get_or_insert(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
})
.limit = Some(limit);
}
if state.status.tx_en {
if let Ok(power) = rig.get_tx_power().await {
state
.status
.tx
.get_or_insert(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
})
.power = Some(power);
}
}
state.status.lock = Some(state.control.lock.unwrap_or(false));
Ok(())
}
/// Apply initial mode/frequency after a successful CAT status read.
async fn apply_initial_tune(
rig: &mut Box<dyn RigCat>,
state: &mut RigState,
retry: &ExponentialBackoff,
initial_freq_hz: u64,
initial_mode: &RigMode,
) -> DynResult<()> {
let needs_freq = state.status.freq.hz != initial_freq_hz;
let needs_mode = &state.status.mode != initial_mode;
if !needs_freq && !needs_mode {
return Ok(());
}
if needs_mode {
rig.set_mode(initial_mode.clone()).await?;
}
if needs_freq {
rig.set_freq(Freq {
hz: initial_freq_hz,
})
.await?;
}
refresh_state_with_retry(rig, state, retry).await
}
/// Prime VFO state by toggling and reading both VFOs.
async fn prime_vfo_state(
rig: &mut Box<dyn RigCat>,
state: &mut RigState,
retry: &ExponentialBackoff,
) -> DynResult<()> {
// Ensure panel is unlocked
let _ = rig.unlock().await;
time::sleep(Duration::from_millis(100)).await;
refresh_state_with_retry(rig, state, retry).await?;
time::sleep(Duration::from_millis(150)).await;
rig.toggle_vfo().await?;
time::sleep(Duration::from_millis(150)).await;
refresh_state_with_retry(rig, state, retry).await?;
rig.toggle_vfo().await?;
time::sleep(Duration::from_millis(150)).await;
refresh_state_with_retry(rig, state, retry).await?;
Ok(())
}
/// Map raw signal strength to S-meter value based on mode.
fn map_signal_strength(mode: &RigMode, raw: u8) -> i32 {
// FT-817 returns 0-15 for signal strength
// Map to approximate dBm / S-units
match mode {
RigMode::FM | RigMode::WFM => -120 + (raw as i32 * 6),
_ => -127 + (raw as i32 * 6),
}
}
/// Create a snapshot from current state.
fn snapshot_from(state: &RigState) -> RigResult<RigSnapshot> {
state
.snapshot()
.ok_or_else(|| RigError::invalid_state("Rig info unavailable"))
}
fn sync_machine_state(machine: &mut RigStateMachine, state: &RigState) {
let desired = desired_machine_state(state);
match (machine.state().clone(), &desired) {
(RigMachineState::Ready(_), RigMachineState::Ready(new_data)) => {
machine.update_ready_data(|data| {
*data = new_data.clone();
});
}
(RigMachineState::Transmitting(_), RigMachineState::Transmitting(new_data)) => {
machine.update_transmitting_data(|data| {
*data = new_data.clone();
});
}
_ => {
machine.set_state(desired);
}
}
}
fn desired_machine_state(state: &RigState) -> RigMachineState {
let rig_info = state.rig_info.clone();
if !state.initialized {
return rig_info
.map(|info| RigMachineState::Initializing {
rig_info: Some(info),
})
.unwrap_or(RigMachineState::Disconnected);
}
let Some(info) = rig_info else {
return RigMachineState::Disconnected;
};
if matches!(state.control.enabled, Some(false)) {
return RigMachineState::PoweredOff { rig_info: info };
}
if state.status.tx_en {
RigMachineState::Transmitting(transmitting_data_from_state(state, info))
} else {
RigMachineState::Ready(ready_data_from_state(state, info))
}
}
fn ready_data_from_state(state: &RigState, rig_info: trx_core::rig::RigInfo) -> ReadyStateData {
ReadyStateData {
rig_info,
freq: state.status.freq,
mode: state.status.mode.clone(),
vfo: state.status.vfo.clone(),
rx: state.status.rx.clone(),
tx_limit: state.status.tx.as_ref().and_then(|tx| tx.limit),
locked: lock_state_from(state),
}
}
fn transmitting_data_from_state(
state: &RigState,
rig_info: trx_core::rig::RigInfo,
) -> TransmittingStateData {
TransmittingStateData {
rig_info,
freq: state.status.freq,
mode: state.status.mode.clone(),
vfo: state.status.vfo.clone(),
tx: state.status.tx.clone(),
locked: lock_state_from(state),
}
}
fn lock_state_from(state: &RigState) -> bool {
state.control.lock.or(state.status.lock).unwrap_or(false)
}
fn emit_state_changes(
emitter: &RigEventEmitter,
old_state: &RigState,
new_state: &RigState,
old_machine_state: &RigMachineState,
new_machine_state: &RigMachineState,
) {
if old_state.status.freq.hz != new_state.status.freq.hz {
emitter.notify_frequency_change(Some(old_state.status.freq), new_state.status.freq);
}
if old_state.status.mode != new_state.status.mode {
emitter.notify_mode_change(Some(&old_state.status.mode), &new_state.status.mode);
}
if old_state.status.tx_en != new_state.status.tx_en {
emitter.notify_ptt_change(new_state.status.tx_en);
}
if lock_state_from(old_state) != lock_state_from(new_state) {
emitter.notify_lock_change(lock_state_from(new_state));
}
if old_state.control.enabled.unwrap_or(false) != new_state.control.enabled.unwrap_or(false) {
emitter.notify_power_change(new_state.control.enabled.unwrap_or(false));
}
if meters_changed(old_state, new_state) {
emitter.notify_meter_update(new_state.status.rx.as_ref(), new_state.status.tx.as_ref());
}
if std::mem::discriminant(old_machine_state) != std::mem::discriminant(new_machine_state) {
emitter.notify_state_change(old_machine_state, new_machine_state);
}
}
fn meters_changed(old_state: &RigState, new_state: &RigState) -> bool {
let old_rx_sig = old_state.status.rx.as_ref().and_then(|rx| rx.sig);
let new_rx_sig = new_state.status.rx.as_ref().and_then(|rx| rx.sig);
if old_rx_sig != new_rx_sig {
return true;
}
let (old_tx_power, old_tx_limit, old_tx_swr, old_tx_alc) =
tx_meter_parts(old_state.status.tx.as_ref());
let (new_tx_power, new_tx_limit, new_tx_swr, new_tx_alc) =
tx_meter_parts(new_state.status.tx.as_ref());
old_tx_power != new_tx_power
|| old_tx_limit != new_tx_limit
|| old_tx_swr != new_tx_swr
|| old_tx_alc != new_tx_alc
}
fn tx_meter_parts(tx: Option<&RigTxStatus>) -> (Option<u8>, Option<u8>, Option<f32>, Option<u8>) {
tx.map(|tx| (tx.power, tx.limit, tx.swr, tx.alc))
.unwrap_or((None, None, None, None))
}