From a941c77039741a50ef35d065f690ee489702fa4b Mon Sep 17 00:00:00 2001 From: Stanislaw Grams Date: Sun, 18 Jan 2026 09:20:10 +0100 Subject: [PATCH] rig: integrate controller and rig task updates --- src/trx-bin/src/rig_task.rs | 730 ++++++++++++++++++++ src/trx-core/src/rig/controller/events.rs | 207 ++++++ src/trx-core/src/rig/controller/executor.rs | 85 +++ src/trx-core/src/rig/controller/mod.rs | 26 + src/trx-core/src/rig/controller/policies.rs | 288 ++++++++ 5 files changed, 1336 insertions(+) create mode 100644 src/trx-bin/src/rig_task.rs create mode 100644 src/trx-core/src/rig/controller/events.rs create mode 100644 src/trx-core/src/rig/controller/executor.rs create mode 100644 src/trx-core/src/rig/controller/mod.rs create mode 100644 src/trx-core/src/rig/controller/policies.rs diff --git a/src/trx-bin/src/rig_task.rs b/src/trx-bin/src/rig_task.rs new file mode 100644 index 0000000..f0969e5 --- /dev/null +++ b/src/trx-bin/src/rig_task.rs @@ -0,0 +1,730 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Rig task implementation using controller components. + +use std::time::Duration; + +use tokio::sync::{mpsc, watch}; +use tokio::time::{self, Instant}; +use tracing::{debug, error, info, warn}; + +use trx_backend::{build_rig, RigAccess}; +use trx_core::radio::freq::Freq; +use trx_core::rig::command::RigCommand; +use trx_core::rig::controller::{ + command_from_rig_command, AdaptivePolling, CommandContext, CommandResult, ExponentialBackoff, + PollingPolicy, ReadyStateData, RetryPolicy, RigCatExecutor, RigEventEmitter, RigMachineState, + RigStateMachine, TransmittingStateData, ValidationResult, +}; +use trx_core::rig::request::RigRequest; +use trx_core::rig::state::{RigMode, RigSnapshot, RigState}; +use trx_core::rig::{RigCat, RigControl, RigRxStatus, RigStatus, RigTxStatus}; +use trx_core::{DynResult, RigError, RigResult}; + +use crate::error::is_invalid_bcd_error; + +/// Configuration for the rig task. +pub struct RigTaskConfig { + pub rig_model: String, + pub access: RigAccess, + pub polling: AdaptivePolling, + pub retry: ExponentialBackoff, + pub initial_freq_hz: u64, + pub initial_mode: RigMode, +} + +impl Default for RigTaskConfig { + fn default() -> Self { + Self { + rig_model: "ft817".to_string(), + access: RigAccess::Serial { + path: "/dev/ttyUSB0".to_string(), + baud: 9600, + }, + polling: AdaptivePolling::default(), + retry: ExponentialBackoff::default(), + initial_freq_hz: 144_300_000, + initial_mode: RigMode::USB, + } + } +} + +/// Command context implementation for validation. +struct TaskCommandContext<'a> { + machine: &'a RigStateMachine, +} + +impl<'a> CommandContext for TaskCommandContext<'a> { + fn state(&self) -> &RigMachineState { + self.machine.state() + } +} + +/// Run the rig task with the new controller-based implementation. +pub async fn run_rig_task( + config: RigTaskConfig, + mut rx: mpsc::Receiver, + state_tx: watch::Sender, +) -> DynResult<()> { + info!("Opening rig backend {}", config.rig_model); + match &config.access { + RigAccess::Serial { path, baud } => info!("Serial: {} @ {} baud", path, baud), + RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr), + } + + let mut rig: Box = build_rig(&config.rig_model, config.access)?; + info!("Rig backend ready"); + + // Initialize state machine and state + let mut machine = RigStateMachine::new(); + let emitter = RigEventEmitter::new(); + let mut state = RigState { + rig_info: None, + status: RigStatus { + freq: Freq { hz: 144_300_000 }, + mode: RigMode::USB, + tx_en: false, + vfo: None, + tx: Some(RigTxStatus { + power: None, + limit: None, + swr: None, + alc: None, + }), + rx: Some(RigRxStatus { sig: None }), + lock: Some(false), + }, + initialized: false, + control: RigControl { + rpt_offset_hz: None, + ctcss_hz: None, + dcs_code: None, + lock: Some(false), + clar_hz: None, + clar_on: None, + enabled: Some(false), + }, + }; + + // Polling configuration + let polling = &config.polling; + let retry = &config.retry; + let mut poll_pause_until: Option = None; + let mut last_power_on: Option = None; + let mut initial_status_read = false; + + // Initial setup: get rig info + let rig_info = rig.info().clone(); + state.rig_info = Some(rig_info); + if let Some(info) = state.rig_info.as_ref() { + info!( + "Rig info: {} {} {}", + info.manufacturer, info.model, info.revision + ); + } + let old_machine_state = machine.state().clone(); + sync_machine_state(&mut machine, &state); + let new_machine_state = machine.state().clone(); + emit_state_changes( + &emitter, + &state, + &state, + &old_machine_state, + &new_machine_state, + ); + let _ = state_tx.send(state.clone()); + + // Initial power-on sequence + if !state.control.enabled.unwrap_or(false) { + info!("Sending initial PowerOn to wake rig"); + match rig.power_on().await { + Ok(()) => { + state.control.enabled = Some(true); + time::sleep(Duration::from_secs(3)).await; + if let Err(e) = refresh_state_with_retry(&mut rig, &mut state, retry).await { + warn!( + "Initial PowerOn refresh failed: {:?}; retrying once after short delay", + e + ); + time::sleep(Duration::from_millis(500)).await; + if let Err(e2) = refresh_state_with_retry(&mut rig, &mut state, retry).await { + warn!( + "Initial PowerOn second refresh failed (continuing): {:?}", + e2 + ); + } + } else { + initial_status_read = true; + } + info!("Rig initialized after power on sequence"); + } + Err(e) => warn!("Initial PowerOn failed (continuing): {:?}", e), + } + } + + // Prime VFO state + if let Err(e) = prime_vfo_state(&mut rig, &mut state, retry).await { + warn!("VFO priming failed: {:?}", e); + } else { + initial_status_read = true; + } + + if initial_status_read { + let old_state = state.clone(); + if let Err(e) = apply_initial_tune( + &mut rig, + &mut state, + retry, + config.initial_freq_hz, + &config.initial_mode, + ) + .await + { + warn!("Initial tune failed (continuing): {:?}", e); + } else { + let old_machine_state = machine.state().clone(); + sync_machine_state(&mut machine, &state); + let new_machine_state = machine.state().clone(); + emit_state_changes( + &emitter, + &old_state, + &state, + &old_machine_state, + &new_machine_state, + ); + } + } + + state.initialized = true; + let old_machine_state = machine.state().clone(); + sync_machine_state(&mut machine, &state); + let new_machine_state = machine.state().clone(); + emit_state_changes( + &emitter, + &state, + &state, + &old_machine_state, + &new_machine_state, + ); + let _ = state_tx.send(state.clone()); + + // Main task loop + loop { + let poll_duration = polling.interval(state.status.tx_en); + let mut poll_interval = time::interval(poll_duration); + + tokio::select! { + _ = poll_interval.tick() => { + // Check if polling is paused + if let Some(until) = poll_pause_until { + if Instant::now() < until { + continue; + } else { + poll_pause_until = None; + } + } + + // Skip polling if rig is powered off + if matches!(state.control.enabled, Some(false)) { + continue; + } + + // Poll rig state + let old_state = state.clone(); + match refresh_state_with_retry(&mut rig, &mut state, retry).await { + Ok(()) => { + let old_machine_state = machine.state().clone(); + sync_machine_state(&mut machine, &state); + let new_machine_state = machine.state().clone(); + emit_state_changes( + &emitter, + &old_state, + &state, + &old_machine_state, + &new_machine_state, + ); + let _ = state_tx.send(state.clone()); + } + Err(e) => { + error!("CAT polling error: {:?}", e); + // Grace period after power on + if let Some(last_on) = last_power_on { + if Instant::now().duration_since(last_on) < Duration::from_secs(5) { + poll_pause_until = Some(Instant::now() + Duration::from_millis(800)); + continue; + } + } + } + } + }, + + maybe_req = rx.recv() => { + let Some(first_req) = maybe_req else { break; }; + + // Batch up any pending requests + let mut batch = vec![first_req]; + while let Ok(next) = rx.try_recv() { + batch.push(next); + } + + // Process each request + while let Some(RigRequest { cmd, respond_to }) = batch.pop() { + let cmd_label = format!("{:?}", cmd); + let started = Instant::now(); + + let mut cmd_ctx = CommandExecContext { + rig: &mut rig, + state: &mut state, + machine: &mut machine, + emitter: &emitter, + poll_pause_until: &mut poll_pause_until, + last_power_on: &mut last_power_on, + state_tx: &state_tx, + retry, + }; + let result = process_command(cmd, &mut cmd_ctx).await; + + let _ = respond_to.send(result); + + let elapsed = started.elapsed(); + if elapsed > Duration::from_millis(500) { + warn!("Rig command {} took {:?}", cmd_label, elapsed); + } else { + debug!("Rig command {} completed in {:?}", cmd_label, elapsed); + } + } + }, + } + } + + info!("rig_task shutting down (channel closed)"); + Ok(()) +} + +/// Process a single rig command using command handlers. +struct CommandExecContext<'a> { + rig: &'a mut Box, + state: &'a mut RigState, + machine: &'a mut RigStateMachine, + emitter: &'a RigEventEmitter, + poll_pause_until: &'a mut Option, + last_power_on: &'a mut Option, + state_tx: &'a watch::Sender, + retry: &'a ExponentialBackoff, +} + +async fn process_command( + cmd: RigCommand, + ctx: &mut CommandExecContext<'_>, +) -> RigResult { + sync_machine_state(ctx.machine, ctx.state); + + // Check if rig is ready for commands + let not_ready = + !ctx.state.initialized && !matches!(cmd, RigCommand::PowerOn | RigCommand::GetSnapshot); + + if not_ready { + return Err(RigError::invalid_state("rig not initialized yet")); + } + + // Get command handler and validate + let handler = command_from_rig_command(cmd.clone()); + let ctx_view = TaskCommandContext { + machine: ctx.machine, + }; + + match handler.can_execute(&ctx_view) { + ValidationResult::Ok => {} + ValidationResult::Locked => { + warn!("{} blocked: panel lock is active", handler.name()); + return Err(RigError::invalid_state("panel is locked")); + } + ValidationResult::InvalidState(msg) => { + warn!("{} blocked: {}", handler.name(), msg); + return Err(RigError::invalid_state(msg)); + } + ValidationResult::InvalidParams(msg) => { + warn!("{} invalid params: {}", handler.name(), msg); + return Err(RigError::invalid_state(msg)); + } + } + + // Execute command + let old_state = ctx.state.clone(); + let mut executor = RigCatExecutor::new(ctx.rig.as_mut()); + let result = handler.execute(&mut executor).await; + + match result { + Ok(cmd_result) => { + // Apply state updates based on command result + match cmd_result { + CommandResult::FreqUpdated(freq) => { + ctx.state.apply_freq(freq); + *ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); + } + CommandResult::ModeUpdated(mode) => { + ctx.state.apply_mode(mode); + *ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); + } + CommandResult::PttUpdated(ptt) => { + ctx.state.apply_ptt(ptt); + } + CommandResult::PowerUpdated(on) => { + ctx.state.control.enabled = Some(on); + if on { + time::sleep(Duration::from_secs(3)).await; + let now = Instant::now(); + *ctx.poll_pause_until = Some(now + Duration::from_secs(3)); + *ctx.last_power_on = Some(now); + // Refresh state after power on + if let Err(e) = + refresh_state_with_retry(ctx.rig, ctx.state, ctx.retry).await + { + if is_invalid_bcd_error(e.as_ref()) { + warn!("Transient CAT decode after PowerOn (ignored): {:?}", e); + *ctx.poll_pause_until = + Some(Instant::now() + Duration::from_millis(1500)); + } else { + error!("Failed to refresh after PowerOn: {:?}", e); + return Err(RigError::communication(format!("CAT error: {}", e))); + } + } + } else { + ctx.state.status.tx_en = false; + } + } + CommandResult::LockUpdated(locked) => { + ctx.state.control.lock = Some(locked); + ctx.state.status.lock = Some(locked); + } + CommandResult::TxLimitUpdated(limit) => { + ctx.state + .status + .tx + .get_or_insert(RigTxStatus { + power: None, + limit: None, + swr: None, + alc: None, + }) + .limit = Some(limit); + } + CommandResult::RefreshRequired => { + // For commands like ToggleVfo, GetSnapshot + if matches!(cmd, RigCommand::ToggleVfo) { + time::sleep(Duration::from_millis(150)).await; + *ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(300)); + } + if let Err(e) = refresh_state_with_retry(ctx.rig, ctx.state, ctx.retry).await { + error!("Failed to refresh state: {:?}", e); + return Err(RigError::communication(format!("CAT error: {}", e))); + } + } + CommandResult::Ok => {} + } + + let old_machine_state = ctx.machine.state().clone(); + sync_machine_state(ctx.machine, ctx.state); + let new_machine_state = ctx.machine.state().clone(); + emit_state_changes( + ctx.emitter, + &old_state, + ctx.state, + &old_machine_state, + &new_machine_state, + ); + let _ = ctx.state_tx.send(ctx.state.clone()); + snapshot_from(ctx.state) + } + Err(e) => { + error!("Command {} failed: {:?}", handler.name(), e); + Err(RigError::communication(format!("CAT error: {}", e))) + } + } +} + +/// Refresh state from CAT with retry logic using the retry policy. +async fn refresh_state_with_retry( + rig: &mut Box, + state: &mut RigState, + retry: &ExponentialBackoff, +) -> DynResult<()> { + let mut last_err: Option> = None; + let max = retry.max_attempts() as usize; + + for attempt in 0..max { + match refresh_state_from_cat(rig, state).await { + Ok(()) => return Ok(()), + Err(e) => { + let rig_err = RigError::communication(e.to_string()); + if retry.should_retry(attempt as u32, &rig_err) && attempt + 1 < max { + let delay = retry.delay(attempt as u32); + warn!( + "Retrying CAT state read (attempt {} of {}, delay {:?})", + attempt + 1, + max, + delay + ); + time::sleep(delay).await; + last_err = Some(e); + continue; + } else { + return Err(e); + } + } + } + } + + Err(last_err.unwrap_or_else(|| "Unknown CAT error".into())) +} + +/// Read current state from the rig via CAT. +async fn refresh_state_from_cat(rig: &mut Box, state: &mut RigState) -> DynResult<()> { + let (freq, mode, vfo) = rig.get_status().await?; + state.control.enabled = Some(true); + state.apply_freq(freq); + state.apply_mode(mode); + state.status.vfo = vfo; + + if state.status.tx_en { + state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(0); + } else if let Ok(meter) = rig.get_signal_strength().await { + let sig = map_signal_strength(&state.status.mode, meter); + state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(sig); + } + + if let Ok(limit) = rig.get_tx_limit().await { + state + .status + .tx + .get_or_insert(RigTxStatus { + power: None, + limit: None, + swr: None, + alc: None, + }) + .limit = Some(limit); + } + + if state.status.tx_en { + if let Ok(power) = rig.get_tx_power().await { + state + .status + .tx + .get_or_insert(RigTxStatus { + power: None, + limit: None, + swr: None, + alc: None, + }) + .power = Some(power); + } + } + + state.status.lock = Some(state.control.lock.unwrap_or(false)); + Ok(()) +} + +/// Apply initial mode/frequency after a successful CAT status read. +async fn apply_initial_tune( + rig: &mut Box, + state: &mut RigState, + retry: &ExponentialBackoff, + initial_freq_hz: u64, + initial_mode: &RigMode, +) -> DynResult<()> { + let needs_freq = state.status.freq.hz != initial_freq_hz; + let needs_mode = &state.status.mode != initial_mode; + + if !needs_freq && !needs_mode { + return Ok(()); + } + + if needs_mode { + rig.set_mode(initial_mode.clone()).await?; + } + if needs_freq { + rig.set_freq(Freq { + hz: initial_freq_hz, + }) + .await?; + } + + refresh_state_with_retry(rig, state, retry).await +} + +/// Prime VFO state by toggling and reading both VFOs. +async fn prime_vfo_state( + rig: &mut Box, + state: &mut RigState, + retry: &ExponentialBackoff, +) -> DynResult<()> { + // Ensure panel is unlocked + let _ = rig.unlock().await; + time::sleep(Duration::from_millis(100)).await; + + refresh_state_with_retry(rig, state, retry).await?; + time::sleep(Duration::from_millis(150)).await; + + rig.toggle_vfo().await?; + time::sleep(Duration::from_millis(150)).await; + refresh_state_with_retry(rig, state, retry).await?; + + rig.toggle_vfo().await?; + time::sleep(Duration::from_millis(150)).await; + refresh_state_with_retry(rig, state, retry).await?; + + Ok(()) +} + +/// Map raw signal strength to S-meter value based on mode. +fn map_signal_strength(mode: &RigMode, raw: u8) -> i32 { + // FT-817 returns 0-15 for signal strength + // Map to approximate dBm / S-units + match mode { + RigMode::FM | RigMode::WFM => -120 + (raw as i32 * 6), + _ => -127 + (raw as i32 * 6), + } +} + +/// Create a snapshot from current state. +fn snapshot_from(state: &RigState) -> RigResult { + state + .snapshot() + .ok_or_else(|| RigError::invalid_state("Rig info unavailable")) +} + +fn sync_machine_state(machine: &mut RigStateMachine, state: &RigState) { + let desired = desired_machine_state(state); + match (machine.state().clone(), &desired) { + (RigMachineState::Ready(_), RigMachineState::Ready(new_data)) => { + machine.update_ready_data(|data| { + *data = new_data.clone(); + }); + } + (RigMachineState::Transmitting(_), RigMachineState::Transmitting(new_data)) => { + machine.update_transmitting_data(|data| { + *data = new_data.clone(); + }); + } + _ => { + machine.set_state(desired); + } + } +} + +fn desired_machine_state(state: &RigState) -> RigMachineState { + let rig_info = state.rig_info.clone(); + if !state.initialized { + return rig_info + .map(|info| RigMachineState::Initializing { + rig_info: Some(info), + }) + .unwrap_or(RigMachineState::Disconnected); + } + + let Some(info) = rig_info else { + return RigMachineState::Disconnected; + }; + + if matches!(state.control.enabled, Some(false)) { + return RigMachineState::PoweredOff { rig_info: info }; + } + + if state.status.tx_en { + RigMachineState::Transmitting(transmitting_data_from_state(state, info)) + } else { + RigMachineState::Ready(ready_data_from_state(state, info)) + } +} + +fn ready_data_from_state(state: &RigState, rig_info: trx_core::rig::RigInfo) -> ReadyStateData { + ReadyStateData { + rig_info, + freq: state.status.freq, + mode: state.status.mode.clone(), + vfo: state.status.vfo.clone(), + rx: state.status.rx.clone(), + tx_limit: state.status.tx.as_ref().and_then(|tx| tx.limit), + locked: lock_state_from(state), + } +} + +fn transmitting_data_from_state( + state: &RigState, + rig_info: trx_core::rig::RigInfo, +) -> TransmittingStateData { + TransmittingStateData { + rig_info, + freq: state.status.freq, + mode: state.status.mode.clone(), + vfo: state.status.vfo.clone(), + tx: state.status.tx.clone(), + locked: lock_state_from(state), + } +} + +fn lock_state_from(state: &RigState) -> bool { + state.control.lock.or(state.status.lock).unwrap_or(false) +} + +fn emit_state_changes( + emitter: &RigEventEmitter, + old_state: &RigState, + new_state: &RigState, + old_machine_state: &RigMachineState, + new_machine_state: &RigMachineState, +) { + if old_state.status.freq.hz != new_state.status.freq.hz { + emitter.notify_frequency_change(Some(old_state.status.freq), new_state.status.freq); + } + + if old_state.status.mode != new_state.status.mode { + emitter.notify_mode_change(Some(&old_state.status.mode), &new_state.status.mode); + } + + if old_state.status.tx_en != new_state.status.tx_en { + emitter.notify_ptt_change(new_state.status.tx_en); + } + + if lock_state_from(old_state) != lock_state_from(new_state) { + emitter.notify_lock_change(lock_state_from(new_state)); + } + + if old_state.control.enabled.unwrap_or(false) != new_state.control.enabled.unwrap_or(false) { + emitter.notify_power_change(new_state.control.enabled.unwrap_or(false)); + } + + if meters_changed(old_state, new_state) { + emitter.notify_meter_update(new_state.status.rx.as_ref(), new_state.status.tx.as_ref()); + } + + if std::mem::discriminant(old_machine_state) != std::mem::discriminant(new_machine_state) { + emitter.notify_state_change(old_machine_state, new_machine_state); + } +} + +fn meters_changed(old_state: &RigState, new_state: &RigState) -> bool { + let old_rx_sig = old_state.status.rx.as_ref().and_then(|rx| rx.sig); + let new_rx_sig = new_state.status.rx.as_ref().and_then(|rx| rx.sig); + if old_rx_sig != new_rx_sig { + return true; + } + + let (old_tx_power, old_tx_limit, old_tx_swr, old_tx_alc) = + tx_meter_parts(old_state.status.tx.as_ref()); + let (new_tx_power, new_tx_limit, new_tx_swr, new_tx_alc) = + tx_meter_parts(new_state.status.tx.as_ref()); + + old_tx_power != new_tx_power + || old_tx_limit != new_tx_limit + || old_tx_swr != new_tx_swr + || old_tx_alc != new_tx_alc +} + +fn tx_meter_parts(tx: Option<&RigTxStatus>) -> (Option, Option, Option, Option) { + tx.map(|tx| (tx.power, tx.limit, tx.swr, tx.alc)) + .unwrap_or((None, None, None, None)) +} diff --git a/src/trx-core/src/rig/controller/events.rs b/src/trx-core/src/rig/controller/events.rs new file mode 100644 index 0000000..4a75052 --- /dev/null +++ b/src/trx-core/src/rig/controller/events.rs @@ -0,0 +1,207 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Rig event notification system. +//! +//! This module provides typed event notifications for rig state changes, +//! allowing frontends and other components to react to specific events. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use crate::radio::freq::Freq; +use crate::rig::state::RigMode; +use crate::rig::{RigRxStatus, RigTxStatus}; + +use super::machine::RigMachineState; + +/// Unique identifier for a registered listener. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ListenerId(u64); + +impl ListenerId { + fn new() -> Self { + static COUNTER: AtomicU64 = AtomicU64::new(0); + Self(COUNTER.fetch_add(1, Ordering::Relaxed)) + } +} + +/// Trait for components that want to receive rig events. +/// +/// Implementors receive typed notifications when rig state changes. +/// All methods have default no-op implementations, so listeners can +/// selectively override only the events they care about. +pub trait RigListener: Send + Sync { + /// Called when the operating frequency changes. + fn on_frequency_change(&self, _old: Option, _new: Freq) {} + + /// Called when the operating mode changes. + fn on_mode_change(&self, _old: Option<&RigMode>, _new: &RigMode) {} + + /// Called when PTT state changes. + fn on_ptt_change(&self, _transmitting: bool) {} + + /// Called when the rig state machine transitions. + fn on_state_change(&self, _old: &RigMachineState, _new: &RigMachineState) {} + + /// Called when meter readings are updated. + fn on_meter_update(&self, _rx: Option<&RigRxStatus>, _tx: Option<&RigTxStatus>) {} + + /// Called when the panel lock state changes. + fn on_lock_change(&self, _locked: bool) {} + + /// Called when the rig powers on or off. + fn on_power_change(&self, _powered: bool) {} +} + +/// Manages registered listeners and dispatches events. +pub struct RigEventEmitter { + listeners: Vec<(ListenerId, Arc)>, +} + +impl Default for RigEventEmitter { + fn default() -> Self { + Self::new() + } +} + +impl RigEventEmitter { + /// Create a new event emitter with no listeners. + pub fn new() -> Self { + Self { + listeners: Vec::new(), + } + } + + /// Register a listener to receive events. + /// Returns an ID that can be used to unregister the listener. + pub fn register(&mut self, listener: Arc) -> ListenerId { + let id = ListenerId::new(); + self.listeners.push((id, listener)); + id + } + + /// Unregister a listener by its ID. + pub fn unregister(&mut self, id: ListenerId) { + self.listeners.retain(|(lid, _)| *lid != id); + } + + /// Get the number of registered listeners. + pub fn listener_count(&self) -> usize { + self.listeners.len() + } + + /// Notify all listeners of a frequency change. + pub fn notify_frequency_change(&self, old: Option, new: Freq) { + for (_, listener) in &self.listeners { + listener.on_frequency_change(old, new); + } + } + + /// Notify all listeners of a mode change. + pub fn notify_mode_change(&self, old: Option<&RigMode>, new: &RigMode) { + for (_, listener) in &self.listeners { + listener.on_mode_change(old, new); + } + } + + /// Notify all listeners of a PTT state change. + pub fn notify_ptt_change(&self, transmitting: bool) { + for (_, listener) in &self.listeners { + listener.on_ptt_change(transmitting); + } + } + + /// Notify all listeners of a state machine transition. + pub fn notify_state_change(&self, old: &RigMachineState, new: &RigMachineState) { + for (_, listener) in &self.listeners { + listener.on_state_change(old, new); + } + } + + /// Notify all listeners of updated meter readings. + pub fn notify_meter_update(&self, rx: Option<&RigRxStatus>, tx: Option<&RigTxStatus>) { + for (_, listener) in &self.listeners { + listener.on_meter_update(rx, tx); + } + } + + /// Notify all listeners of a lock state change. + pub fn notify_lock_change(&self, locked: bool) { + for (_, listener) in &self.listeners { + listener.on_lock_change(locked); + } + } + + /// Notify all listeners of a power state change. + pub fn notify_power_change(&self, powered: bool) { + for (_, listener) in &self.listeners { + listener.on_power_change(powered); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::AtomicBool; + + struct TestListener { + freq_changed: AtomicBool, + ptt_changed: AtomicBool, + } + + impl TestListener { + fn new() -> Self { + Self { + freq_changed: AtomicBool::new(false), + ptt_changed: AtomicBool::new(false), + } + } + } + + impl RigListener for TestListener { + fn on_frequency_change(&self, _old: Option, _new: Freq) { + self.freq_changed.store(true, Ordering::Relaxed); + } + + fn on_ptt_change(&self, _transmitting: bool) { + self.ptt_changed.store(true, Ordering::Relaxed); + } + } + + #[test] + fn test_register_and_notify() { + let mut emitter = RigEventEmitter::new(); + let listener = Arc::new(TestListener::new()); + let id = emitter.register(listener.clone()); + + assert_eq!(emitter.listener_count(), 1); + + emitter.notify_frequency_change(None, Freq { hz: 14_200_000 }); + assert!(listener.freq_changed.load(Ordering::Relaxed)); + assert!(!listener.ptt_changed.load(Ordering::Relaxed)); + + emitter.notify_ptt_change(true); + assert!(listener.ptt_changed.load(Ordering::Relaxed)); + + emitter.unregister(id); + assert_eq!(emitter.listener_count(), 0); + } + + #[test] + fn test_multiple_listeners() { + let mut emitter = RigEventEmitter::new(); + let listener1 = Arc::new(TestListener::new()); + let listener2 = Arc::new(TestListener::new()); + + emitter.register(listener1.clone()); + emitter.register(listener2.clone()); + + emitter.notify_frequency_change(Some(Freq { hz: 7_000_000 }), Freq { hz: 14_200_000 }); + + assert!(listener1.freq_changed.load(Ordering::Relaxed)); + assert!(listener2.freq_changed.load(Ordering::Relaxed)); + } +} diff --git a/src/trx-core/src/rig/controller/executor.rs b/src/trx-core/src/rig/controller/executor.rs new file mode 100644 index 0000000..d2546c9 --- /dev/null +++ b/src/trx-core/src/rig/controller/executor.rs @@ -0,0 +1,85 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Command executor implementation that bridges to RigCat. + +use std::future::Future; +use std::pin::Pin; + +use crate::radio::freq::Freq; +use crate::rig::state::RigMode; +use crate::rig::RigCat; +use crate::DynResult; + +use super::handlers::CommandExecutor; + +/// Executor that delegates to a RigCat implementation. +pub struct RigCatExecutor<'a> { + rig: &'a mut dyn RigCat, +} + +impl<'a> RigCatExecutor<'a> { + pub fn new(rig: &'a mut dyn RigCat) -> Self { + Self { rig } + } +} + +impl<'a> CommandExecutor for RigCatExecutor<'a> { + fn set_freq<'b>( + &'b mut self, + freq: Freq, + ) -> Pin> + Send + 'b>> { + self.rig.set_freq(freq) + } + + fn set_mode<'b>( + &'b mut self, + mode: RigMode, + ) -> Pin> + Send + 'b>> { + self.rig.set_mode(mode) + } + + fn set_ptt<'b>( + &'b mut self, + ptt: bool, + ) -> Pin> + Send + 'b>> { + self.rig.set_ptt(ptt) + } + + fn power_on<'b>(&'b mut self) -> Pin> + Send + 'b>> { + self.rig.power_on() + } + + fn power_off<'b>(&'b mut self) -> Pin> + Send + 'b>> { + self.rig.power_off() + } + + fn toggle_vfo<'b>(&'b mut self) -> Pin> + Send + 'b>> { + self.rig.toggle_vfo() + } + + fn lock<'b>(&'b mut self) -> Pin> + Send + 'b>> { + self.rig.lock() + } + + fn unlock<'b>(&'b mut self) -> Pin> + Send + 'b>> { + self.rig.unlock() + } + + fn get_tx_limit<'b>(&'b mut self) -> Pin> + Send + 'b>> { + self.rig.get_tx_limit() + } + + fn set_tx_limit<'b>( + &'b mut self, + limit: u8, + ) -> Pin> + Send + 'b>> { + self.rig.set_tx_limit(limit) + } + + fn refresh_state<'b>(&'b mut self) -> Pin> + Send + 'b>> { + // This is a no-op for the executor - the controller handles state refresh + Box::pin(async { Ok(()) }) + } +} diff --git a/src/trx-core/src/rig/controller/mod.rs b/src/trx-core/src/rig/controller/mod.rs new file mode 100644 index 0000000..592431a --- /dev/null +++ b/src/trx-core/src/rig/controller/mod.rs @@ -0,0 +1,26 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Rig controller components. +//! +//! This module contains the core control logic for managing rig state, +//! handling commands, emitting events, and configuring operational policies. + +pub mod events; +pub mod executor; +pub mod handlers; +pub mod machine; +pub mod policies; + +pub use events::{ListenerId, RigEventEmitter, RigListener}; +pub use executor::RigCatExecutor; +pub use handlers::{ + command_from_rig_command, CommandContext, CommandExecutor, CommandResult, RigCommandHandler, + ValidationResult, +}; +pub use machine::{ + ReadyStateData, RigEvent, RigMachineState, RigStateError, RigStateMachine, + TransmittingStateData, +}; +pub use policies::{AdaptivePolling, ExponentialBackoff, PollingPolicy, RetryPolicy}; diff --git a/src/trx-core/src/rig/controller/policies.rs b/src/trx-core/src/rig/controller/policies.rs new file mode 100644 index 0000000..a5c615d --- /dev/null +++ b/src/trx-core/src/rig/controller/policies.rs @@ -0,0 +1,288 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Rig operational policies for retry and polling behavior. +//! +//! This module provides configurable policies that control how the rig +//! controller handles retries after failures and polling intervals. + +use std::time::Duration; + +use crate::rig::response::RigError; + +/// Policy for retrying failed operations. +pub trait RetryPolicy: Send + Sync { + /// Determine if the operation should be retried. + fn should_retry(&self, attempt: u32, error: &RigError) -> bool; + + /// Get the delay before the next retry attempt. + fn delay(&self, attempt: u32) -> Duration; + + /// Get the maximum number of attempts allowed. + fn max_attempts(&self) -> u32; +} + +/// Exponential backoff retry policy. +/// +/// Delays increase exponentially with each retry attempt, +/// up to a configured maximum delay. +#[derive(Debug, Clone)] +pub struct ExponentialBackoff { + max_attempts: u32, + base_delay: Duration, + max_delay: Duration, +} + +impl ExponentialBackoff { + /// Create a new exponential backoff policy. + pub fn new(max_attempts: u32, base_delay: Duration, max_delay: Duration) -> Self { + Self { + max_attempts, + base_delay, + max_delay, + } + } + + /// Create a policy with sensible defaults for rig communication. + pub fn default_rig() -> Self { + Self { + max_attempts: 3, + base_delay: Duration::from_millis(100), + max_delay: Duration::from_secs(2), + } + } +} + +impl Default for ExponentialBackoff { + fn default() -> Self { + Self::default_rig() + } +} + +impl RetryPolicy for ExponentialBackoff { + fn should_retry(&self, attempt: u32, error: &RigError) -> bool { + if attempt >= self.max_attempts { + return false; + } + // Only retry transient errors + error.is_transient() + } + + fn delay(&self, attempt: u32) -> Duration { + let multiplier = 2u32.saturating_pow(attempt); + let delay = self.base_delay.saturating_mul(multiplier); + delay.min(self.max_delay) + } + + fn max_attempts(&self) -> u32 { + self.max_attempts + } +} + +/// Fixed delay retry policy. +/// +/// Uses a constant delay between retry attempts. +#[derive(Debug, Clone)] +pub struct FixedDelay { + max_attempts: u32, + delay: Duration, +} + +impl FixedDelay { + /// Create a new fixed delay policy. + pub fn new(max_attempts: u32, delay: Duration) -> Self { + Self { + max_attempts, + delay, + } + } +} + +impl RetryPolicy for FixedDelay { + fn should_retry(&self, attempt: u32, error: &RigError) -> bool { + attempt < self.max_attempts && error.is_transient() + } + + fn delay(&self, _attempt: u32) -> Duration { + self.delay + } + + fn max_attempts(&self) -> u32 { + self.max_attempts + } +} + +/// No retry policy - operations fail immediately. +#[derive(Debug, Clone, Copy, Default)] +pub struct NoRetry; + +impl RetryPolicy for NoRetry { + fn should_retry(&self, _attempt: u32, _error: &RigError) -> bool { + false + } + + fn delay(&self, _attempt: u32) -> Duration { + Duration::ZERO + } + + fn max_attempts(&self) -> u32 { + 1 + } +} + +/// Policy for polling the rig for status updates. +pub trait PollingPolicy: Send + Sync { + /// Get the interval between polls. + fn interval(&self, transmitting: bool) -> Duration; + + /// Determine if polling should occur given the current state. + fn should_poll(&self, transmitting: bool) -> bool; +} + +/// Adaptive polling policy. +/// +/// Uses different intervals depending on whether the rig is transmitting. +/// Polls more frequently during TX to track power/SWR meters. +#[derive(Debug, Clone)] +pub struct AdaptivePolling { + idle_interval: Duration, + active_interval: Duration, +} + +impl AdaptivePolling { + /// Create a new adaptive polling policy. + pub fn new(idle_interval: Duration, active_interval: Duration) -> Self { + Self { + idle_interval, + active_interval, + } + } + + /// Create a policy with sensible defaults for rig polling. + pub fn default_rig() -> Self { + Self { + idle_interval: Duration::from_millis(500), + active_interval: Duration::from_millis(100), + } + } +} + +impl Default for AdaptivePolling { + fn default() -> Self { + Self::default_rig() + } +} + +impl PollingPolicy for AdaptivePolling { + fn interval(&self, transmitting: bool) -> Duration { + if transmitting { + self.active_interval + } else { + self.idle_interval + } + } + + fn should_poll(&self, _transmitting: bool) -> bool { + true + } +} + +/// Fixed polling policy. +/// +/// Uses a constant interval regardless of rig state. +#[derive(Debug, Clone)] +pub struct FixedPolling { + interval: Duration, +} + +impl FixedPolling { + /// Create a new fixed polling policy. + pub fn new(interval: Duration) -> Self { + Self { interval } + } +} + +impl PollingPolicy for FixedPolling { + fn interval(&self, _transmitting: bool) -> Duration { + self.interval + } + + fn should_poll(&self, _transmitting: bool) -> bool { + true + } +} + +/// No polling policy - disables automatic polling. +#[derive(Debug, Clone, Copy, Default)] +pub struct NoPolling; + +impl PollingPolicy for NoPolling { + fn interval(&self, _transmitting: bool) -> Duration { + Duration::MAX + } + + fn should_poll(&self, _transmitting: bool) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_exponential_backoff_delays() { + let policy = ExponentialBackoff::new(5, Duration::from_millis(100), Duration::from_secs(1)); + + assert_eq!(policy.delay(0), Duration::from_millis(100)); + assert_eq!(policy.delay(1), Duration::from_millis(200)); + assert_eq!(policy.delay(2), Duration::from_millis(400)); + assert_eq!(policy.delay(3), Duration::from_millis(800)); + // Should cap at max_delay + assert_eq!(policy.delay(4), Duration::from_secs(1)); + assert_eq!(policy.delay(5), Duration::from_secs(1)); + } + + #[test] + fn test_exponential_backoff_should_retry() { + let policy = ExponentialBackoff::new(3, Duration::from_millis(100), Duration::from_secs(1)); + + let transient = RigError::timeout(); + let fatal = RigError::not_supported("test"); + + assert!(policy.should_retry(0, &transient)); + assert!(policy.should_retry(1, &transient)); + assert!(policy.should_retry(2, &transient)); + assert!(!policy.should_retry(3, &transient)); // exceeded max attempts + + assert!(!policy.should_retry(0, &fatal)); // not transient + } + + #[test] + fn test_fixed_delay() { + let policy = FixedDelay::new(3, Duration::from_millis(500)); + + assert_eq!(policy.delay(0), Duration::from_millis(500)); + assert_eq!(policy.delay(1), Duration::from_millis(500)); + assert_eq!(policy.delay(5), Duration::from_millis(500)); + } + + #[test] + fn test_adaptive_polling() { + let policy = AdaptivePolling::new(Duration::from_millis(500), Duration::from_millis(100)); + + assert_eq!(policy.interval(false), Duration::from_millis(500)); + assert_eq!(policy.interval(true), Duration::from_millis(100)); + assert!(policy.should_poll(false)); + assert!(policy.should_poll(true)); + } + + #[test] + fn test_no_polling() { + let policy = NoPolling; + + assert!(!policy.should_poll(false)); + assert!(!policy.should_poll(true)); + } +}