[feat](trx-rs): add ft8 decoder
Co-authored-by: Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -23,3 +23,4 @@ cpal = "0.15"
|
||||
opus = "0.3"
|
||||
trx-backend = { path = "trx-backend" }
|
||||
trx-core = { path = "../trx-core" }
|
||||
trx-ft8 = { path = "../trx-ft8" }
|
||||
|
||||
+198
-2
@@ -16,15 +16,19 @@ use tracing::{error, info, warn};
|
||||
|
||||
use trx_core::audio::{
|
||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE,
|
||||
AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
|
||||
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO,
|
||||
AUDIO_MSG_TX_FRAME,
|
||||
};
|
||||
use trx_core::decode::{AprsPacket, DecodedMessage};
|
||||
use trx_core::decode::{AprsPacket, DecodedMessage, Ft8Message};
|
||||
use trx_core::rig::state::{RigMode, RigState};
|
||||
use trx_ft8::Ft8Decoder;
|
||||
|
||||
use crate::config::AudioConfig;
|
||||
use crate::decode;
|
||||
|
||||
const APRS_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_HISTORY_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
const FT8_SAMPLE_RATE: u32 = 12_000;
|
||||
|
||||
fn aprs_history() -> &'static Mutex<VecDeque<(Instant, AprsPacket)>> {
|
||||
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, AprsPacket)>>> = OnceLock::new();
|
||||
@@ -59,6 +63,39 @@ pub fn clear_aprs_history() {
|
||||
history.clear();
|
||||
}
|
||||
|
||||
fn ft8_history() -> &'static Mutex<VecDeque<(Instant, Ft8Message)>> {
|
||||
static HISTORY: OnceLock<Mutex<VecDeque<(Instant, Ft8Message)>>> = OnceLock::new();
|
||||
HISTORY.get_or_init(|| Mutex::new(VecDeque::new()))
|
||||
}
|
||||
|
||||
fn prune_ft8_history(history: &mut VecDeque<(Instant, Ft8Message)>) {
|
||||
let cutoff = Instant::now() - FT8_HISTORY_RETENTION;
|
||||
while let Some((ts, _)) = history.front() {
|
||||
if *ts < cutoff {
|
||||
history.pop_front();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_ft8_message(msg: Ft8Message) {
|
||||
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
|
||||
history.push_back((Instant::now(), msg));
|
||||
prune_ft8_history(&mut history);
|
||||
}
|
||||
|
||||
pub fn snapshot_ft8_history() -> Vec<Ft8Message> {
|
||||
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
|
||||
prune_ft8_history(&mut history);
|
||||
history.iter().map(|(_, msg)| msg.clone()).collect()
|
||||
}
|
||||
|
||||
pub fn clear_ft8_history() {
|
||||
let mut history = ft8_history().lock().expect("ft8 history mutex poisoned");
|
||||
history.clear();
|
||||
}
|
||||
|
||||
/// Spawn the audio capture thread.
|
||||
///
|
||||
/// Opens the configured input device via cpal, accumulates PCM samples into
|
||||
@@ -539,6 +576,155 @@ pub async fn run_cw_decoder(
|
||||
}
|
||||
}
|
||||
|
||||
fn downmix_mono(frame: Vec<f32>, channels: u16) -> Vec<f32> {
|
||||
if channels <= 1 {
|
||||
return frame;
|
||||
}
|
||||
let num_frames = frame.len() / channels as usize;
|
||||
let mut mono = Vec::with_capacity(num_frames);
|
||||
for i in 0..num_frames {
|
||||
mono.push(frame[i * channels as usize]);
|
||||
}
|
||||
mono
|
||||
}
|
||||
|
||||
fn resample_to_12k(samples: &[f32], sample_rate: u32) -> Option<Vec<f32>> {
|
||||
if sample_rate == FT8_SAMPLE_RATE {
|
||||
return Some(samples.to_vec());
|
||||
}
|
||||
if sample_rate % FT8_SAMPLE_RATE != 0 {
|
||||
return None;
|
||||
}
|
||||
let factor = (sample_rate / FT8_SAMPLE_RATE) as usize;
|
||||
if factor == 0 {
|
||||
return None;
|
||||
}
|
||||
let mut out = Vec::with_capacity(samples.len() / factor);
|
||||
for chunk in samples.chunks_exact(factor) {
|
||||
let mut acc = 0.0f32;
|
||||
for &s in chunk {
|
||||
acc += s;
|
||||
}
|
||||
out.push(acc / factor as f32);
|
||||
}
|
||||
Some(out)
|
||||
}
|
||||
|
||||
/// Run the FT8 decoder task. Only processes PCM when rig mode is DIG/USB and enabled.
|
||||
pub async fn run_ft8_decoder(
|
||||
sample_rate: u32,
|
||||
channels: u16,
|
||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
) {
|
||||
info!("FT8 decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = match Ft8Decoder::new(FT8_SAMPLE_RATE) {
|
||||
Ok(decoder) => decoder,
|
||||
Err(err) => {
|
||||
warn!("FT8 decoder init failed: {}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut last_reset_seq: u64 = 0;
|
||||
let mut active = state_rx.borrow().ft8_decode_enabled
|
||||
&& matches!(state_rx.borrow().status.mode, RigMode::DIG | RigMode::USB);
|
||||
let mut ft8_buf: Vec<f32> = Vec::new();
|
||||
|
||||
loop {
|
||||
if !active {
|
||||
match state_rx.changed().await {
|
||||
Ok(()) => {
|
||||
let state = state_rx.borrow();
|
||||
active = state.ft8_decode_enabled
|
||||
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
|
||||
if active {
|
||||
pcm_rx = pcm_rx.resubscribe();
|
||||
}
|
||||
if state.ft8_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.ft8_decode_reset_seq;
|
||||
decoder.reset();
|
||||
ft8_buf.clear();
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
recv = pcm_rx.recv() => {
|
||||
match recv {
|
||||
Ok(frame) => {
|
||||
let state = state_rx.borrow();
|
||||
if state.ft8_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.ft8_decode_reset_seq;
|
||||
decoder.reset();
|
||||
ft8_buf.clear();
|
||||
}
|
||||
|
||||
let mono = downmix_mono(frame, channels);
|
||||
let Some(resampled) = resample_to_12k(&mono, sample_rate) else {
|
||||
warn!("FT8 decoder: unsupported sample rate {}", sample_rate);
|
||||
break;
|
||||
};
|
||||
ft8_buf.extend_from_slice(&resampled);
|
||||
|
||||
while ft8_buf.len() >= decoder.block_size() {
|
||||
let block: Vec<f32> = ft8_buf.drain(..decoder.block_size()).collect();
|
||||
decoder.process_block(&block);
|
||||
let results = decoder.decode_if_ready(100);
|
||||
if !results.is_empty() {
|
||||
decoder.reset();
|
||||
for res in results {
|
||||
let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
|
||||
Ok(dur) => dur.as_millis() as i64,
|
||||
Err(_) => 0,
|
||||
};
|
||||
let msg = Ft8Message {
|
||||
ts_ms,
|
||||
snr_db: res.snr_db,
|
||||
dt_s: res.dt_s,
|
||||
freq_hz: res.freq_hz,
|
||||
message: res.text,
|
||||
};
|
||||
record_ft8_message(msg.clone());
|
||||
let _ = decode_tx.send(DecodedMessage::Ft8(msg));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("FT8 decoder: dropped {} PCM frames", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
changed = state_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) => {
|
||||
let state = state_rx.borrow();
|
||||
active = state.ft8_decode_enabled
|
||||
&& matches!(state.status.mode, RigMode::DIG | RigMode::USB);
|
||||
if state.ft8_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.ft8_decode_reset_seq;
|
||||
decoder.reset();
|
||||
ft8_buf.clear();
|
||||
}
|
||||
if !active {
|
||||
decoder.reset();
|
||||
ft8_buf.clear();
|
||||
} else {
|
||||
pcm_rx = pcm_rx.resubscribe();
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the audio TCP listener, accepting client connections.
|
||||
pub async fn run_audio_listener(
|
||||
addr: SocketAddr,
|
||||
@@ -594,6 +780,15 @@ async fn handle_audio_client(
|
||||
write_audio_msg(&mut writer, msg_type, &json).await?;
|
||||
}
|
||||
}
|
||||
// Send FT8 history to newly connected client.
|
||||
let history = snapshot_ft8_history();
|
||||
for msg in history {
|
||||
let msg = DecodedMessage::Ft8(msg);
|
||||
let msg_type = AUDIO_MSG_FT8_DECODE;
|
||||
if let Ok(json) = serde_json::to_vec(&msg) {
|
||||
write_audio_msg(&mut writer, msg_type, &json).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn RX + decode forwarding task (shares the writer)
|
||||
let mut rx_sub = rx_audio.subscribe();
|
||||
@@ -622,6 +817,7 @@ async fn handle_audio_client(
|
||||
let msg_type = match &msg {
|
||||
DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE,
|
||||
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
|
||||
DecodedMessage::Ft8(_) => AUDIO_MSG_FT8_DECODE,
|
||||
};
|
||||
if let Ok(json) = serde_json::to_vec(&msg) {
|
||||
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
|
||||
|
||||
@@ -193,8 +193,10 @@ fn map_command(cmd: ClientCommand) -> RigCommand {
|
||||
ClientCommand::SetCwAuto { enabled } => RigCommand::SetCwAuto(enabled),
|
||||
ClientCommand::SetCwWpm { wpm } => RigCommand::SetCwWpm(wpm),
|
||||
ClientCommand::SetCwToneHz { tone_hz } => RigCommand::SetCwToneHz(tone_hz),
|
||||
ClientCommand::SetFt8DecodeEnabled { enabled } => RigCommand::SetFt8DecodeEnabled(enabled),
|
||||
ClientCommand::ResetAprsDecoder => RigCommand::ResetAprsDecoder,
|
||||
ClientCommand::ResetCwDecoder => RigCommand::ResetCwDecoder,
|
||||
ClientCommand::ResetFt8Decoder => RigCommand::ResetFt8Decoder,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -227,8 +227,10 @@ fn build_initial_state(cfg: &ServerConfig, resolved: &ResolvedConfig) -> RigStat
|
||||
cw_auto: true,
|
||||
cw_wpm: 15,
|
||||
cw_tone_hz: 700,
|
||||
ft8_decode_enabled: false,
|
||||
aprs_decode_reset_seq: 0,
|
||||
cw_decode_reset_seq: 0,
|
||||
ft8_decode_reset_seq: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,6 +373,16 @@ async fn main() -> DynResult<()> {
|
||||
tokio::spawn(audio::run_cw_decoder(
|
||||
cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx,
|
||||
));
|
||||
|
||||
// Spawn FT8 decoder task
|
||||
let ft8_pcm_rx = pcm_tx.subscribe();
|
||||
let ft8_state_rx = _state_rx.clone();
|
||||
let ft8_decode_tx = decode_tx.clone();
|
||||
let ft8_sr = cfg.audio.sample_rate;
|
||||
let ft8_ch = cfg.audio.channels;
|
||||
tokio::spawn(audio::run_ft8_decoder(
|
||||
ft8_sr, ft8_ch as u16, ft8_pcm_rx, ft8_state_rx, ft8_decode_tx,
|
||||
));
|
||||
}
|
||||
if cfg.audio.tx_enabled {
|
||||
let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx);
|
||||
|
||||
@@ -128,8 +128,10 @@ pub async fn run_rig_task(
|
||||
cw_auto: true,
|
||||
cw_wpm: 15,
|
||||
cw_tone_hz: 700,
|
||||
ft8_decode_enabled: false,
|
||||
aprs_decode_reset_seq: 0,
|
||||
cw_decode_reset_seq: 0,
|
||||
ft8_decode_reset_seq: 0,
|
||||
};
|
||||
|
||||
// Polling configuration
|
||||
@@ -378,6 +380,11 @@ async fn process_command(
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::SetFt8DecodeEnabled(en) => {
|
||||
ctx.state.ft8_decode_enabled = en;
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::ResetAprsDecoder => {
|
||||
audio::clear_aprs_history();
|
||||
ctx.state.aprs_decode_reset_seq += 1;
|
||||
@@ -389,6 +396,12 @@ async fn process_command(
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
RigCommand::ResetFt8Decoder => {
|
||||
audio::clear_ft8_history();
|
||||
ctx.state.ft8_decode_reset_seq += 1;
|
||||
let _ = ctx.state_tx.send(ctx.state.clone());
|
||||
return snapshot_from(ctx.state);
|
||||
}
|
||||
_ => {} // fall through to normal rig handler
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user