[refactor](trx-rs): resolve all improvement areas (P1–P3)

P1 — High:
- Merge duplicate APRS/HF-APRS decoder tasks into parameterised inner fn
- Merge duplicate FT8/FT4 decoder tasks into shared ftx inner fn
- Add multi-rig state isolation and command routing tests (listener.rs)
- Add background decode evaluate_bookmark unit tests

P2 — Medium:
- Fix decode-log silent flush errors and rotation failure fallback
- Split api.rs (2,831 LOC) into 7 logical modules (decoder, rig, vchan,
  sse, bookmarks, assets, mod)
- Extract background decode decision cascade into pure evaluate_bookmark()
  function with ChannelAction enum
- Relax actix-web pin from =4.4.1 to 4.4
- Replace VDES magic numbers with named constants

P3 — Low:
- Add doc comments to AisDecoder, VdesDecoder, RdsDecoder
- Add debug_assert on turbo decoder interleaver/deinterleaver lengths
- Add tracing info_span! to all 10 decoder block_in_place calls
- Optimize hot-path string cloning in remote_client spectrum loop

https://claude.ai/code/session_01Y3G65hrfsRRjwyBF2qbBmc
Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-29 14:40:03 +00:00
committed by Stan Grams
parent 44e09449dc
commit c041ac83f3
21 changed files with 4566 additions and 3359 deletions
+239 -327
View File
@@ -17,7 +17,7 @@ use num_complex::Complex;
use std::io::Write as _;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{error, info, warn};
use tracing::{error, info, info_span, warn};
use trx_ais::AisDecoder;
use trx_aprs::AprsDecoder;
@@ -1187,123 +1187,52 @@ fn run_playback(
pub async fn run_aprs_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
pcm_rx: broadcast::Receiver<Vec<f32>>,
state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
decode_logs: Option<Arc<DecoderLoggers>>,
histories: Arc<DecoderHistories>,
) {
info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = AprsDecoder::new(sample_rate);
let mut was_active = false;
let mut last_reset_seq: u64 = 0;
let mut active = matches!(state_rx.borrow().status.mode, RigMode::PKT);
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = matches!(state.status.mode, RigMode::PKT);
if active {
pcm_rx = pcm_rx.resubscribe();
}
if state.reset_seqs.aprs_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.aprs_decode_reset_seq;
decoder.reset();
info!("APRS decoder reset (seq={})", last_reset_seq);
}
}
Err(_) => break,
}
continue;
}
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let reset_seq = {
let state = state_rx.borrow();
state.reset_seqs.aprs_decode_reset_seq
};
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
info!("APRS decoder reset (seq={})", last_reset_seq);
pcm_rx = pcm_rx.resubscribe();
continue;
}
// Downmix to mono if stereo
let mut mono = if channels > 1 {
let num_frames = frame.len() / channels as usize;
let mut mono = Vec::with_capacity(num_frames);
for i in 0..num_frames {
mono.push(frame[i * channels as usize]);
}
mono
} else {
frame
};
apply_decode_audio_gate(&mut mono);
was_active = true;
let packets = tokio::task::block_in_place(|| decoder.process_samples(&mono));
let latest_reset_seq = state_rx.borrow().reset_seqs.aprs_decode_reset_seq;
if latest_reset_seq != reset_seq {
last_reset_seq = latest_reset_seq;
decoder.reset();
info!("APRS decoder reset (seq={})", last_reset_seq);
pcm_rx = pcm_rx.resubscribe();
continue;
}
for mut pkt in packets {
if let Some(logger) = decode_logs.as_ref() {
logger.log_aprs(&pkt);
}
if !pkt.crc_ok {
continue;
}
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
histories.record_aprs_packet(pkt.clone());
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("APRS decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
match changed {
Ok(()) => {
let state = state_rx.borrow();
active = matches!(state.status.mode, RigMode::PKT);
if state.reset_seqs.aprs_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.aprs_decode_reset_seq;
decoder.reset();
info!("APRS decoder reset (seq={})", last_reset_seq);
}
if !active && was_active {
decoder.reset();
was_active = false;
}
if active {
pcm_rx = pcm_rx.resubscribe();
}
}
Err(_) => break,
}
}
}
}
run_aprs_decoder_inner(
"APRS",
sample_rate,
channels,
pcm_rx,
state_rx,
decode_tx,
decode_logs,
histories,
false,
)
.await;
}
pub async fn run_hf_aprs_decoder(
sample_rate: u32,
channels: u16,
pcm_rx: broadcast::Receiver<Vec<f32>>,
state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
decode_logs: Option<Arc<DecoderLoggers>>,
histories: Arc<DecoderHistories>,
) {
run_aprs_decoder_inner(
"HF APRS",
sample_rate,
channels,
pcm_rx,
state_rx,
decode_tx,
decode_logs,
histories,
true,
)
.await;
}
#[allow(clippy::too_many_arguments)]
async fn run_aprs_decoder_inner(
label: &str,
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
@@ -1311,29 +1240,50 @@ pub async fn run_hf_aprs_decoder(
decode_tx: broadcast::Sender<DecodedMessage>,
decode_logs: Option<Arc<DecoderLoggers>>,
histories: Arc<DecoderHistories>,
is_hf: bool,
) {
info!(
"HF APRS decoder started ({}Hz, {} ch)",
sample_rate, channels
);
let mut decoder = AprsDecoder::new_hf(sample_rate);
info!("{} decoder started ({}Hz, {} ch)", label, sample_rate, channels);
let mut decoder = if is_hf {
AprsDecoder::new_hf(sample_rate)
} else {
AprsDecoder::new(sample_rate)
};
let mut was_active = false;
let mut last_reset_seq: u64 = 0;
let mut active = matches!(state_rx.borrow().status.mode, RigMode::DIG);
let mode_match = |state: &RigState| -> bool {
if is_hf {
matches!(state.status.mode, RigMode::DIG)
} else {
matches!(state.status.mode, RigMode::PKT)
}
};
let get_reset_seq = |state: &RigState| -> u64 {
if is_hf {
state.reset_seqs.hf_aprs_decode_reset_seq
} else {
state.reset_seqs.aprs_decode_reset_seq
}
};
let span_name = if is_hf { "hf_aprs_decode" } else { "aprs_decode" };
let mut active = mode_match(&state_rx.borrow());
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = matches!(state.status.mode, RigMode::DIG);
active = mode_match(&state);
if active {
pcm_rx = pcm_rx.resubscribe();
}
if state.reset_seqs.hf_aprs_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.hf_aprs_decode_reset_seq;
let seq = get_reset_seq(&state);
if seq != last_reset_seq {
last_reset_seq = seq;
decoder.reset();
info!("HF APRS decoder reset (seq={})", last_reset_seq);
info!("{} decoder reset (seq={})", label, last_reset_seq);
}
}
Err(_) => break,
@@ -1345,14 +1295,11 @@ pub async fn run_hf_aprs_decoder(
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let reset_seq = {
let state = state_rx.borrow();
state.reset_seqs.hf_aprs_decode_reset_seq
};
let reset_seq = get_reset_seq(&state_rx.borrow());
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
info!("HF APRS decoder reset (seq={})", last_reset_seq);
info!("{} decoder reset (seq={})", label, last_reset_seq);
pcm_rx = pcm_rx.resubscribe();
continue;
}
@@ -1361,12 +1308,15 @@ pub async fn run_hf_aprs_decoder(
apply_decode_audio_gate(&mut mono);
was_active = true;
let packets = tokio::task::block_in_place(|| decoder.process_samples(&mono));
let latest_reset_seq = state_rx.borrow().reset_seqs.hf_aprs_decode_reset_seq;
let packets = tokio::task::block_in_place(|| {
let _span = info_span!(target: "trx_server::audio", "aprs_decode_inner", variant = span_name).entered();
decoder.process_samples(&mono)
});
let latest_reset_seq = get_reset_seq(&state_rx.borrow());
if latest_reset_seq != reset_seq {
last_reset_seq = latest_reset_seq;
decoder.reset();
info!("HF APRS decoder reset (seq={})", last_reset_seq);
info!("{} decoder reset (seq={})", label, last_reset_seq);
pcm_rx = pcm_rx.resubscribe();
continue;
}
@@ -1380,12 +1330,17 @@ pub async fn run_hf_aprs_decoder(
if pkt.ts_ms.is_none() {
pkt.ts_ms = Some(current_timestamp_ms());
}
histories.record_hf_aprs_packet(pkt.clone());
let _ = decode_tx.send(DecodedMessage::HfAprs(pkt));
if is_hf {
histories.record_hf_aprs_packet(pkt.clone());
let _ = decode_tx.send(DecodedMessage::HfAprs(pkt));
} else {
histories.record_aprs_packet(pkt.clone());
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("HF APRS decoder: dropped {} PCM frames", n);
warn!("{} decoder: dropped {} PCM frames", label, n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
@@ -1394,11 +1349,12 @@ pub async fn run_hf_aprs_decoder(
match changed {
Ok(()) => {
let state = state_rx.borrow();
active = matches!(state.status.mode, RigMode::DIG);
if state.reset_seqs.hf_aprs_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.hf_aprs_decode_reset_seq;
active = mode_match(&state);
let seq = get_reset_seq(&state);
if seq != last_reset_seq {
last_reset_seq = seq;
decoder.reset();
info!("HF APRS decoder reset (seq={})", last_reset_seq);
info!("{} decoder reset (seq={})", label, last_reset_seq);
}
if !active && was_active {
decoder.reset();
@@ -1467,7 +1423,10 @@ pub async fn run_ais_decoder(
was_active = true;
let mono = downmix_if_needed(frame, channels);
let messages =
tokio::task::block_in_place(|| decoder_a.process_samples(&mono, "A"));
tokio::task::block_in_place(|| {
let _span = info_span!("ais_decode_a").entered();
decoder_a.process_samples(&mono, "A")
});
for mut msg in messages {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
@@ -1488,7 +1447,10 @@ pub async fn run_ais_decoder(
was_active = true;
let mono = downmix_if_needed(frame, channels);
let messages =
tokio::task::block_in_place(|| decoder_b.process_samples(&mono, "B"));
tokio::task::block_in_place(|| {
let _span = info_span!("ais_decode_b").entered();
decoder_b.process_samples(&mono, "B")
});
for mut msg in messages {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
@@ -1559,7 +1521,10 @@ pub async fn run_vdes_decoder(
Ok(block) => {
was_active = true;
let messages =
tokio::task::block_in_place(|| decoder.process_samples(&block, "Main"));
tokio::task::block_in_place(|| {
let _span = info_span!("vdes_decode").entered();
decoder.process_samples(&block, "Main")
});
for mut msg in messages {
if msg.ts_ms.is_none() {
msg.ts_ms = Some(current_timestamp_ms());
@@ -1705,7 +1670,10 @@ pub async fn run_cw_decoder(
frame
};
was_active = true;
let events = tokio::task::block_in_place(|| decoder.process_samples(&mono));
let events = tokio::task::block_in_place(|| {
let _span = info_span!("cw_decode").entered();
decoder.process_samples(&mono)
});
let latest_reset_seq = state_rx.borrow().reset_seqs.cw_decode_reset_seq;
if latest_reset_seq != reset_seq {
last_reset_seq = latest_reset_seq;
@@ -1825,180 +1793,99 @@ fn resample_to_12k(samples: &[f32], sample_rate: u32) -> Option<Vec<f32>> {
pub async fn run_ft8_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
pcm_rx: broadcast::Receiver<Vec<f32>>,
state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
decode_logs: Option<Arc<DecoderLoggers>>,
histories: Arc<DecoderHistories>,
) {
info!("FT8 decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) {
Ok(decoder) => decoder,
Err(err) => {
warn!("FT8 decoder init failed: {}", err);
return;
}
};
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().decoders.ft8_decode_enabled
&& matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB);
let mut ft8_buf: Vec<f32> = Vec::new();
let mut last_slot: i64 = -1;
let slot_len_s: i64 = 15;
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.decoders.ft8_decode_enabled
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if active {
pcm_rx = pcm_rx.resubscribe();
}
if state.reset_seqs.ft8_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.ft8_decode_reset_seq;
decoder.reset();
ft8_buf.clear();
}
last_slot = -1;
}
Err(_) => break,
}
continue;
}
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_secs() as i64,
Err(_) => 0,
};
let slot = now / slot_len_s;
if slot != last_slot {
last_slot = slot;
decoder.reset();
ft8_buf.clear();
}
let reset_seq = {
let state = state_rx.borrow();
state.reset_seqs.ft8_decode_reset_seq
};
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
ft8_buf.clear();
pcm_rx = pcm_rx.resubscribe();
continue;
}
let mut mono = downmix_mono(frame, channels);
apply_decode_audio_gate(&mut mono);
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
warn!("FT8 decoder: unsupported sample rate {}", sample_rate);
break;
};
ft8_buf.extend_from_slice(&resampled);
while ft8_buf.len() >= decoder.block_size() {
let block: Vec<f32> = ft8_buf.drain(..decoder.block_size()).collect();
let results = tokio::task::block_in_place(|| {
decoder.process_block(&block);
decoder.decode_if_ready(100)
});
let latest_reset_seq = state_rx.borrow().reset_seqs.ft8_decode_reset_seq;
if latest_reset_seq != reset_seq {
last_reset_seq = latest_reset_seq;
decoder.reset();
ft8_buf.clear();
pcm_rx = pcm_rx.resubscribe();
continue;
}
if !results.is_empty() {
for res in results {
let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_millis() as i64,
Err(_) => 0,
};
let base_freq_hz = state_rx.borrow().status.freq.hz as f64;
let abs_freq_hz = base_freq_hz + res.freq_hz as f64;
let msg = Ft8Message {
rig_id: None,
ts_ms,
snr_db: res.snr_db,
dt_s: res.dt_s,
freq_hz: if abs_freq_hz.is_finite() && abs_freq_hz > 0.0 {
abs_freq_hz as f32
} else {
res.freq_hz
},
message: res.text,
};
histories.record_ft8_message(msg.clone());
if let Some(logger) = decode_logs.as_ref() {
logger.log_ft8(&msg);
}
let _ = decode_tx.send(DecodedMessage::Ft8(msg));
}
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("FT8 decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
match changed {
Ok(()) => {
let state = state_rx.borrow();
active = state.decoders.ft8_decode_enabled
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if state.reset_seqs.ft8_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.ft8_decode_reset_seq;
decoder.reset();
ft8_buf.clear();
}
if !active {
decoder.reset();
ft8_buf.clear();
last_slot = -1;
} else {
pcm_rx = pcm_rx.resubscribe();
}
}
Err(_) => break,
}
}
}
}
run_ftx_decoder_inner(
"FT8",
sample_rate,
channels,
pcm_rx,
state_rx,
decode_tx,
decode_logs,
histories,
false,
)
.await;
}
/// Run the FT4 decoder task. Mirrors FT8 but uses FT4 protocol (7.5s slots).
pub async fn run_ft4_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
pcm_rx: broadcast::Receiver<Vec<f32>>,
state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
histories: Arc<DecoderHistories>,
) {
info!("FT4 decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = match Ft8Decoder::new_ft4(FT8_SAMPLE_RATE) {
Ok(decoder) => decoder,
Err(err) => {
warn!("FT4 decoder init failed: {}", err);
return;
run_ftx_decoder_inner(
"FT4",
sample_rate,
channels,
pcm_rx,
state_rx,
decode_tx,
None,
histories,
true,
)
.await;
}
/// Shared implementation for FT8 and FT4 decoder tasks.
#[allow(clippy::too_many_arguments)]
async fn run_ftx_decoder_inner(
label: &str,
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
decode_logs: Option<Arc<DecoderLoggers>>,
histories: Arc<DecoderHistories>,
is_ft4: bool,
) {
info!("{} decoder started ({}Hz, {} ch)", label, sample_rate, channels);
let mut decoder = {
let result = if is_ft4 {
Ft8Decoder::new_ft4(FT8_SAMPLE_RATE)
} else {
Ft8Decoder::new(FT8_SAMPLE_RATE)
};
match result {
Ok(decoder) => decoder,
Err(err) => {
warn!("{} decoder init failed: {}", label, err);
return;
}
}
};
let is_enabled = |state: &RigState| -> bool {
if is_ft4 {
state.decoders.ft4_decode_enabled
} else {
state.decoders.ft8_decode_enabled
}
};
let get_reset_seq = |state: &RigState| -> u64 {
if is_ft4 {
state.reset_seqs.ft4_decode_reset_seq
} else {
state.reset_seqs.ft8_decode_reset_seq
}
};
let span_name = if is_ft4 { "ft4_decode" } else { "ft8_decode" };
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().decoders.ft4_decode_enabled
let mut active = is_enabled(&state_rx.borrow())
&& matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB);
let mut ft4_buf: Vec<f32> = Vec::new();
let mut sample_buf: Vec<f32> = Vec::new();
let mut last_slot: i64 = -1;
loop {
@@ -2006,15 +1893,16 @@ pub async fn run_ft4_decoder(
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.decoders.ft4_decode_enabled
active = is_enabled(&state)
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if active {
pcm_rx = pcm_rx.resubscribe();
}
if state.reset_seqs.ft4_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.ft4_decode_reset_seq;
let seq = get_reset_seq(&state);
if seq != last_reset_seq {
last_reset_seq = seq;
decoder.reset();
ft4_buf.clear();
sample_buf.clear();
}
last_slot = -1;
}
@@ -2027,26 +1915,31 @@ pub async fn run_ft4_decoder(
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_millis() as i64,
Err(_) => 0,
// Compute current slot; FT8 uses 15s slots, FT4 uses 7.5s slots
let slot = if is_ft4 {
let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_millis() as i64,
Err(_) => 0,
};
now_ms / 7_500
} else {
let now_s = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_secs() as i64,
Err(_) => 0,
};
now_s / 15
};
// FT4 slot period is 7.5s
let slot = now_ms / 7_500;
if slot != last_slot {
last_slot = slot;
decoder.reset();
ft4_buf.clear();
sample_buf.clear();
}
let reset_seq = {
let state = state_rx.borrow();
state.reset_seqs.ft4_decode_reset_seq
};
let reset_seq = get_reset_seq(&state_rx.borrow());
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
ft4_buf.clear();
sample_buf.clear();
pcm_rx = pcm_rx.resubscribe();
continue;
}
@@ -2054,22 +1947,23 @@ pub async fn run_ft4_decoder(
let mut mono = downmix_mono(frame, channels);
apply_decode_audio_gate(&mut mono);
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
warn!("FT4 decoder: unsupported sample rate {}", sample_rate);
warn!("{} decoder: unsupported sample rate {}", label, sample_rate);
break;
};
ft4_buf.extend_from_slice(&resampled);
sample_buf.extend_from_slice(&resampled);
while ft4_buf.len() >= decoder.block_size() {
let block: Vec<f32> = ft4_buf.drain(..decoder.block_size()).collect();
while sample_buf.len() >= decoder.block_size() {
let block: Vec<f32> = sample_buf.drain(..decoder.block_size()).collect();
let results = tokio::task::block_in_place(|| {
let _span = info_span!(target: "trx_server::audio", "ftx_decode_inner", variant = span_name).entered();
decoder.process_block(&block);
decoder.decode_if_ready(100)
});
let latest_reset_seq = state_rx.borrow().reset_seqs.ft4_decode_reset_seq;
let latest_reset_seq = get_reset_seq(&state_rx.borrow());
if latest_reset_seq != reset_seq {
last_reset_seq = latest_reset_seq;
decoder.reset();
ft4_buf.clear();
sample_buf.clear();
pcm_rx = pcm_rx.resubscribe();
continue;
}
@@ -2093,14 +1987,22 @@ pub async fn run_ft4_decoder(
},
message: res.text,
};
histories.record_ft4_message(msg.clone());
let _ = decode_tx.send(DecodedMessage::Ft4(msg));
if is_ft4 {
histories.record_ft4_message(msg.clone());
let _ = decode_tx.send(DecodedMessage::Ft4(msg));
} else {
histories.record_ft8_message(msg.clone());
if let Some(logger) = decode_logs.as_ref() {
logger.log_ft8(&msg);
}
let _ = decode_tx.send(DecodedMessage::Ft8(msg));
}
}
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("FT4 decoder: dropped {} PCM frames", n);
warn!("{} decoder: dropped {} PCM frames", label, n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
@@ -2109,16 +2011,17 @@ pub async fn run_ft4_decoder(
match changed {
Ok(()) => {
let state = state_rx.borrow();
active = state.decoders.ft4_decode_enabled
active = is_enabled(&state)
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if state.reset_seqs.ft4_decode_reset_seq != last_reset_seq {
last_reset_seq = state.reset_seqs.ft4_decode_reset_seq;
let seq = get_reset_seq(&state);
if seq != last_reset_seq {
last_reset_seq = seq;
decoder.reset();
ft4_buf.clear();
sample_buf.clear();
}
if !active {
decoder.reset();
ft4_buf.clear();
sample_buf.clear();
last_slot = -1;
} else {
pcm_rx = pcm_rx.resubscribe();
@@ -2211,6 +2114,7 @@ pub async fn run_ft2_decoder(
{
pending_decode_samples -= FT2_ASYNC_TRIGGER_SAMPLES;
let results = tokio::task::block_in_place(|| {
let _span = info_span!("ft2_decode").entered();
decode_ft2_window(&mut decoder, &ft2_buf, 100)
});
let latest_reset_seq = state_rx.borrow().reset_seqs.ft2_decode_reset_seq;
@@ -2365,6 +2269,7 @@ pub async fn run_wspr_decoder(
} else if slot != last_slot {
let base_freq = state_rx.borrow().status.freq.hz;
let decode_results = tokio::task::block_in_place(|| {
let _span = info_span!("wspr_decode").entered();
decoder.decode_slot(&slot_buf, Some(base_freq))
});
let latest_reset_seq = state_rx.borrow().reset_seqs.wspr_decode_reset_seq;
@@ -2522,7 +2427,10 @@ pub async fn run_lrpt_decoder(
} else {
frame
};
let new_mcus = decoder.process_samples(&mono);
let new_mcus = {
let _span = info_span!("lrpt_decode").entered();
decoder.process_samples(&mono)
};
if new_mcus > 0 {
last_mcu_at = tokio::time::Instant::now();
}
@@ -2861,6 +2769,7 @@ async fn run_background_ft8_decoder(
while ft8_buf.len() >= decoder.block_size() {
let block: Vec<f32> = ft8_buf.drain(..decoder.block_size()).collect();
let results = tokio::task::block_in_place(|| {
let _span = info_span!("ft8_decode").entered();
decoder.process_block(&block);
decoder.decode_if_ready(100)
});
@@ -2941,6 +2850,7 @@ async fn run_background_ft4_decoder(
while ft4_buf.len() >= decoder.block_size() {
let block: Vec<f32> = ft4_buf.drain(..decoder.block_size()).collect();
let results = tokio::task::block_in_place(|| {
let _span = info_span!("ft4_decode").entered();
decoder.process_block(&block);
decoder.decode_if_ready(100)
});
@@ -3013,6 +2923,7 @@ async fn run_background_ft2_decoder(
{
pending_decode_samples -= FT2_ASYNC_TRIGGER_SAMPLES;
let results = tokio::task::block_in_place(|| {
let _span = info_span!("ft2_decode").entered();
decode_ft2_window(&mut decoder, &ft2_buf, 100)
});
for res in results {
@@ -3078,6 +2989,7 @@ async fn run_background_wspr_decoder(
last_slot = slot;
} else if slot != last_slot {
match tokio::task::block_in_place(|| {
let _span = info_span!("wspr_decode").entered();
decoder.decode_slot(&slot_buf, Some(base_freq_hz))
}) {
Ok(results) => {
+403
View File
@@ -807,4 +807,407 @@ mod tests {
handle.abort();
let _ = handle.await;
}
// ========================================================================
// Multi-rig integration tests
// ========================================================================
/// Create a sample state with custom model name, frequency, and mode.
fn sample_state_custom(model: &str, freq_hz: u64, mode: trx_core::RigMode) -> RigState {
let mut state = RigState::new_uninitialized();
state.initialized = true;
state.status.freq = trx_core::radio::freq::Freq { hz: freq_hz };
state.status.mode = mode;
state.rig_info = Some(RigInfo {
manufacturer: "Test".to_string(),
model: model.to_string(),
revision: "1".to_string(),
capabilities: RigCapabilities {
min_freq_step_hz: 1,
supported_bands: vec![Band {
low_hz: 1_800_000,
high_hz: 440_000_000,
tx_allowed: true,
}],
supported_modes: vec![
trx_core::RigMode::USB,
trx_core::RigMode::LSB,
trx_core::RigMode::FM,
],
num_vfos: 2,
lock: false,
lockable: true,
attenuator: false,
preamp: false,
rit: false,
rpt: false,
split: false,
tx: true,
tx_limit: true,
vfo_switch: true,
filter_controls: false,
signal_meter: true,
},
access: RigAccessMethod::Tcp {
addr: "127.0.0.1:0".to_string(),
},
});
state
}
/// Build a multi-rig HashMap with two rigs having independent state and
/// command channels. Returns the map, default rig id, and the mpsc
/// receivers for each rig so tests can inspect routed commands.
fn make_two_rigs(
state_a: RigState,
state_b: RigState,
) -> (
Arc<HashMap<String, RigHandle>>,
String,
mpsc::Receiver<RigRequest>,
mpsc::Receiver<RigRequest>,
) {
let (tx_a, rx_a) = mpsc::channel::<RigRequest>(8);
let (_state_tx_a, state_rx_a) = watch::channel(state_a);
let handle_a = RigHandle {
rig_id: "rig_hf".to_string(),
display_name: "HF Rig".to_string(),
rig_tx: tx_a,
state_rx: state_rx_a,
audio_port: 4531,
};
let (tx_b, rx_b) = mpsc::channel::<RigRequest>(8);
let (_state_tx_b, state_rx_b) = watch::channel(state_b);
let handle_b = RigHandle {
rig_id: "rig_vhf".to_string(),
display_name: "VHF Rig".to_string(),
rig_tx: tx_b,
state_rx: state_rx_b,
audio_port: 4532,
};
let mut map = HashMap::new();
map.insert("rig_hf".to_string(), handle_a);
map.insert("rig_vhf".to_string(), handle_b);
(Arc::new(map), "rig_hf".to_string(), rx_a, rx_b)
}
/// Helper: send a JSON line and read one response line from the stream.
async fn send_and_recv(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
json: &[u8],
) -> ClientResponse {
writer.write_all(json).await.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
serde_json::from_str(line.trim_end()).expect("response json")
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_state_isolation() {
// Two rigs with different frequencies and modes.
let state_hf =
sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf =
sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
// Allow listener to bind.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (read_half, mut writer) = stream.into_split();
let mut reader = BufReader::new(read_half);
// Query rig_hf — should return HF state.
let resp = send_and_recv(
&mut writer,
&mut reader,
br#"{"rig_id":"rig_hf","cmd":"get_state"}"#,
)
.await;
assert!(resp.success, "rig_hf get_state should succeed");
assert_eq!(resp.rig_id.as_deref(), Some("rig_hf"));
let snap_hf = resp.state.expect("rig_hf snapshot");
assert_eq!(snap_hf.info.model, "HF-Dummy");
assert_eq!(snap_hf.status.freq.hz, 14_200_000);
// Query rig_vhf — should return VHF state.
let resp = send_and_recv(
&mut writer,
&mut reader,
br#"{"rig_id":"rig_vhf","cmd":"get_state"}"#,
)
.await;
assert!(resp.success, "rig_vhf get_state should succeed");
assert_eq!(resp.rig_id.as_deref(), Some("rig_vhf"));
let snap_vhf = resp.state.expect("rig_vhf snapshot");
assert_eq!(snap_vhf.info.model, "VHF-Dummy");
assert_eq!(snap_vhf.status.freq.hz, 145_500_000);
// Verify the two snapshots have different modes.
assert_ne!(
snap_hf.status.mode, snap_vhf.status.mode,
"Rig states should be independent"
);
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_default_fallback() {
// When rig_id is omitted, the default rig (rig_hf) should be used.
let state_hf =
sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf =
sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (read_half, mut writer) = stream.into_split();
let mut reader = BufReader::new(read_half);
// No rig_id — should resolve to default (rig_hf).
let resp = send_and_recv(
&mut writer,
&mut reader,
br#"{"cmd":"get_state"}"#,
)
.await;
assert!(resp.success, "default get_state should succeed");
assert_eq!(resp.rig_id.as_deref(), Some("rig_hf"));
let snap = resp.state.expect("default snapshot");
assert_eq!(snap.info.model, "HF-Dummy");
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_get_rigs_returns_all() {
let state_hf =
sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf =
sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (read_half, mut writer) = stream.into_split();
let mut reader = BufReader::new(read_half);
let resp = send_and_recv(
&mut writer,
&mut reader,
br#"{"cmd":"get_rigs"}"#,
)
.await;
assert!(resp.success, "get_rigs should succeed");
let entries = resp.rigs.expect("rigs list");
assert_eq!(entries.len(), 2, "should return both rigs");
// Collect rig_ids from the entries.
let ids: HashSet<String> = entries.iter().map(|e| e.rig_id.clone()).collect();
assert!(ids.contains("rig_hf"), "should contain rig_hf");
assert!(ids.contains("rig_vhf"), "should contain rig_vhf");
// Verify each entry has the correct frequency.
for entry in &entries {
match entry.rig_id.as_str() {
"rig_hf" => {
assert_eq!(entry.state.status.freq.hz, 14_200_000);
assert_eq!(entry.state.info.model, "HF-Dummy");
assert_eq!(entry.audio_port, Some(4531));
}
"rig_vhf" => {
assert_eq!(entry.state.status.freq.hz, 145_500_000);
assert_eq!(entry.state.info.model, "VHF-Dummy");
assert_eq!(entry.audio_port, Some(4532));
}
other => panic!("Unexpected rig_id: {}", other),
}
}
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_command_routing() {
// Verify that a set_freq command targeting rig_vhf is delivered to the
// VHF rig's mpsc channel and not to the HF rig's channel.
let state_hf =
sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf =
sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, mut rx_hf, mut rx_vhf) =
make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (_read_half, mut writer) = stream.into_split();
// Send set_freq targeting rig_vhf. The listener will forward the
// command to the VHF rig's mpsc channel.
writer
.write_all(br#"{"rig_id":"rig_vhf","cmd":"set_freq","freq_hz":146000000}"#)
.await
.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
// The VHF channel should receive the command.
let req = tokio::time::timeout(
std::time::Duration::from_secs(2),
rx_vhf.recv(),
)
.await
.expect("timeout waiting for VHF command")
.expect("VHF channel closed");
assert!(
matches!(req.cmd, trx_core::rig::command::RigCommand::SetFreq(f) if f.hz == 146_000_000),
"VHF rig should receive SetFreq(146 MHz), got {:?}",
req.cmd
);
// The HF channel should NOT have received anything.
assert!(
rx_hf.try_recv().is_err(),
"HF rig should not receive commands targeting VHF"
);
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_command_routing_to_default() {
// When rig_id is omitted, commands should go to the default rig (HF).
let state_hf =
sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf =
sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, mut rx_hf, mut rx_vhf) =
make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (_read_half, mut writer) = stream.into_split();
// No rig_id — should route to default (rig_hf).
writer
.write_all(br#"{"cmd":"set_freq","freq_hz":7100000}"#)
.await
.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
// The HF channel should receive the command.
let req = tokio::time::timeout(
std::time::Duration::from_secs(2),
rx_hf.recv(),
)
.await
.expect("timeout waiting for HF command")
.expect("HF channel closed");
assert!(
matches!(req.cmd, trx_core::rig::command::RigCommand::SetFreq(f) if f.hz == 7_100_000),
"HF rig should receive SetFreq(7.1 MHz), got {:?}",
req.cmd
);
// VHF should not receive anything.
assert!(
rx_vhf.try_recv().is_err(),
"VHF rig should not receive commands with no rig_id"
);
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
}