rig: integrate controller and rig task updates

This commit is contained in:
2026-01-18 09:20:10 +01:00
parent 1be08b245c
commit a941c77039
5 changed files with 1336 additions and 0 deletions
+730
View File
@@ -0,0 +1,730 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Rig task implementation using controller components.
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::time::{self, Instant};
use tracing::{debug, error, info, warn};
use trx_backend::{build_rig, RigAccess};
use trx_core::radio::freq::Freq;
use trx_core::rig::command::RigCommand;
use trx_core::rig::controller::{
command_from_rig_command, AdaptivePolling, CommandContext, CommandResult, ExponentialBackoff,
PollingPolicy, ReadyStateData, RetryPolicy, RigCatExecutor, RigEventEmitter, RigMachineState,
RigStateMachine, TransmittingStateData, ValidationResult,
};
use trx_core::rig::request::RigRequest;
use trx_core::rig::state::{RigMode, RigSnapshot, RigState};
use trx_core::rig::{RigCat, RigControl, RigRxStatus, RigStatus, RigTxStatus};
use trx_core::{DynResult, RigError, RigResult};
use crate::error::is_invalid_bcd_error;
/// Configuration for the rig task.
pub struct RigTaskConfig {
pub rig_model: String,
pub access: RigAccess,
pub polling: AdaptivePolling,
pub retry: ExponentialBackoff,
pub initial_freq_hz: u64,
pub initial_mode: RigMode,
}
impl Default for RigTaskConfig {
fn default() -> Self {
Self {
rig_model: "ft817".to_string(),
access: RigAccess::Serial {
path: "/dev/ttyUSB0".to_string(),
baud: 9600,
},
polling: AdaptivePolling::default(),
retry: ExponentialBackoff::default(),
initial_freq_hz: 144_300_000,
initial_mode: RigMode::USB,
}
}
}
/// Command context implementation for validation.
struct TaskCommandContext<'a> {
machine: &'a RigStateMachine,
}
impl<'a> CommandContext for TaskCommandContext<'a> {
fn state(&self) -> &RigMachineState {
self.machine.state()
}
}
/// Run the rig task with the new controller-based implementation.
pub async fn run_rig_task(
config: RigTaskConfig,
mut rx: mpsc::Receiver<RigRequest>,
state_tx: watch::Sender<RigState>,
) -> DynResult<()> {
info!("Opening rig backend {}", config.rig_model);
match &config.access {
RigAccess::Serial { path, baud } => info!("Serial: {} @ {} baud", path, baud),
RigAccess::Tcp { addr } => info!("TCP CAT: {}", addr),
}
let mut rig: Box<dyn RigCat> = build_rig(&config.rig_model, config.access)?;
info!("Rig backend ready");
// Initialize state machine and state
let mut machine = RigStateMachine::new();
let emitter = RigEventEmitter::new();
let mut state = RigState {
rig_info: None,
status: RigStatus {
freq: Freq { hz: 144_300_000 },
mode: RigMode::USB,
tx_en: false,
vfo: None,
tx: Some(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
}),
rx: Some(RigRxStatus { sig: None }),
lock: Some(false),
},
initialized: false,
control: RigControl {
rpt_offset_hz: None,
ctcss_hz: None,
dcs_code: None,
lock: Some(false),
clar_hz: None,
clar_on: None,
enabled: Some(false),
},
};
// Polling configuration
let polling = &config.polling;
let retry = &config.retry;
let mut poll_pause_until: Option<Instant> = None;
let mut last_power_on: Option<Instant> = None;
let mut initial_status_read = false;
// Initial setup: get rig info
let rig_info = rig.info().clone();
state.rig_info = Some(rig_info);
if let Some(info) = state.rig_info.as_ref() {
info!(
"Rig info: {} {} {}",
info.manufacturer, info.model, info.revision
);
}
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&state,
&state,
&old_machine_state,
&new_machine_state,
);
let _ = state_tx.send(state.clone());
// Initial power-on sequence
if !state.control.enabled.unwrap_or(false) {
info!("Sending initial PowerOn to wake rig");
match rig.power_on().await {
Ok(()) => {
state.control.enabled = Some(true);
time::sleep(Duration::from_secs(3)).await;
if let Err(e) = refresh_state_with_retry(&mut rig, &mut state, retry).await {
warn!(
"Initial PowerOn refresh failed: {:?}; retrying once after short delay",
e
);
time::sleep(Duration::from_millis(500)).await;
if let Err(e2) = refresh_state_with_retry(&mut rig, &mut state, retry).await {
warn!(
"Initial PowerOn second refresh failed (continuing): {:?}",
e2
);
}
} else {
initial_status_read = true;
}
info!("Rig initialized after power on sequence");
}
Err(e) => warn!("Initial PowerOn failed (continuing): {:?}", e),
}
}
// Prime VFO state
if let Err(e) = prime_vfo_state(&mut rig, &mut state, retry).await {
warn!("VFO priming failed: {:?}", e);
} else {
initial_status_read = true;
}
if initial_status_read {
let old_state = state.clone();
if let Err(e) = apply_initial_tune(
&mut rig,
&mut state,
retry,
config.initial_freq_hz,
&config.initial_mode,
)
.await
{
warn!("Initial tune failed (continuing): {:?}", e);
} else {
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&old_state,
&state,
&old_machine_state,
&new_machine_state,
);
}
}
state.initialized = true;
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&state,
&state,
&old_machine_state,
&new_machine_state,
);
let _ = state_tx.send(state.clone());
// Main task loop
loop {
let poll_duration = polling.interval(state.status.tx_en);
let mut poll_interval = time::interval(poll_duration);
tokio::select! {
_ = poll_interval.tick() => {
// Check if polling is paused
if let Some(until) = poll_pause_until {
if Instant::now() < until {
continue;
} else {
poll_pause_until = None;
}
}
// Skip polling if rig is powered off
if matches!(state.control.enabled, Some(false)) {
continue;
}
// Poll rig state
let old_state = state.clone();
match refresh_state_with_retry(&mut rig, &mut state, retry).await {
Ok(()) => {
let old_machine_state = machine.state().clone();
sync_machine_state(&mut machine, &state);
let new_machine_state = machine.state().clone();
emit_state_changes(
&emitter,
&old_state,
&state,
&old_machine_state,
&new_machine_state,
);
let _ = state_tx.send(state.clone());
}
Err(e) => {
error!("CAT polling error: {:?}", e);
// Grace period after power on
if let Some(last_on) = last_power_on {
if Instant::now().duration_since(last_on) < Duration::from_secs(5) {
poll_pause_until = Some(Instant::now() + Duration::from_millis(800));
continue;
}
}
}
}
},
maybe_req = rx.recv() => {
let Some(first_req) = maybe_req else { break; };
// Batch up any pending requests
let mut batch = vec![first_req];
while let Ok(next) = rx.try_recv() {
batch.push(next);
}
// Process each request
while let Some(RigRequest { cmd, respond_to }) = batch.pop() {
let cmd_label = format!("{:?}", cmd);
let started = Instant::now();
let mut cmd_ctx = CommandExecContext {
rig: &mut rig,
state: &mut state,
machine: &mut machine,
emitter: &emitter,
poll_pause_until: &mut poll_pause_until,
last_power_on: &mut last_power_on,
state_tx: &state_tx,
retry,
};
let result = process_command(cmd, &mut cmd_ctx).await;
let _ = respond_to.send(result);
let elapsed = started.elapsed();
if elapsed > Duration::from_millis(500) {
warn!("Rig command {} took {:?}", cmd_label, elapsed);
} else {
debug!("Rig command {} completed in {:?}", cmd_label, elapsed);
}
}
},
}
}
info!("rig_task shutting down (channel closed)");
Ok(())
}
/// Process a single rig command using command handlers.
struct CommandExecContext<'a> {
rig: &'a mut Box<dyn RigCat>,
state: &'a mut RigState,
machine: &'a mut RigStateMachine,
emitter: &'a RigEventEmitter,
poll_pause_until: &'a mut Option<Instant>,
last_power_on: &'a mut Option<Instant>,
state_tx: &'a watch::Sender<RigState>,
retry: &'a ExponentialBackoff,
}
async fn process_command(
cmd: RigCommand,
ctx: &mut CommandExecContext<'_>,
) -> RigResult<RigSnapshot> {
sync_machine_state(ctx.machine, ctx.state);
// Check if rig is ready for commands
let not_ready =
!ctx.state.initialized && !matches!(cmd, RigCommand::PowerOn | RigCommand::GetSnapshot);
if not_ready {
return Err(RigError::invalid_state("rig not initialized yet"));
}
// Get command handler and validate
let handler = command_from_rig_command(cmd.clone());
let ctx_view = TaskCommandContext {
machine: ctx.machine,
};
match handler.can_execute(&ctx_view) {
ValidationResult::Ok => {}
ValidationResult::Locked => {
warn!("{} blocked: panel lock is active", handler.name());
return Err(RigError::invalid_state("panel is locked"));
}
ValidationResult::InvalidState(msg) => {
warn!("{} blocked: {}", handler.name(), msg);
return Err(RigError::invalid_state(msg));
}
ValidationResult::InvalidParams(msg) => {
warn!("{} invalid params: {}", handler.name(), msg);
return Err(RigError::invalid_state(msg));
}
}
// Execute command
let old_state = ctx.state.clone();
let mut executor = RigCatExecutor::new(ctx.rig.as_mut());
let result = handler.execute(&mut executor).await;
match result {
Ok(cmd_result) => {
// Apply state updates based on command result
match cmd_result {
CommandResult::FreqUpdated(freq) => {
ctx.state.apply_freq(freq);
*ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200));
}
CommandResult::ModeUpdated(mode) => {
ctx.state.apply_mode(mode);
*ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200));
}
CommandResult::PttUpdated(ptt) => {
ctx.state.apply_ptt(ptt);
}
CommandResult::PowerUpdated(on) => {
ctx.state.control.enabled = Some(on);
if on {
time::sleep(Duration::from_secs(3)).await;
let now = Instant::now();
*ctx.poll_pause_until = Some(now + Duration::from_secs(3));
*ctx.last_power_on = Some(now);
// Refresh state after power on
if let Err(e) =
refresh_state_with_retry(ctx.rig, ctx.state, ctx.retry).await
{
if is_invalid_bcd_error(e.as_ref()) {
warn!("Transient CAT decode after PowerOn (ignored): {:?}", e);
*ctx.poll_pause_until =
Some(Instant::now() + Duration::from_millis(1500));
} else {
error!("Failed to refresh after PowerOn: {:?}", e);
return Err(RigError::communication(format!("CAT error: {}", e)));
}
}
} else {
ctx.state.status.tx_en = false;
}
}
CommandResult::LockUpdated(locked) => {
ctx.state.control.lock = Some(locked);
ctx.state.status.lock = Some(locked);
}
CommandResult::TxLimitUpdated(limit) => {
ctx.state
.status
.tx
.get_or_insert(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
})
.limit = Some(limit);
}
CommandResult::RefreshRequired => {
// For commands like ToggleVfo, GetSnapshot
if matches!(cmd, RigCommand::ToggleVfo) {
time::sleep(Duration::from_millis(150)).await;
*ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(300));
}
if let Err(e) = refresh_state_with_retry(ctx.rig, ctx.state, ctx.retry).await {
error!("Failed to refresh state: {:?}", e);
return Err(RigError::communication(format!("CAT error: {}", e)));
}
}
CommandResult::Ok => {}
}
let old_machine_state = ctx.machine.state().clone();
sync_machine_state(ctx.machine, ctx.state);
let new_machine_state = ctx.machine.state().clone();
emit_state_changes(
ctx.emitter,
&old_state,
ctx.state,
&old_machine_state,
&new_machine_state,
);
let _ = ctx.state_tx.send(ctx.state.clone());
snapshot_from(ctx.state)
}
Err(e) => {
error!("Command {} failed: {:?}", handler.name(), e);
Err(RigError::communication(format!("CAT error: {}", e)))
}
}
}
/// Refresh state from CAT with retry logic using the retry policy.
async fn refresh_state_with_retry(
rig: &mut Box<dyn RigCat>,
state: &mut RigState,
retry: &ExponentialBackoff,
) -> DynResult<()> {
let mut last_err: Option<Box<dyn std::error::Error + Send + Sync>> = None;
let max = retry.max_attempts() as usize;
for attempt in 0..max {
match refresh_state_from_cat(rig, state).await {
Ok(()) => return Ok(()),
Err(e) => {
let rig_err = RigError::communication(e.to_string());
if retry.should_retry(attempt as u32, &rig_err) && attempt + 1 < max {
let delay = retry.delay(attempt as u32);
warn!(
"Retrying CAT state read (attempt {} of {}, delay {:?})",
attempt + 1,
max,
delay
);
time::sleep(delay).await;
last_err = Some(e);
continue;
} else {
return Err(e);
}
}
}
}
Err(last_err.unwrap_or_else(|| "Unknown CAT error".into()))
}
/// Read current state from the rig via CAT.
async fn refresh_state_from_cat(rig: &mut Box<dyn RigCat>, state: &mut RigState) -> DynResult<()> {
let (freq, mode, vfo) = rig.get_status().await?;
state.control.enabled = Some(true);
state.apply_freq(freq);
state.apply_mode(mode);
state.status.vfo = vfo;
if state.status.tx_en {
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(0);
} else if let Ok(meter) = rig.get_signal_strength().await {
let sig = map_signal_strength(&state.status.mode, meter);
state.status.rx.get_or_insert(RigRxStatus { sig: None }).sig = Some(sig);
}
if let Ok(limit) = rig.get_tx_limit().await {
state
.status
.tx
.get_or_insert(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
})
.limit = Some(limit);
}
if state.status.tx_en {
if let Ok(power) = rig.get_tx_power().await {
state
.status
.tx
.get_or_insert(RigTxStatus {
power: None,
limit: None,
swr: None,
alc: None,
})
.power = Some(power);
}
}
state.status.lock = Some(state.control.lock.unwrap_or(false));
Ok(())
}
/// Apply initial mode/frequency after a successful CAT status read.
async fn apply_initial_tune(
rig: &mut Box<dyn RigCat>,
state: &mut RigState,
retry: &ExponentialBackoff,
initial_freq_hz: u64,
initial_mode: &RigMode,
) -> DynResult<()> {
let needs_freq = state.status.freq.hz != initial_freq_hz;
let needs_mode = &state.status.mode != initial_mode;
if !needs_freq && !needs_mode {
return Ok(());
}
if needs_mode {
rig.set_mode(initial_mode.clone()).await?;
}
if needs_freq {
rig.set_freq(Freq {
hz: initial_freq_hz,
})
.await?;
}
refresh_state_with_retry(rig, state, retry).await
}
/// Prime VFO state by toggling and reading both VFOs.
async fn prime_vfo_state(
rig: &mut Box<dyn RigCat>,
state: &mut RigState,
retry: &ExponentialBackoff,
) -> DynResult<()> {
// Ensure panel is unlocked
let _ = rig.unlock().await;
time::sleep(Duration::from_millis(100)).await;
refresh_state_with_retry(rig, state, retry).await?;
time::sleep(Duration::from_millis(150)).await;
rig.toggle_vfo().await?;
time::sleep(Duration::from_millis(150)).await;
refresh_state_with_retry(rig, state, retry).await?;
rig.toggle_vfo().await?;
time::sleep(Duration::from_millis(150)).await;
refresh_state_with_retry(rig, state, retry).await?;
Ok(())
}
/// Map raw signal strength to S-meter value based on mode.
fn map_signal_strength(mode: &RigMode, raw: u8) -> i32 {
// FT-817 returns 0-15 for signal strength
// Map to approximate dBm / S-units
match mode {
RigMode::FM | RigMode::WFM => -120 + (raw as i32 * 6),
_ => -127 + (raw as i32 * 6),
}
}
/// Create a snapshot from current state.
fn snapshot_from(state: &RigState) -> RigResult<RigSnapshot> {
state
.snapshot()
.ok_or_else(|| RigError::invalid_state("Rig info unavailable"))
}
fn sync_machine_state(machine: &mut RigStateMachine, state: &RigState) {
let desired = desired_machine_state(state);
match (machine.state().clone(), &desired) {
(RigMachineState::Ready(_), RigMachineState::Ready(new_data)) => {
machine.update_ready_data(|data| {
*data = new_data.clone();
});
}
(RigMachineState::Transmitting(_), RigMachineState::Transmitting(new_data)) => {
machine.update_transmitting_data(|data| {
*data = new_data.clone();
});
}
_ => {
machine.set_state(desired);
}
}
}
fn desired_machine_state(state: &RigState) -> RigMachineState {
let rig_info = state.rig_info.clone();
if !state.initialized {
return rig_info
.map(|info| RigMachineState::Initializing {
rig_info: Some(info),
})
.unwrap_or(RigMachineState::Disconnected);
}
let Some(info) = rig_info else {
return RigMachineState::Disconnected;
};
if matches!(state.control.enabled, Some(false)) {
return RigMachineState::PoweredOff { rig_info: info };
}
if state.status.tx_en {
RigMachineState::Transmitting(transmitting_data_from_state(state, info))
} else {
RigMachineState::Ready(ready_data_from_state(state, info))
}
}
fn ready_data_from_state(state: &RigState, rig_info: trx_core::rig::RigInfo) -> ReadyStateData {
ReadyStateData {
rig_info,
freq: state.status.freq,
mode: state.status.mode.clone(),
vfo: state.status.vfo.clone(),
rx: state.status.rx.clone(),
tx_limit: state.status.tx.as_ref().and_then(|tx| tx.limit),
locked: lock_state_from(state),
}
}
fn transmitting_data_from_state(
state: &RigState,
rig_info: trx_core::rig::RigInfo,
) -> TransmittingStateData {
TransmittingStateData {
rig_info,
freq: state.status.freq,
mode: state.status.mode.clone(),
vfo: state.status.vfo.clone(),
tx: state.status.tx.clone(),
locked: lock_state_from(state),
}
}
fn lock_state_from(state: &RigState) -> bool {
state.control.lock.or(state.status.lock).unwrap_or(false)
}
fn emit_state_changes(
emitter: &RigEventEmitter,
old_state: &RigState,
new_state: &RigState,
old_machine_state: &RigMachineState,
new_machine_state: &RigMachineState,
) {
if old_state.status.freq.hz != new_state.status.freq.hz {
emitter.notify_frequency_change(Some(old_state.status.freq), new_state.status.freq);
}
if old_state.status.mode != new_state.status.mode {
emitter.notify_mode_change(Some(&old_state.status.mode), &new_state.status.mode);
}
if old_state.status.tx_en != new_state.status.tx_en {
emitter.notify_ptt_change(new_state.status.tx_en);
}
if lock_state_from(old_state) != lock_state_from(new_state) {
emitter.notify_lock_change(lock_state_from(new_state));
}
if old_state.control.enabled.unwrap_or(false) != new_state.control.enabled.unwrap_or(false) {
emitter.notify_power_change(new_state.control.enabled.unwrap_or(false));
}
if meters_changed(old_state, new_state) {
emitter.notify_meter_update(new_state.status.rx.as_ref(), new_state.status.tx.as_ref());
}
if std::mem::discriminant(old_machine_state) != std::mem::discriminant(new_machine_state) {
emitter.notify_state_change(old_machine_state, new_machine_state);
}
}
fn meters_changed(old_state: &RigState, new_state: &RigState) -> bool {
let old_rx_sig = old_state.status.rx.as_ref().and_then(|rx| rx.sig);
let new_rx_sig = new_state.status.rx.as_ref().and_then(|rx| rx.sig);
if old_rx_sig != new_rx_sig {
return true;
}
let (old_tx_power, old_tx_limit, old_tx_swr, old_tx_alc) =
tx_meter_parts(old_state.status.tx.as_ref());
let (new_tx_power, new_tx_limit, new_tx_swr, new_tx_alc) =
tx_meter_parts(new_state.status.tx.as_ref());
old_tx_power != new_tx_power
|| old_tx_limit != new_tx_limit
|| old_tx_swr != new_tx_swr
|| old_tx_alc != new_tx_alc
}
fn tx_meter_parts(tx: Option<&RigTxStatus>) -> (Option<u8>, Option<u8>, Option<f32>, Option<u8>) {
tx.map(|tx| (tx.power, tx.limit, tx.swr, tx.alc))
.unwrap_or((None, None, None, None))
}
+207
View File
@@ -0,0 +1,207 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Rig event notification system.
//!
//! This module provides typed event notifications for rig state changes,
//! allowing frontends and other components to react to specific events.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crate::radio::freq::Freq;
use crate::rig::state::RigMode;
use crate::rig::{RigRxStatus, RigTxStatus};
use super::machine::RigMachineState;
/// Unique identifier for a registered listener.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ListenerId(u64);
impl ListenerId {
fn new() -> Self {
static COUNTER: AtomicU64 = AtomicU64::new(0);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
}
/// Trait for components that want to receive rig events.
///
/// Implementors receive typed notifications when rig state changes.
/// All methods have default no-op implementations, so listeners can
/// selectively override only the events they care about.
pub trait RigListener: Send + Sync {
/// Called when the operating frequency changes.
fn on_frequency_change(&self, _old: Option<Freq>, _new: Freq) {}
/// Called when the operating mode changes.
fn on_mode_change(&self, _old: Option<&RigMode>, _new: &RigMode) {}
/// Called when PTT state changes.
fn on_ptt_change(&self, _transmitting: bool) {}
/// Called when the rig state machine transitions.
fn on_state_change(&self, _old: &RigMachineState, _new: &RigMachineState) {}
/// Called when meter readings are updated.
fn on_meter_update(&self, _rx: Option<&RigRxStatus>, _tx: Option<&RigTxStatus>) {}
/// Called when the panel lock state changes.
fn on_lock_change(&self, _locked: bool) {}
/// Called when the rig powers on or off.
fn on_power_change(&self, _powered: bool) {}
}
/// Manages registered listeners and dispatches events.
pub struct RigEventEmitter {
listeners: Vec<(ListenerId, Arc<dyn RigListener>)>,
}
impl Default for RigEventEmitter {
fn default() -> Self {
Self::new()
}
}
impl RigEventEmitter {
/// Create a new event emitter with no listeners.
pub fn new() -> Self {
Self {
listeners: Vec::new(),
}
}
/// Register a listener to receive events.
/// Returns an ID that can be used to unregister the listener.
pub fn register(&mut self, listener: Arc<dyn RigListener>) -> ListenerId {
let id = ListenerId::new();
self.listeners.push((id, listener));
id
}
/// Unregister a listener by its ID.
pub fn unregister(&mut self, id: ListenerId) {
self.listeners.retain(|(lid, _)| *lid != id);
}
/// Get the number of registered listeners.
pub fn listener_count(&self) -> usize {
self.listeners.len()
}
/// Notify all listeners of a frequency change.
pub fn notify_frequency_change(&self, old: Option<Freq>, new: Freq) {
for (_, listener) in &self.listeners {
listener.on_frequency_change(old, new);
}
}
/// Notify all listeners of a mode change.
pub fn notify_mode_change(&self, old: Option<&RigMode>, new: &RigMode) {
for (_, listener) in &self.listeners {
listener.on_mode_change(old, new);
}
}
/// Notify all listeners of a PTT state change.
pub fn notify_ptt_change(&self, transmitting: bool) {
for (_, listener) in &self.listeners {
listener.on_ptt_change(transmitting);
}
}
/// Notify all listeners of a state machine transition.
pub fn notify_state_change(&self, old: &RigMachineState, new: &RigMachineState) {
for (_, listener) in &self.listeners {
listener.on_state_change(old, new);
}
}
/// Notify all listeners of updated meter readings.
pub fn notify_meter_update(&self, rx: Option<&RigRxStatus>, tx: Option<&RigTxStatus>) {
for (_, listener) in &self.listeners {
listener.on_meter_update(rx, tx);
}
}
/// Notify all listeners of a lock state change.
pub fn notify_lock_change(&self, locked: bool) {
for (_, listener) in &self.listeners {
listener.on_lock_change(locked);
}
}
/// Notify all listeners of a power state change.
pub fn notify_power_change(&self, powered: bool) {
for (_, listener) in &self.listeners {
listener.on_power_change(powered);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicBool;
struct TestListener {
freq_changed: AtomicBool,
ptt_changed: AtomicBool,
}
impl TestListener {
fn new() -> Self {
Self {
freq_changed: AtomicBool::new(false),
ptt_changed: AtomicBool::new(false),
}
}
}
impl RigListener for TestListener {
fn on_frequency_change(&self, _old: Option<Freq>, _new: Freq) {
self.freq_changed.store(true, Ordering::Relaxed);
}
fn on_ptt_change(&self, _transmitting: bool) {
self.ptt_changed.store(true, Ordering::Relaxed);
}
}
#[test]
fn test_register_and_notify() {
let mut emitter = RigEventEmitter::new();
let listener = Arc::new(TestListener::new());
let id = emitter.register(listener.clone());
assert_eq!(emitter.listener_count(), 1);
emitter.notify_frequency_change(None, Freq { hz: 14_200_000 });
assert!(listener.freq_changed.load(Ordering::Relaxed));
assert!(!listener.ptt_changed.load(Ordering::Relaxed));
emitter.notify_ptt_change(true);
assert!(listener.ptt_changed.load(Ordering::Relaxed));
emitter.unregister(id);
assert_eq!(emitter.listener_count(), 0);
}
#[test]
fn test_multiple_listeners() {
let mut emitter = RigEventEmitter::new();
let listener1 = Arc::new(TestListener::new());
let listener2 = Arc::new(TestListener::new());
emitter.register(listener1.clone());
emitter.register(listener2.clone());
emitter.notify_frequency_change(Some(Freq { hz: 7_000_000 }), Freq { hz: 14_200_000 });
assert!(listener1.freq_changed.load(Ordering::Relaxed));
assert!(listener2.freq_changed.load(Ordering::Relaxed));
}
}
@@ -0,0 +1,85 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Command executor implementation that bridges to RigCat.
use std::future::Future;
use std::pin::Pin;
use crate::radio::freq::Freq;
use crate::rig::state::RigMode;
use crate::rig::RigCat;
use crate::DynResult;
use super::handlers::CommandExecutor;
/// Executor that delegates to a RigCat implementation.
pub struct RigCatExecutor<'a> {
rig: &'a mut dyn RigCat,
}
impl<'a> RigCatExecutor<'a> {
pub fn new(rig: &'a mut dyn RigCat) -> Self {
Self { rig }
}
}
impl<'a> CommandExecutor for RigCatExecutor<'a> {
fn set_freq<'b>(
&'b mut self,
freq: Freq,
) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.set_freq(freq)
}
fn set_mode<'b>(
&'b mut self,
mode: RigMode,
) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.set_mode(mode)
}
fn set_ptt<'b>(
&'b mut self,
ptt: bool,
) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.set_ptt(ptt)
}
fn power_on<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.power_on()
}
fn power_off<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.power_off()
}
fn toggle_vfo<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.toggle_vfo()
}
fn lock<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.lock()
}
fn unlock<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.unlock()
}
fn get_tx_limit<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<u8>> + Send + 'b>> {
self.rig.get_tx_limit()
}
fn set_tx_limit<'b>(
&'b mut self,
limit: u8,
) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
self.rig.set_tx_limit(limit)
}
fn refresh_state<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = DynResult<()>> + Send + 'b>> {
// This is a no-op for the executor - the controller handles state refresh
Box::pin(async { Ok(()) })
}
}
+26
View File
@@ -0,0 +1,26 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Rig controller components.
//!
//! This module contains the core control logic for managing rig state,
//! handling commands, emitting events, and configuring operational policies.
pub mod events;
pub mod executor;
pub mod handlers;
pub mod machine;
pub mod policies;
pub use events::{ListenerId, RigEventEmitter, RigListener};
pub use executor::RigCatExecutor;
pub use handlers::{
command_from_rig_command, CommandContext, CommandExecutor, CommandResult, RigCommandHandler,
ValidationResult,
};
pub use machine::{
ReadyStateData, RigEvent, RigMachineState, RigStateError, RigStateMachine,
TransmittingStateData,
};
pub use policies::{AdaptivePolling, ExponentialBackoff, PollingPolicy, RetryPolicy};
+288
View File
@@ -0,0 +1,288 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Rig operational policies for retry and polling behavior.
//!
//! This module provides configurable policies that control how the rig
//! controller handles retries after failures and polling intervals.
use std::time::Duration;
use crate::rig::response::RigError;
/// Policy for retrying failed operations.
pub trait RetryPolicy: Send + Sync {
/// Determine if the operation should be retried.
fn should_retry(&self, attempt: u32, error: &RigError) -> bool;
/// Get the delay before the next retry attempt.
fn delay(&self, attempt: u32) -> Duration;
/// Get the maximum number of attempts allowed.
fn max_attempts(&self) -> u32;
}
/// Exponential backoff retry policy.
///
/// Delays increase exponentially with each retry attempt,
/// up to a configured maximum delay.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
max_attempts: u32,
base_delay: Duration,
max_delay: Duration,
}
impl ExponentialBackoff {
/// Create a new exponential backoff policy.
pub fn new(max_attempts: u32, base_delay: Duration, max_delay: Duration) -> Self {
Self {
max_attempts,
base_delay,
max_delay,
}
}
/// Create a policy with sensible defaults for rig communication.
pub fn default_rig() -> Self {
Self {
max_attempts: 3,
base_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(2),
}
}
}
impl Default for ExponentialBackoff {
fn default() -> Self {
Self::default_rig()
}
}
impl RetryPolicy for ExponentialBackoff {
fn should_retry(&self, attempt: u32, error: &RigError) -> bool {
if attempt >= self.max_attempts {
return false;
}
// Only retry transient errors
error.is_transient()
}
fn delay(&self, attempt: u32) -> Duration {
let multiplier = 2u32.saturating_pow(attempt);
let delay = self.base_delay.saturating_mul(multiplier);
delay.min(self.max_delay)
}
fn max_attempts(&self) -> u32 {
self.max_attempts
}
}
/// Fixed delay retry policy.
///
/// Uses a constant delay between retry attempts.
#[derive(Debug, Clone)]
pub struct FixedDelay {
max_attempts: u32,
delay: Duration,
}
impl FixedDelay {
/// Create a new fixed delay policy.
pub fn new(max_attempts: u32, delay: Duration) -> Self {
Self {
max_attempts,
delay,
}
}
}
impl RetryPolicy for FixedDelay {
fn should_retry(&self, attempt: u32, error: &RigError) -> bool {
attempt < self.max_attempts && error.is_transient()
}
fn delay(&self, _attempt: u32) -> Duration {
self.delay
}
fn max_attempts(&self) -> u32 {
self.max_attempts
}
}
/// No retry policy - operations fail immediately.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoRetry;
impl RetryPolicy for NoRetry {
fn should_retry(&self, _attempt: u32, _error: &RigError) -> bool {
false
}
fn delay(&self, _attempt: u32) -> Duration {
Duration::ZERO
}
fn max_attempts(&self) -> u32 {
1
}
}
/// Policy for polling the rig for status updates.
pub trait PollingPolicy: Send + Sync {
/// Get the interval between polls.
fn interval(&self, transmitting: bool) -> Duration;
/// Determine if polling should occur given the current state.
fn should_poll(&self, transmitting: bool) -> bool;
}
/// Adaptive polling policy.
///
/// Uses different intervals depending on whether the rig is transmitting.
/// Polls more frequently during TX to track power/SWR meters.
#[derive(Debug, Clone)]
pub struct AdaptivePolling {
idle_interval: Duration,
active_interval: Duration,
}
impl AdaptivePolling {
/// Create a new adaptive polling policy.
pub fn new(idle_interval: Duration, active_interval: Duration) -> Self {
Self {
idle_interval,
active_interval,
}
}
/// Create a policy with sensible defaults for rig polling.
pub fn default_rig() -> Self {
Self {
idle_interval: Duration::from_millis(500),
active_interval: Duration::from_millis(100),
}
}
}
impl Default for AdaptivePolling {
fn default() -> Self {
Self::default_rig()
}
}
impl PollingPolicy for AdaptivePolling {
fn interval(&self, transmitting: bool) -> Duration {
if transmitting {
self.active_interval
} else {
self.idle_interval
}
}
fn should_poll(&self, _transmitting: bool) -> bool {
true
}
}
/// Fixed polling policy.
///
/// Uses a constant interval regardless of rig state.
#[derive(Debug, Clone)]
pub struct FixedPolling {
interval: Duration,
}
impl FixedPolling {
/// Create a new fixed polling policy.
pub fn new(interval: Duration) -> Self {
Self { interval }
}
}
impl PollingPolicy for FixedPolling {
fn interval(&self, _transmitting: bool) -> Duration {
self.interval
}
fn should_poll(&self, _transmitting: bool) -> bool {
true
}
}
/// No polling policy - disables automatic polling.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoPolling;
impl PollingPolicy for NoPolling {
fn interval(&self, _transmitting: bool) -> Duration {
Duration::MAX
}
fn should_poll(&self, _transmitting: bool) -> bool {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_exponential_backoff_delays() {
let policy = ExponentialBackoff::new(5, Duration::from_millis(100), Duration::from_secs(1));
assert_eq!(policy.delay(0), Duration::from_millis(100));
assert_eq!(policy.delay(1), Duration::from_millis(200));
assert_eq!(policy.delay(2), Duration::from_millis(400));
assert_eq!(policy.delay(3), Duration::from_millis(800));
// Should cap at max_delay
assert_eq!(policy.delay(4), Duration::from_secs(1));
assert_eq!(policy.delay(5), Duration::from_secs(1));
}
#[test]
fn test_exponential_backoff_should_retry() {
let policy = ExponentialBackoff::new(3, Duration::from_millis(100), Duration::from_secs(1));
let transient = RigError::timeout();
let fatal = RigError::not_supported("test");
assert!(policy.should_retry(0, &transient));
assert!(policy.should_retry(1, &transient));
assert!(policy.should_retry(2, &transient));
assert!(!policy.should_retry(3, &transient)); // exceeded max attempts
assert!(!policy.should_retry(0, &fatal)); // not transient
}
#[test]
fn test_fixed_delay() {
let policy = FixedDelay::new(3, Duration::from_millis(500));
assert_eq!(policy.delay(0), Duration::from_millis(500));
assert_eq!(policy.delay(1), Duration::from_millis(500));
assert_eq!(policy.delay(5), Duration::from_millis(500));
}
#[test]
fn test_adaptive_polling() {
let policy = AdaptivePolling::new(Duration::from_millis(500), Duration::from_millis(100));
assert_eq!(policy.interval(false), Duration::from_millis(500));
assert_eq!(policy.interval(true), Duration::from_millis(100));
assert!(policy.should_poll(false));
assert!(policy.should_poll(true));
}
#[test]
fn test_no_polling() {
let policy = NoPolling;
assert!(!policy.should_poll(false));
assert!(!policy.should_poll(true));
}
}