[fix](trx-rs): improve FT2 decoder acquisition
Use FT2-specific analysis settings and a wider receive span. Switch server-side FT2 decoding to a rolling async window. Widen FT2 candidate timing search in the vendored decoder. Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
+86
-36
@@ -4,7 +4,7 @@
|
||||
|
||||
//! Audio capture, playback, and TCP streaming for trx-server.
|
||||
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -54,6 +54,9 @@ const CW_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const WSPR_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_SAMPLE_RATE: u32 = 12_000;
|
||||
const FT2_ASYNC_BUFFER_SAMPLES: usize = 45_000;
|
||||
const FT2_ASYNC_TRIGGER_SAMPLES: usize = 9_000;
|
||||
const FT2_DEDUPE_RETENTION: Duration = Duration::from_secs(8);
|
||||
const DECODE_AUDIO_GATE_RMS: f32 = 2.5e-4;
|
||||
const AUDIO_STREAM_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const AUDIO_STREAM_RECOVERY_DELAY: Duration = Duration::from_secs(1);
|
||||
@@ -65,6 +68,48 @@ fn current_timestamp_ms() -> i64 {
|
||||
}
|
||||
}
|
||||
|
||||
fn retain_ft2_window(buf: &mut Vec<f32>) {
|
||||
if buf.len() > FT2_ASYNC_BUFFER_SAMPLES {
|
||||
let excess = buf.len() - FT2_ASYNC_BUFFER_SAMPLES;
|
||||
buf.drain(..excess);
|
||||
}
|
||||
}
|
||||
|
||||
fn prune_recent_ft2_decodes(recent: &mut HashMap<String, Instant>, now: Instant) {
|
||||
recent.retain(|_, seen_at| now.duration_since(*seen_at) <= FT2_DEDUPE_RETENTION);
|
||||
}
|
||||
|
||||
fn should_emit_ft2_decode(recent: &mut HashMap<String, Instant>, text: &str, freq_hz: f32) -> bool {
|
||||
let now = Instant::now();
|
||||
prune_recent_ft2_decodes(recent, now);
|
||||
let key = format!("{}|{}", text, freq_hz.round() as i32);
|
||||
if recent.contains_key(&key) {
|
||||
return false;
|
||||
}
|
||||
recent.insert(key, now);
|
||||
true
|
||||
}
|
||||
|
||||
fn decode_ft2_window(
|
||||
decoder: &mut Ft8Decoder,
|
||||
samples: &[f32],
|
||||
max_results: usize,
|
||||
) -> Vec<trx_ft8::Ft8DecodeResult> {
|
||||
let window_samples = decoder.window_samples();
|
||||
if samples.len() < window_samples {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
decoder.reset();
|
||||
let block_size = decoder.block_size();
|
||||
let start = samples.len() - window_samples;
|
||||
let window = &samples[start..];
|
||||
for block in window.chunks_exact(block_size) {
|
||||
decoder.process_block(block);
|
||||
}
|
||||
decoder.decode_if_ready(max_results)
|
||||
}
|
||||
|
||||
struct StreamErrorLogger {
|
||||
label: &'static str,
|
||||
state: Mutex<StreamErrorState>,
|
||||
@@ -1874,7 +1919,8 @@ pub async fn run_ft2_decoder(
|
||||
let mut active = state_rx.borrow().ft2_decode_enabled
|
||||
&& matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB);
|
||||
let mut ft2_buf: Vec<f32> = Vec::new();
|
||||
let mut last_slot: i64 = -1;
|
||||
let mut pending_decode_samples: usize = 0;
|
||||
let mut recent_decodes: HashMap<String, Instant> = HashMap::new();
|
||||
|
||||
loop {
|
||||
if !active {
|
||||
@@ -1890,8 +1936,9 @@ pub async fn run_ft2_decoder(
|
||||
last_reset_seq = state.ft2_decode_reset_seq;
|
||||
decoder.reset();
|
||||
ft2_buf.clear();
|
||||
pending_decode_samples = 0;
|
||||
recent_decodes.clear();
|
||||
}
|
||||
last_slot = -1;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
@@ -1902,17 +1949,6 @@ pub async fn run_ft2_decoder(
|
||||
recv = pcm_rx.recv() => {
|
||||
match recv {
|
||||
Ok(frame) => {
|
||||
let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
|
||||
Ok(dur) => dur.as_millis() as i64,
|
||||
Err(_) => 0,
|
||||
};
|
||||
let slot = now_ms / 3_750;
|
||||
if slot != last_slot {
|
||||
last_slot = slot;
|
||||
decoder.reset();
|
||||
ft2_buf.clear();
|
||||
}
|
||||
|
||||
let reset_seq = {
|
||||
let state = state_rx.borrow();
|
||||
state.ft2_decode_reset_seq
|
||||
@@ -1921,6 +1957,8 @@ pub async fn run_ft2_decoder(
|
||||
last_reset_seq = reset_seq;
|
||||
decoder.reset();
|
||||
ft2_buf.clear();
|
||||
pending_decode_samples = 0;
|
||||
recent_decodes.clear();
|
||||
}
|
||||
|
||||
let mut mono = downmix_mono(frame, channels);
|
||||
@@ -1930,12 +1968,15 @@ pub async fn run_ft2_decoder(
|
||||
break;
|
||||
};
|
||||
ft2_buf.extend_from_slice(&resampled);
|
||||
pending_decode_samples += resampled.len();
|
||||
retain_ft2_window(&mut ft2_buf);
|
||||
|
||||
while ft2_buf.len() >= decoder.block_size() {
|
||||
let block: Vec<f32> = ft2_buf.drain(..decoder.block_size()).collect();
|
||||
while pending_decode_samples >= FT2_ASYNC_TRIGGER_SAMPLES
|
||||
&& ft2_buf.len() >= FT2_ASYNC_BUFFER_SAMPLES
|
||||
{
|
||||
pending_decode_samples -= FT2_ASYNC_TRIGGER_SAMPLES;
|
||||
let results = tokio::task::block_in_place(|| {
|
||||
decoder.process_block(&block);
|
||||
decoder.decode_if_ready(100)
|
||||
decode_ft2_window(&mut decoder, &ft2_buf, 100)
|
||||
});
|
||||
if !results.is_empty() {
|
||||
for res in results {
|
||||
@@ -1956,6 +1997,13 @@ pub async fn run_ft2_decoder(
|
||||
},
|
||||
message: res.text,
|
||||
};
|
||||
if !should_emit_ft2_decode(
|
||||
&mut recent_decodes,
|
||||
&msg.message,
|
||||
msg.freq_hz,
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
histories.record_ft2_message(msg.clone());
|
||||
let _ = decode_tx.send(DecodedMessage::Ft2(msg));
|
||||
}
|
||||
@@ -1978,11 +2026,14 @@ pub async fn run_ft2_decoder(
|
||||
last_reset_seq = state.ft2_decode_reset_seq;
|
||||
decoder.reset();
|
||||
ft2_buf.clear();
|
||||
pending_decode_samples = 0;
|
||||
recent_decodes.clear();
|
||||
}
|
||||
if !active {
|
||||
decoder.reset();
|
||||
ft2_buf.clear();
|
||||
last_slot = -1;
|
||||
pending_decode_samples = 0;
|
||||
recent_decodes.clear();
|
||||
} else {
|
||||
pcm_rx = pcm_rx.resubscribe();
|
||||
}
|
||||
@@ -2443,23 +2494,12 @@ async fn run_background_ft2_decoder(
|
||||
}
|
||||
};
|
||||
let mut ft2_buf: Vec<f32> = Vec::new();
|
||||
let mut last_slot: i64 = -1;
|
||||
let mut pending_decode_samples: usize = 0;
|
||||
let mut recent_decodes: HashMap<String, Instant> = HashMap::new();
|
||||
|
||||
loop {
|
||||
match pcm_rx.recv().await {
|
||||
Ok(frame) => {
|
||||
let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
|
||||
{
|
||||
Ok(dur) => dur.as_millis() as i64,
|
||||
Err(_) => 0,
|
||||
};
|
||||
let slot = now_ms / 3_750;
|
||||
if slot != last_slot {
|
||||
last_slot = slot;
|
||||
decoder.reset();
|
||||
ft2_buf.clear();
|
||||
}
|
||||
|
||||
let mut mono = downmix_mono(frame, channels);
|
||||
apply_decode_audio_gate(&mut mono);
|
||||
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
|
||||
@@ -2470,12 +2510,15 @@ async fn run_background_ft2_decoder(
|
||||
break;
|
||||
};
|
||||
ft2_buf.extend_from_slice(&resampled);
|
||||
pending_decode_samples += resampled.len();
|
||||
retain_ft2_window(&mut ft2_buf);
|
||||
|
||||
while ft2_buf.len() >= decoder.block_size() {
|
||||
let block: Vec<f32> = ft2_buf.drain(..decoder.block_size()).collect();
|
||||
while pending_decode_samples >= FT2_ASYNC_TRIGGER_SAMPLES
|
||||
&& ft2_buf.len() >= FT2_ASYNC_BUFFER_SAMPLES
|
||||
{
|
||||
pending_decode_samples -= FT2_ASYNC_TRIGGER_SAMPLES;
|
||||
let results = tokio::task::block_in_place(|| {
|
||||
decoder.process_block(&block);
|
||||
decoder.decode_if_ready(100)
|
||||
decode_ft2_window(&mut decoder, &ft2_buf, 100)
|
||||
});
|
||||
for res in results {
|
||||
let abs_freq_hz = base_freq_hz as f64 + res.freq_hz as f64;
|
||||
@@ -2490,6 +2533,13 @@ async fn run_background_ft2_decoder(
|
||||
},
|
||||
message: res.text,
|
||||
};
|
||||
if !should_emit_ft2_decode(
|
||||
&mut recent_decodes,
|
||||
&msg.message,
|
||||
msg.freq_hz,
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
let _ = decode_tx.send(DecodedMessage::Ft2(msg));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user