[feat](trx-rs): add FT2 decoder support (wired to FT4)

Mirrors the FT4 implementation across the full stack. The trx-ft8
crate wires Ft8Decoder::new_ft2() to FTX_PROTOCOL_FT4 as a
placeholder pending a dedicated FT2 implementation.

Changes:
- trx-ft8: Ft8Decoder::new_ft2() delegates to with_protocol(Ft4)
- trx-core: DecodedMessage::Ft2, AUDIO_MSG_FT2_DECODE (0x15),
  ft2_decode_enabled/ft2_decode_reset_seq state, SetFt2DecodeEnabled/
  ResetFt2Decoder commands, protocol mapping
- trx-server: DecoderHistories::ft2, run_ft2_decoder (7.5s slots),
  run_background_ft2_decoder, history push/replay, decoder task spawn
- trx-frontend-http: ft2_history in FrontendRuntimeContext,
  toggle/clear endpoints, /ft2.js route, bookmark/scheduler/background
  decode support, DecodeHistoryPayload ft2 field
- web: ft2.js plugin (3.75s period timer), FT2 subtab in index.html,
  FT2 map source (distinct hue), app.js dispatch, decode-history-worker
  HISTORY_GROUP_KEYS, bookmarks read/write/apply

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-03-14 19:34:41 +01:00
parent 708c00a84c
commit d547c45a9c
25 changed files with 683 additions and 26 deletions
+269 -2
View File
@@ -24,7 +24,8 @@ use trx_aprs::AprsDecoder;
use trx_core::audio::{
parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame,
write_vchan_uuid_msg, AudioStreamInfo,
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT4_DECODE,
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE,
AUDIO_MSG_FT4_DECODE,
AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED,
AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED,
AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE,
@@ -158,6 +159,7 @@ pub struct DecoderHistories {
pub cw: Mutex<VecDeque<(Instant, CwEvent)>>,
pub ft8: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub ft4: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub ft2: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
}
@@ -171,6 +173,7 @@ impl DecoderHistories {
cw: Mutex::new(VecDeque::new()),
ft8: Mutex::new(VecDeque::new()),
ft4: Mutex::new(VecDeque::new()),
ft2: Mutex::new(VecDeque::new()),
wspr: Mutex::new(VecDeque::new()),
})
}
@@ -394,6 +397,35 @@ impl DecoderHistories {
self.ft4.lock().expect("ft4 history mutex poisoned").clear();
}
// --- FT2 ---
fn prune_ft2(history: &mut VecDeque<(Instant, Ft8Message)>) {
let cutoff = Instant::now() - FT8_HISTORY_RETENTION;
while let Some((ts, _)) = history.front() {
if *ts < cutoff {
history.pop_front();
} else {
break;
}
}
}
pub fn record_ft2_message(&self, msg: Ft8Message) {
let mut h = self.ft2.lock().expect("ft2 history mutex poisoned");
h.push_back((Instant::now(), msg));
Self::prune_ft2(&mut h);
}
pub fn snapshot_ft2_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft2.lock().expect("ft2 history mutex poisoned");
Self::prune_ft2(&mut h);
h.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_ft2_history(&self) {
self.ft2.lock().expect("ft2 history mutex poisoned").clear();
}
// --- WSPR ---
fn prune_wspr(history: &mut VecDeque<(Instant, WsprMessage)>) {
@@ -436,8 +468,9 @@ impl DecoderHistories {
let cw = self.cw.lock().map(|h| h.len()).unwrap_or(0);
let ft8 = self.ft8.lock().map(|h| h.len()).unwrap_or(0);
let ft4 = self.ft4.lock().map(|h| h.len()).unwrap_or(0);
let ft2 = self.ft2.lock().map(|h| h.len()).unwrap_or(0);
let wspr = self.wspr.lock().map(|h| h.len()).unwrap_or(0);
ais + vdes + aprs + hf_aprs + cw + ft8 + ft4 + wspr
ais + vdes + aprs + hf_aprs + cw + ft8 + ft4 + ft2 + wspr
}
}
@@ -1820,6 +1853,148 @@ pub async fn run_ft4_decoder(
}
}
/// Run the FT2 decoder task. Mirrors FT4 but uses FT2 protocol (7.5s slots for now).
pub async fn run_ft2_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
mut state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
histories: Arc<DecoderHistories>,
) {
info!("FT2 decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = match Ft8Decoder::new_ft2(FT8_SAMPLE_RATE) {
Ok(decoder) => decoder,
Err(err) => {
warn!("FT2 decoder init failed: {}", err);
return;
}
};
let mut last_reset_seq: u64 = 0;
let mut active = state_rx.borrow().ft2_decode_enabled
&& matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB);
let mut ft2_buf: Vec<f32> = Vec::new();
let mut last_slot: i64 = -1;
loop {
if !active {
match state_rx.changed().await {
Ok(()) => {
let state = state_rx.borrow();
active = state.ft2_decode_enabled
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if active {
pcm_rx = pcm_rx.resubscribe();
}
if state.ft2_decode_reset_seq != last_reset_seq {
last_reset_seq = state.ft2_decode_reset_seq;
decoder.reset();
ft2_buf.clear();
}
last_slot = -1;
}
Err(_) => break,
}
continue;
}
tokio::select! {
recv = pcm_rx.recv() => {
match recv {
Ok(frame) => {
let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_secs() as i64,
Err(_) => 0,
};
// FT2 slot period is 7.5s (same as FT4 for now); use now * 2 / 15
let slot = now * 2 / 15;
if slot != last_slot {
last_slot = slot;
decoder.reset();
ft2_buf.clear();
}
let reset_seq = {
let state = state_rx.borrow();
state.ft2_decode_reset_seq
};
if reset_seq != last_reset_seq {
last_reset_seq = reset_seq;
decoder.reset();
ft2_buf.clear();
}
let mut mono = downmix_mono(frame, channels);
apply_decode_audio_gate(&mut mono);
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
warn!("FT2 decoder: unsupported sample rate {}", sample_rate);
break;
};
ft2_buf.extend_from_slice(&resampled);
while ft2_buf.len() >= decoder.block_size() {
let block: Vec<f32> = ft2_buf.drain(..decoder.block_size()).collect();
let results = tokio::task::block_in_place(|| {
decoder.process_block(&block);
decoder.decode_if_ready(100)
});
if !results.is_empty() {
for res in results {
let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(dur) => dur.as_millis() as i64,
Err(_) => 0,
};
let base_freq_hz = state_rx.borrow().status.freq.hz as f64;
let abs_freq_hz = base_freq_hz + res.freq_hz as f64;
let msg = Ft8Message {
ts_ms,
snr_db: res.snr_db,
dt_s: res.dt_s,
freq_hz: if abs_freq_hz.is_finite() && abs_freq_hz > 0.0 {
abs_freq_hz as f32
} else {
res.freq_hz
},
message: res.text,
};
histories.record_ft2_message(msg.clone());
let _ = decode_tx.send(DecodedMessage::Ft2(msg));
}
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("FT2 decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
changed = state_rx.changed() => {
match changed {
Ok(()) => {
let state = state_rx.borrow();
active = state.ft2_decode_enabled
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
if state.ft2_decode_reset_seq != last_reset_seq {
last_reset_seq = state.ft2_decode_reset_seq;
decoder.reset();
ft2_buf.clear();
}
if !active {
decoder.reset();
ft2_buf.clear();
last_slot = -1;
} else {
pcm_rx = pcm_rx.resubscribe();
}
}
Err(_) => break,
}
}
}
}
}
/// Run the WSPR decoder task. Mirrors FT8 lifecycle/slot behavior.
///
/// Note: decoding engine integration is intentionally staged; this task already
@@ -2250,6 +2425,85 @@ async fn run_background_ft4_decoder(
}
}
async fn run_background_ft2_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
base_freq_hz: u64,
decode_tx: broadcast::Sender<DecodedMessage>,
) {
info!(
"Background FT2 decoder started ({}Hz, {} ch @ {} Hz)",
sample_rate, channels, base_freq_hz
);
let mut decoder = match Ft8Decoder::new_ft2(FT8_SAMPLE_RATE) {
Ok(decoder) => decoder,
Err(err) => {
warn!("Background FT2 decoder init failed: {}", err);
return;
}
};
let mut ft2_buf: Vec<f32> = Vec::new();
let mut last_slot: i64 = -1;
loop {
match pcm_rx.recv().await {
Ok(frame) => {
let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
{
Ok(dur) => dur.as_secs() as i64,
Err(_) => 0,
};
// FT2 slot period is 7.5s (same as FT4 for now); use now * 2 / 15
let slot = now * 2 / 15;
if slot != last_slot {
last_slot = slot;
decoder.reset();
ft2_buf.clear();
}
let mut mono = downmix_mono(frame, channels);
apply_decode_audio_gate(&mut mono);
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
warn!(
"Background FT2 decoder: unsupported sample rate {}",
sample_rate
);
break;
};
ft2_buf.extend_from_slice(&resampled);
while ft2_buf.len() >= decoder.block_size() {
let block: Vec<f32> = ft2_buf.drain(..decoder.block_size()).collect();
let results = tokio::task::block_in_place(|| {
decoder.process_block(&block);
decoder.decode_if_ready(100)
});
for res in results {
let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64;
let msg = Ft8Message {
ts_ms: current_timestamp_ms(),
snr_db: res.snr_db,
dt_s: res.dt_s,
freq_hz: if abs_freq_hz.is_finite() && abs_freq_hz > 0.0 {
abs_freq_hz as f32
} else {
res.freq_hz
},
message: res.text,
};
let _ = decode_tx.send(DecodedMessage::Ft2(msg));
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("Background FT2 decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
async fn run_background_wspr_decoder(
sample_rate: u32,
channels: u16,
@@ -2452,6 +2706,7 @@ async fn handle_audio_client(
push_history!(histories.snapshot_hf_aprs_history(), DecodedMessage::HfAprs, AUDIO_MSG_HF_APRS_DECODE);
push_history!(histories.snapshot_ft8_history(), DecodedMessage::Ft8, AUDIO_MSG_FT8_DECODE);
push_history!(histories.snapshot_ft4_history(), DecodedMessage::Ft4, AUDIO_MSG_FT4_DECODE);
push_history!(histories.snapshot_ft2_history(), DecodedMessage::Ft2, AUDIO_MSG_FT2_DECODE);
push_history!(histories.snapshot_wspr_history(), DecodedMessage::Wspr, AUDIO_MSG_WSPR_DECODE);
push_history!(histories.snapshot_cw_history(), DecodedMessage::Cw, AUDIO_MSG_CW_DECODE);
@@ -2538,6 +2793,7 @@ async fn handle_audio_client(
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE,
DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE,
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
@@ -2564,6 +2820,7 @@ async fn handle_audio_client(
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE,
DecodedMessage::Ft4(_) => AUDIO_MSG_FT4_DECODE,
DecodedMessage::Ft2(_) => AUDIO_MSG_FT2_DECODE,
DecodedMessage::Wspr(_) => AUDIO_MSG_WSPR_DECODE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
@@ -2704,6 +2961,16 @@ async fn handle_audio_client(
)
.await;
}),
"ft2" => tokio::spawn(async move {
run_background_ft2_decoder(
sr,
ch_count,
task_rx,
base_freq_hz,
decode_tx,
)
.await;
}),
"wspr" => tokio::spawn(async move {
run_background_wspr_decoder(
sr,
+15
View File
@@ -741,6 +741,21 @@ fn spawn_rig_audio_stack(
}
}));
// Spawn FT2 decoder task
let ft2_pcm_rx = pcm_tx.subscribe();
let ft2_state_rx = state_rx.clone();
let ft2_decode_tx = decode_tx.clone();
let ft2_sr = rig_cfg.audio.sample_rate;
let ft2_ch = rig_cfg.audio.channels;
let ft2_shutdown_rx = shutdown_rx.clone();
let ft2_histories = histories.clone();
handles.push(tokio::spawn(async move {
tokio::select! {
_ = audio::run_ft2_decoder(ft2_sr, ft2_ch as u16, ft2_pcm_rx, ft2_state_rx, ft2_decode_tx, ft2_histories) => {}
_ = wait_for_shutdown(ft2_shutdown_rx) => {}
}
}));
// Spawn WSPR decoder task
let wspr_pcm_rx = pcm_tx.subscribe();
let wspr_state_rx = state_rx.clone();
+12
View File
@@ -476,6 +476,12 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetFt2DecodeEnabled(en) => {
ctx.state.ft2_decode_enabled = en;
info!("FT2 decode {}", if en { "enabled" } else { "disabled" });
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::SetWsprDecodeEnabled(en) => {
ctx.state.wspr_decode_enabled = en;
info!("WSPR decode {}", if en { "enabled" } else { "disabled" });
@@ -517,6 +523,12 @@ async fn process_command(
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetFt2Decoder => {
ctx.histories.clear_ft2_history();
ctx.state.ft2_decode_reset_seq += 1;
let _ = ctx.state_tx.send(ctx.state.clone());
return snapshot_from(ctx.state);
}
RigCommand::ResetWsprDecoder => {
ctx.histories.clear_wspr_history();
ctx.state.wspr_decode_reset_seq += 1;