[feat](trx-server): add server-side APRS and CW decoders

Port Bell 202 AFSK demodulator (correlation detector, PLL clock
recovery, NRZI+HDLC, AX.25/APRS parser) and Goertzel CW decoder
(auto tone scan, auto WPM via k-means, Morse lookup) from browser JS
to Rust.

Add PCM tap to audio capture thread, spawn APRS/CW decoder tasks
gated by rig mode (PKT for APRS, CW/CWR for CW).  Forward decoded
messages over the audio TCP wire using new message types 0x03/0x04.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
2026-02-08 22:28:43 +01:00
parent 042aab7199
commit f7f0e46021
5 changed files with 1183 additions and 16 deletions
+152 -14
View File
@@ -8,15 +8,18 @@ use std::net::SocketAddr;
use bytes::Bytes; use bytes::Bytes;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc, watch};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use trx_core::audio::{ use trx_core::audio::{
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE,
AUDIO_MSG_TX_FRAME, AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
}; };
use trx_core::decode::DecodedMessage;
use trx_core::rig::state::{RigMode, RigState};
use crate::config::AudioConfig; use crate::config::AudioConfig;
use crate::decode;
/// Spawn the audio capture thread. /// Spawn the audio capture thread.
/// ///
@@ -26,6 +29,7 @@ use crate::config::AudioConfig;
pub fn spawn_audio_capture( pub fn spawn_audio_capture(
cfg: &AudioConfig, cfg: &AudioConfig,
tx: broadcast::Sender<Bytes>, tx: broadcast::Sender<Bytes>,
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
) -> std::thread::JoinHandle<()> { ) -> std::thread::JoinHandle<()> {
let sample_rate = cfg.sample_rate; let sample_rate = cfg.sample_rate;
let channels = cfg.channels as u16; let channels = cfg.channels as u16;
@@ -35,7 +39,7 @@ pub fn spawn_audio_capture(
std::thread::spawn(move || { std::thread::spawn(move || {
if let Err(e) = if let Err(e) =
run_capture(sample_rate, channels, frame_duration_ms, bitrate_bps, device_name, tx) run_capture(sample_rate, channels, frame_duration_ms, bitrate_bps, device_name, tx, pcm_tx)
{ {
error!("Audio capture thread error: {}", e); error!("Audio capture thread error: {}", e);
} }
@@ -49,6 +53,7 @@ fn run_capture(
bitrate_bps: u32, bitrate_bps: u32,
device_name: Option<String>, device_name: Option<String>,
tx: broadcast::Sender<Bytes>, tx: broadcast::Sender<Bytes>,
pcm_tx: Option<broadcast::Sender<Vec<f32>>>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
@@ -130,6 +135,9 @@ fn run_capture(
pcm_buf.extend_from_slice(&samples); pcm_buf.extend_from_slice(&samples);
while pcm_buf.len() >= frame_samples { while pcm_buf.len() >= frame_samples {
let frame: Vec<f32> = pcm_buf.drain(..frame_samples).collect(); let frame: Vec<f32> = pcm_buf.drain(..frame_samples).collect();
if let Some(ref pcm_tx) = pcm_tx {
let _ = pcm_tx.send(frame.clone());
}
match encoder.encode_float(&frame, &mut opus_buf) { match encoder.encode_float(&frame, &mut opus_buf) {
Ok(len) => { Ok(len) => {
let packet = Bytes::copy_from_slice(&opus_buf[..len]); let packet = Bytes::copy_from_slice(&opus_buf[..len]);
@@ -263,12 +271,115 @@ fn run_playback(
Ok(()) Ok(())
} }
/// Run the APRS decoder task. Only processes PCM when rig mode is PKT.
pub async fn run_aprs_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
) {
info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = decode::aprs::AprsDecoder::new(sample_rate);
let mut was_active = false;
loop {
match pcm_rx.recv().await {
Ok(frame) => {
let mode = &state_rx.borrow().status.mode;
let active = matches!(mode, RigMode::PKT);
if !active {
if was_active {
decoder.reset();
was_active = false;
}
continue;
}
was_active = true;
// Downmix to mono if stereo
let mono = if channels > 1 {
let num_frames = frame.len() / channels as usize;
let mut mono = Vec::with_capacity(num_frames);
for i in 0..num_frames {
mono.push(frame[i * channels as usize]);
}
mono
} else {
frame
};
for pkt in decoder.process_samples(&mono) {
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("APRS decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
/// Run the CW decoder task. Only processes PCM when rig mode is CW or CWR.
pub async fn run_cw_decoder(
sample_rate: u32,
channels: u16,
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
state_rx: watch::Receiver<RigState>,
decode_tx: broadcast::Sender<DecodedMessage>,
) {
info!("CW decoder started ({}Hz, {} ch)", sample_rate, channels);
let mut decoder = decode::cw::CwDecoder::new(sample_rate);
let mut was_active = false;
loop {
match pcm_rx.recv().await {
Ok(frame) => {
let mode = &state_rx.borrow().status.mode;
let active = matches!(mode, RigMode::CW | RigMode::CWR);
if !active {
if was_active {
decoder.reset();
was_active = false;
}
continue;
}
was_active = true;
// Downmix to mono if stereo
let mono = if channels > 1 {
let num_frames = frame.len() / channels as usize;
let mut mono = Vec::with_capacity(num_frames);
for i in 0..num_frames {
mono.push(frame[i * channels as usize]);
}
mono
} else {
frame
};
for evt in decoder.process_samples(&mono) {
let _ = decode_tx.send(DecodedMessage::Cw(evt));
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("CW decoder: dropped {} PCM frames", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
/// Run the audio TCP listener, accepting client connections. /// Run the audio TCP listener, accepting client connections.
pub async fn run_audio_listener( pub async fn run_audio_listener(
addr: SocketAddr, addr: SocketAddr,
rx_audio: broadcast::Sender<Bytes>, rx_audio: broadcast::Sender<Bytes>,
tx_audio: mpsc::Sender<Bytes>, tx_audio: mpsc::Sender<Bytes>,
stream_info: AudioStreamInfo, stream_info: AudioStreamInfo,
decode_tx: broadcast::Sender<DecodedMessage>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let listener = TcpListener::bind(addr).await?; let listener = TcpListener::bind(addr).await?;
info!("Audio listener on {}", addr); info!("Audio listener on {}", addr);
@@ -280,9 +391,10 @@ pub async fn run_audio_listener(
let rx_audio = rx_audio.clone(); let rx_audio = rx_audio.clone();
let tx_audio = tx_audio.clone(); let tx_audio = tx_audio.clone();
let info = stream_info.clone(); let info = stream_info.clone();
let decode_tx = decode_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info).await { if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx).await {
warn!("Audio client {} error: {:?}", peer, e); warn!("Audio client {} error: {:?}", peer, e);
} }
info!("Audio client {} disconnected", peer); info!("Audio client {} disconnected", peer);
@@ -296,6 +408,7 @@ async fn handle_audio_client(
rx_audio: broadcast::Sender<Bytes>, rx_audio: broadcast::Sender<Bytes>,
tx_audio: mpsc::Sender<Bytes>, tx_audio: mpsc::Sender<Bytes>,
stream_info: AudioStreamInfo, stream_info: AudioStreamInfo,
decode_tx: broadcast::Sender<DecodedMessage>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let (reader, writer) = socket.into_split(); let (reader, writer) = socket.into_split();
let mut reader = tokio::io::BufReader::new(reader); let mut reader = tokio::io::BufReader::new(reader);
@@ -306,22 +419,47 @@ async fn handle_audio_client(
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?;
// Spawn RX forwarding task // Spawn RX + decode forwarding task (shares the writer)
let mut rx_sub = rx_audio.subscribe(); let mut rx_sub = rx_audio.subscribe();
let mut decode_sub = decode_tx.subscribe();
let mut writer_for_rx = writer; let mut writer_for_rx = writer;
let rx_handle = tokio::spawn(async move { let rx_handle = tokio::spawn(async move {
loop { loop {
match rx_sub.recv().await { tokio::select! {
Ok(packet) => { result = rx_sub.recv() => {
if let Err(e) = write_audio_msg(&mut writer_for_rx, AUDIO_MSG_RX_FRAME, &packet).await { match result {
warn!("Audio RX write to {} failed: {}", peer, e); Ok(packet) => {
break; if let Err(e) = write_audio_msg(&mut writer_for_rx, AUDIO_MSG_RX_FRAME, &packet).await {
warn!("Audio RX write to {} failed: {}", peer, e);
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("Audio RX: {} dropped {} frames", peer, n);
}
Err(broadcast::error::RecvError::Closed) => break,
} }
} }
Err(broadcast::error::RecvError::Lagged(n)) => { result = decode_sub.recv() => {
warn!("Audio RX: {} dropped {} frames", peer, n); match result {
Ok(msg) => {
let msg_type = match &msg {
DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE,
DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE,
};
if let Ok(json) = serde_json::to_vec(&msg) {
if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await {
warn!("Audio decode write to {} failed: {}", peer, e);
break;
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("Audio decode: {} dropped {} messages", peer, n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
} }
Err(broadcast::error::RecvError::Closed) => break,
} }
} }
}); });
+592
View File
@@ -0,0 +1,592 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Bell 202 AFSK demodulator + AX.25/APRS decoder.
//!
//! Ported from the browser-side JavaScript implementation.
use trx_core::decode::AprsPacket;
// ---------------------------------------------------------------------------
// CRC-16-CCITT
// ---------------------------------------------------------------------------
const CRC_CCITT_TABLE: [u16; 256] = {
let mut table = [0u16; 256];
let mut i = 0usize;
while i < 256 {
let mut crc = i as u16;
let mut j = 0;
while j < 8 {
if crc & 1 != 0 {
crc = (crc >> 1) ^ 0x8408;
} else {
crc >>= 1;
}
j += 1;
}
table[i] = crc;
i += 1;
}
table
};
fn crc16ccitt(bytes: &[u8]) -> u16 {
let mut crc: u16 = 0xFFFF;
for &b in bytes {
crc = (crc >> 8) ^ CRC_CCITT_TABLE[((crc ^ b as u16) & 0xFF) as usize];
}
crc ^ 0xFFFF
}
// ---------------------------------------------------------------------------
// Correlation demodulator (one instance)
// ---------------------------------------------------------------------------
const BAUD: f32 = 1200.0;
const MARK: f32 = 1200.0;
const SPACE: f32 = 2200.0;
const TWO_PI: f32 = std::f32::consts::TAU;
const PLL_GAIN: f32 = 0.4;
struct Demodulator {
samples_per_bit: f32,
// Energy gate
energy_acc: f32,
energy_count: usize,
energy_window: usize,
// Oscillator phases
mark_phase: f32,
space_phase: f32,
mark_phase_inc: f32,
space_phase_inc: f32,
// Sliding-window correlation filter
corr_len: usize,
mark_i_buf: Vec<f32>,
mark_q_buf: Vec<f32>,
space_i_buf: Vec<f32>,
space_q_buf: Vec<f32>,
corr_idx: usize,
mark_i_sum: f32,
mark_q_sum: f32,
space_i_sum: f32,
space_q_sum: f32,
// Clock recovery
last_bit: u8,
bit_phase: f32,
// NRZI
prev_sampled_bit: u8,
// HDLC
ones: u32,
frame_bits: Vec<u8>,
in_frame: bool,
// Results
frames: Vec<RawFrame>,
}
struct RawFrame {
payload: Vec<u8>,
crc_ok: bool,
}
impl Demodulator {
fn new(sample_rate: u32, window_factor: f32) -> Self {
let sr = sample_rate as f32;
let samples_per_bit = sr / BAUD;
let corr_len = (samples_per_bit * window_factor).round().max(2.0) as usize;
let energy_window = (sr * 0.05).round() as usize;
Self {
samples_per_bit,
energy_acc: 0.0,
energy_count: 0,
energy_window,
mark_phase: 0.0,
space_phase: 0.0,
mark_phase_inc: TWO_PI * MARK / sr,
space_phase_inc: TWO_PI * SPACE / sr,
corr_len,
mark_i_buf: vec![0.0; corr_len],
mark_q_buf: vec![0.0; corr_len],
space_i_buf: vec![0.0; corr_len],
space_q_buf: vec![0.0; corr_len],
corr_idx: 0,
mark_i_sum: 0.0,
mark_q_sum: 0.0,
space_i_sum: 0.0,
space_q_sum: 0.0,
last_bit: 0,
bit_phase: 0.0,
prev_sampled_bit: 0,
ones: 0,
frame_bits: Vec::new(),
in_frame: false,
frames: Vec::new(),
}
}
fn reset_state(&mut self) {
self.mark_phase = 0.0;
self.space_phase = 0.0;
self.mark_i_buf.fill(0.0);
self.mark_q_buf.fill(0.0);
self.space_i_buf.fill(0.0);
self.space_q_buf.fill(0.0);
self.corr_idx = 0;
self.mark_i_sum = 0.0;
self.mark_q_sum = 0.0;
self.space_i_sum = 0.0;
self.space_q_sum = 0.0;
self.last_bit = 0;
self.bit_phase = 0.0;
self.prev_sampled_bit = 0;
self.ones = 0;
self.frame_bits.clear();
self.in_frame = false;
}
fn process_buffer(&mut self, samples: &[f32]) -> Vec<RawFrame> {
for &s in samples {
self.process_sample(s);
}
std::mem::take(&mut self.frames)
}
fn process_sample(&mut self, s: f32) {
// Energy gate
self.energy_acc += s * s;
self.energy_count += 1;
if self.energy_count >= self.energy_window {
let rms = (self.energy_acc / self.energy_count as f32).sqrt();
if rms < 0.001 {
self.reset_state();
}
self.energy_acc = 0.0;
self.energy_count = 0;
}
// Mix with reference oscillators
let m_i = s * self.mark_phase.cos();
let m_q = s * self.mark_phase.sin();
let s_i = s * self.space_phase.cos();
let s_q = s * self.space_phase.sin();
self.mark_phase += self.mark_phase_inc;
self.space_phase += self.space_phase_inc;
if self.mark_phase > TWO_PI {
self.mark_phase -= TWO_PI;
}
if self.space_phase > TWO_PI {
self.space_phase -= TWO_PI;
}
// Sliding-window integration
let idx = self.corr_idx;
self.mark_i_sum += m_i - self.mark_i_buf[idx];
self.mark_q_sum += m_q - self.mark_q_buf[idx];
self.space_i_sum += s_i - self.space_i_buf[idx];
self.space_q_sum += s_q - self.space_q_buf[idx];
self.mark_i_buf[idx] = m_i;
self.mark_q_buf[idx] = m_q;
self.space_i_buf[idx] = s_i;
self.space_q_buf[idx] = s_q;
self.corr_idx = (idx + 1) % self.corr_len;
// Compare mark vs space energy
let mark_energy =
self.mark_i_sum * self.mark_i_sum + self.mark_q_sum * self.mark_q_sum;
let space_energy =
self.space_i_sum * self.space_i_sum + self.space_q_sum * self.space_q_sum;
let bit: u8 = if mark_energy > space_energy { 1 } else { 0 };
// PLL clock recovery
if bit != self.last_bit {
self.last_bit = bit;
let error = self.bit_phase - self.samples_per_bit / 2.0;
self.bit_phase -= PLL_GAIN * error;
}
self.bit_phase -= 1.0;
if self.bit_phase <= 0.0 {
self.bit_phase += self.samples_per_bit;
self.process_bit(bit);
}
}
fn process_bit(&mut self, raw_bit: u8) {
// NRZI decode: no transition = 1, transition = 0
let decoded_bit: u8 = if raw_bit == self.prev_sampled_bit {
1
} else {
0
};
self.prev_sampled_bit = raw_bit;
if decoded_bit == 1 {
self.ones += 1;
return;
}
// decoded_bit == 0
if self.ones >= 7 {
// Abort
self.in_frame = false;
self.frame_bits.clear();
self.ones = 0;
return;
}
if self.ones == 6 {
// Flag
if self.in_frame && self.frame_bits.len() >= 136 {
if let Some(frame) = self.bits_to_bytes() {
self.frames.push(frame);
}
}
self.frame_bits.clear();
self.in_frame = true;
self.ones = 0;
return;
}
if self.ones == 5 {
// Bit stuffing — flush 5 ones, discard stuffed zero
if self.in_frame {
for _ in 0..5 {
self.frame_bits.push(1);
}
}
self.ones = 0;
return;
}
// Normal data
if self.in_frame {
for _ in 0..self.ones {
self.frame_bits.push(1);
}
self.frame_bits.push(0);
}
self.ones = 0;
}
fn bits_to_bytes(&self) -> Option<RawFrame> {
let byte_len = self.frame_bits.len() / 8;
if byte_len < 17 {
return None;
}
let mut bytes = vec![0u8; byte_len];
for i in 0..byte_len {
let mut b: u8 = 0;
for j in 0..8 {
b |= self.frame_bits[i * 8 + j] << j;
}
bytes[i] = b;
}
let payload = &bytes[..byte_len - 2];
let fcs = bytes[byte_len - 2] as u16 | ((bytes[byte_len - 1] as u16) << 8);
let computed = crc16ccitt(payload);
let crc_ok = computed == fcs;
Some(RawFrame {
payload: payload.to_vec(),
crc_ok,
})
}
}
// ---------------------------------------------------------------------------
// AX.25 address decoding
// ---------------------------------------------------------------------------
struct Ax25Address {
call: String,
ssid: u8,
last: bool,
}
fn decode_ax25_address(bytes: &[u8], offset: usize) -> Ax25Address {
let mut call = String::with_capacity(6);
for i in 0..6 {
let ch = bytes[offset + i] >> 1;
if ch > 32 {
call.push(ch as char);
}
}
let call = call.trim_end().to_string();
let ssid = (bytes[offset + 6] >> 1) & 0x0F;
let last = (bytes[offset + 6] & 0x01) == 1;
Ax25Address { call, ssid, last }
}
struct Ax25Frame {
src: Ax25Address,
dest: Ax25Address,
digis: Vec<Ax25Address>,
info: Vec<u8>,
}
fn parse_ax25(frame: &[u8]) -> Option<Ax25Frame> {
if frame.len() < 16 {
return None;
}
let dest = decode_ax25_address(frame, 0);
let src = decode_ax25_address(frame, 7);
let mut offset = 14;
let mut digis = Vec::new();
let mut last_addr = src.last;
while !last_addr && offset + 7 <= frame.len() {
let digi = decode_ax25_address(frame, offset);
last_addr = digi.last;
digis.push(digi);
offset += 7;
}
if offset + 2 > frame.len() {
return None;
}
// Skip control + PID bytes
let info = frame[offset + 2..].to_vec();
Some(Ax25Frame {
src,
dest,
digis,
info,
})
}
// ---------------------------------------------------------------------------
// APRS parser
// ---------------------------------------------------------------------------
fn format_call(addr: &Ax25Address) -> String {
if addr.ssid != 0 {
format!("{}-{}", addr.call, addr.ssid)
} else {
addr.call.clone()
}
}
fn parse_aprs(ax25: &Ax25Frame) -> AprsPacket {
let src_call = format_call(&ax25.src);
let dest_call = format_call(&ax25.dest);
let path = ax25
.digis
.iter()
.map(|d| format_call(d))
.collect::<Vec<_>>()
.join(",");
let info_str = String::from_utf8_lossy(&ax25.info).to_string();
let packet_type = if !info_str.is_empty() {
match info_str.as_bytes()[0] {
b'!' | b'=' | b'/' | b'@' => "Position",
b':' => "Message",
b'>' => "Status",
b'T' => "Telemetry",
b';' => "Object",
b')' => "Item",
b'`' | b'\'' => "Mic-E",
_ => "Unknown",
}
} else {
"Unknown"
};
let mut lat = None;
let mut lon = None;
let mut symbol_table = None;
let mut symbol_code = None;
if packet_type == "Position" {
if let Some(pos) = parse_aprs_position(&info_str) {
lat = Some(pos.0);
lon = Some(pos.1);
symbol_table = Some(pos.2.to_string());
symbol_code = Some(pos.3.to_string());
}
}
AprsPacket {
src_call,
dest_call,
path,
info: info_str,
packet_type: packet_type.to_string(),
crc_ok: false, // set by caller
lat,
lon,
symbol_table,
symbol_code,
}
}
fn parse_aprs_position(info_str: &str) -> Option<(f64, f64, char, char)> {
if info_str.is_empty() {
return None;
}
let bytes = info_str.as_bytes();
let dt = bytes[0];
let pos_str = match dt {
b'!' | b'=' => &info_str[1..],
b'/' | b'@' => {
if info_str.len() < 9 {
return None;
}
&info_str[8..]
}
_ => return None,
};
if pos_str.is_empty() {
return None;
}
let first = pos_str.as_bytes()[0];
if first < b'0' || first > b'9' {
return parse_aprs_compressed(pos_str);
}
// Uncompressed: DDMM.MMN/DDDMM.MMEsYYY
if pos_str.len() < 19 {
return None;
}
let lat_str = &pos_str[..8];
let sym_table = pos_str.as_bytes()[8] as char;
let lon_str = &pos_str[9..18];
let sym_code = pos_str.as_bytes()[18] as char;
let lat = parse_aprs_lat(lat_str)?;
let lon = parse_aprs_lon(lon_str)?;
Some((lat, lon, sym_table, sym_code))
}
fn parse_aprs_compressed(pos_str: &str) -> Option<(f64, f64, char, char)> {
if pos_str.len() < 10 {
return None;
}
let bytes = pos_str.as_bytes();
let sym_table = bytes[0] as char;
let mut lat_val: u32 = 0;
let mut lon_val: u32 = 0;
for i in 0..4 {
let lc = bytes[1 + i] as i32 - 33;
let xc = bytes[5 + i] as i32 - 33;
if lc < 0 || lc > 90 || xc < 0 || xc > 90 {
return None;
}
lat_val = lat_val * 91 + lc as u32;
lon_val = lon_val * 91 + xc as u32;
}
let lat = 90.0 - lat_val as f64 / 380926.0;
let lon = -180.0 + lon_val as f64 / 190463.0;
if !(-90.0..=90.0).contains(&lat) || !(-180.0..=180.0).contains(&lon) {
return None;
}
let sym_code = bytes[9] as char;
let lat = (lat * 1e6).round() / 1e6;
let lon = (lon * 1e6).round() / 1e6;
Some((lat, lon, sym_table, sym_code))
}
fn parse_aprs_lat(s: &str) -> Option<f64> {
if s.len() < 8 {
return None;
}
let deg: f64 = s[..2].parse().ok()?;
let min: f64 = s[2..7].parse().ok()?;
let ns = s.as_bytes()[7];
let mut lat = deg + min / 60.0;
match ns {
b'S' | b's' => lat = -lat,
b'N' | b'n' => {}
_ => return None,
}
Some((lat * 1e6).round() / 1e6)
}
fn parse_aprs_lon(s: &str) -> Option<f64> {
if s.len() < 9 {
return None;
}
let deg: f64 = s[..3].parse().ok()?;
let min: f64 = s[3..8].parse().ok()?;
let ew = s.as_bytes()[8];
let mut lon = deg + min / 60.0;
match ew {
b'W' | b'w' => lon = -lon,
b'E' | b'e' => {}
_ => return None,
}
Some((lon * 1e6).round() / 1e6)
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
pub struct AprsDecoder {
demodulators: Vec<Demodulator>,
}
impl AprsDecoder {
pub fn new(sample_rate: u32) -> Self {
Self {
demodulators: vec![
Demodulator::new(sample_rate, 1.0),
Demodulator::new(sample_rate, 0.5),
],
}
}
pub fn process_samples(&mut self, samples: &[f32]) -> Vec<AprsPacket> {
let mut seen = std::collections::HashSet::new();
let mut results = Vec::new();
for demod in &mut self.demodulators {
for frame in demod.process_buffer(samples) {
// Dedup by address prefix + payload length
let key_len = frame.payload.len().min(14);
let mut key = Vec::with_capacity(key_len + 4);
key.extend_from_slice(&frame.payload[..key_len]);
key.extend_from_slice(&(frame.payload.len() as u32).to_le_bytes());
if !seen.insert(key) {
continue;
}
if let Some(ax25) = parse_ax25(&frame.payload) {
let mut pkt = parse_aprs(&ax25);
pkt.crc_ok = frame.crc_ok;
results.push(pkt);
}
}
}
results
}
pub fn reset(&mut self) {
for demod in &mut self.demodulators {
demod.reset_state();
demod.energy_acc = 0.0;
demod.energy_count = 0;
demod.frames.clear();
}
}
}
+405
View File
@@ -0,0 +1,405 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! Goertzel-based CW (Morse code) decoder.
//!
//! Ported from the browser-side JavaScript implementation.
use trx_core::decode::CwEvent;
// ITU Morse code lookup
fn morse_lookup(code: &str) -> Option<char> {
match code {
".-" => Some('A'),
"-..." => Some('B'),
"-.-." => Some('C'),
"-.." => Some('D'),
"." => Some('E'),
"..-." => Some('F'),
"--." => Some('G'),
"...." => Some('H'),
".." => Some('I'),
".---" => Some('J'),
"-.-" => Some('K'),
".-.." => Some('L'),
"--" => Some('M'),
"-." => Some('N'),
"---" => Some('O'),
".--." => Some('P'),
"--.-" => Some('Q'),
".-." => Some('R'),
"..." => Some('S'),
"-" => Some('T'),
"..-" => Some('U'),
"...-" => Some('V'),
".--" => Some('W'),
"-..-" => Some('X'),
"-.--" => Some('Y'),
"--.." => Some('Z'),
"-----" => Some('0'),
".----" => Some('1'),
"..---" => Some('2'),
"...--" => Some('3'),
"....-" => Some('4'),
"....." => Some('5'),
"-...." => Some('6'),
"--..." => Some('7'),
"---.." => Some('8'),
"----." => Some('9'),
".-.-.-" => Some('.'),
"--..--" => Some(','),
"..--.." => Some('?'),
".----." => Some('\''),
"-.-.--" => Some('!'),
"-..-." => Some('/'),
"-.--." => Some('('),
"-.--.-" => Some(')'),
".-..." => Some('&'),
"---..." => Some(':'),
"-.-.-." => Some(';'),
"-...-" => Some('='),
".-.-." => Some('+'),
"-....-" => Some('-'),
"..--.-" => Some('_'),
".-..-." => Some('"'),
"...-..-" => Some('$'),
".--.-." => Some('@'),
_ => None,
}
}
// ---------------------------------------------------------------------------
// Goertzel detector
// ---------------------------------------------------------------------------
fn goertzel_energy(buf: &[f32], coeff: f32) -> f32 {
let mut s1: f32 = 0.0;
let mut s2: f32 = 0.0;
for &sample in buf {
let s0 = coeff * s1 - s2 + sample;
s2 = s1;
s1 = s0;
}
let n2 = (buf.len() * buf.len()) as f32;
(s1 * s1 + s2 * s2 - coeff * s1 * s2) / n2
}
// ---------------------------------------------------------------------------
// Tone scan bins
// ---------------------------------------------------------------------------
const TONE_SCAN_LOW: u32 = 300;
const TONE_SCAN_HIGH: u32 = 1200;
const TONE_SCAN_STEP: u32 = 25;
const TONE_STABLE_NEEDED: u32 = 3;
const THRESHOLD: f32 = 0.05;
struct ToneScanBin {
freq: u32,
coeff: f32,
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
pub struct CwDecoder {
sample_rate: u32,
window_size: usize,
sample_buf: Vec<f32>,
sample_idx: usize,
// Goertzel parameters
tone_freq: u32,
coeff: f32,
// Tone state
tone_on: bool,
tone_on_at: f64,
tone_off_at: f64,
current_symbol: String,
sample_counter: u64,
// WPM
wpm: u32,
// Auto tone detection
tone_scan_bins: Vec<ToneScanBin>,
tone_stable_bin: i32,
tone_stable_count: u32,
// Auto WPM detection
on_durations: Vec<f64>,
// Results
events: Vec<CwEvent>,
}
impl CwDecoder {
pub fn new(sample_rate: u32) -> Self {
let window_ms = 50;
let window_size = (sample_rate as usize * window_ms) / 1000;
let default_tone = 700u32;
let k = (default_tone as f32 * window_size as f32 / sample_rate as f32).round();
let omega = (2.0 * std::f32::consts::PI * k) / window_size as f32;
let coeff = 2.0 * omega.cos();
// Build scan bins
let mut tone_scan_bins = Vec::new();
let mut f = TONE_SCAN_LOW;
while f <= TONE_SCAN_HIGH {
let bk =
(f as f32 * window_size as f32 / sample_rate as f32).round();
let b_omega = (2.0 * std::f32::consts::PI * bk) / window_size as f32;
tone_scan_bins.push(ToneScanBin {
freq: f,
coeff: 2.0 * b_omega.cos(),
});
f += TONE_SCAN_STEP;
}
Self {
sample_rate,
window_size,
sample_buf: vec![0.0f32; window_size],
sample_idx: 0,
tone_freq: default_tone,
coeff,
tone_on: false,
tone_on_at: 0.0,
tone_off_at: 0.0,
current_symbol: String::new(),
sample_counter: 0,
wpm: 15,
tone_scan_bins,
tone_stable_bin: -1,
tone_stable_count: 0,
on_durations: Vec::new(),
events: Vec::new(),
}
}
fn recompute_goertzel(&mut self, new_freq: u32) {
self.tone_freq = new_freq;
let k = (new_freq as f32 * self.window_size as f32 / self.sample_rate as f32)
.round();
let omega = (2.0 * std::f32::consts::PI * k) / self.window_size as f32;
self.coeff = 2.0 * omega.cos();
}
fn unit_ms(&self) -> f64 {
1200.0 / self.wpm as f64
}
fn now_ms(&self) -> f64 {
self.sample_counter as f64 * 1000.0 / self.sample_rate as f64
}
fn goertzel_detect(&self) -> bool {
let tone_energy = goertzel_energy(&self.sample_buf, self.coeff);
let mut total_energy: f32 = 0.0;
for &s in &self.sample_buf {
total_energy += s * s;
}
let avg_energy = total_energy / self.sample_buf.len() as f32;
if avg_energy < 1e-10 {
return false;
}
(tone_energy / avg_energy) > THRESHOLD
}
fn auto_detect_tone(&mut self) {
let mut total_energy: f32 = 0.0;
for &s in &self.sample_buf {
total_energy += s * s;
}
let avg_energy = total_energy / self.sample_buf.len() as f32;
if avg_energy < 1e-10 {
return;
}
let mut best_idx: i32 = -1;
let mut best_ratio: f32 = 0.0;
for (i, bin) in self.tone_scan_bins.iter().enumerate() {
let e = goertzel_energy(&self.sample_buf, bin.coeff);
let ratio = e / avg_energy;
if ratio > best_ratio {
best_ratio = ratio;
best_idx = i as i32;
}
}
if best_ratio < THRESHOLD || best_idx < 0 {
self.tone_stable_count = 0;
self.tone_stable_bin = -1;
return;
}
if self.tone_stable_bin >= 0
&& (best_idx - self.tone_stable_bin).unsigned_abs() <= 1
{
self.tone_stable_count += 1;
} else {
self.tone_stable_bin = best_idx;
self.tone_stable_count = 1;
}
if self.tone_stable_count >= TONE_STABLE_NEEDED {
let detected_freq = self.tone_scan_bins[self.tone_stable_bin as usize].freq;
if (detected_freq as i32 - self.tone_freq as i32).unsigned_abs()
> TONE_SCAN_STEP
{
self.recompute_goertzel(detected_freq);
}
}
}
fn auto_detect_wpm(&mut self) {
if self.on_durations.len() < 8 {
return;
}
let mut sorted: Vec<f64> = self.on_durations.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let mut best_boundary = 1usize;
let mut best_score = f64::INFINITY;
for i in 1..sorted.len() {
let c1 = &sorted[..i];
let c2 = &sorted[i..];
let mean1: f64 = c1.iter().sum::<f64>() / c1.len() as f64;
let mean2: f64 = c2.iter().sum::<f64>() / c2.len() as f64;
let mut score: f64 = 0.0;
for &v in c1 {
score += (v - mean1) * (v - mean1);
}
for &v in c2 {
score += (v - mean2) * (v - mean2);
}
if score < best_score {
best_score = score;
best_boundary = i;
}
}
let dit_cluster = &sorted[..best_boundary];
if dit_cluster.is_empty() {
return;
}
let dit_ms = dit_cluster[dit_cluster.len() / 2];
if dit_ms < 10.0 {
return;
}
let new_wpm = (1200.0 / dit_ms).round() as u32;
let new_wpm = new_wpm.clamp(5, 40);
if new_wpm != self.wpm {
self.wpm = new_wpm;
}
}
fn process_window(&mut self) {
// Auto tone detection
self.auto_detect_tone();
let detected = self.goertzel_detect();
let now = self.now_ms();
// Emit signal state event on transitions
if detected && !self.tone_on {
// Tone just turned on
self.tone_on = true;
let off_duration = now - self.tone_off_at;
if self.tone_off_at > 0.0 {
let u = self.unit_ms();
if off_duration > u * 5.0 {
// Word gap
if !self.current_symbol.is_empty() {
let ch = morse_lookup(&self.current_symbol)
.unwrap_or('?');
self.emit_text(&ch.to_string());
self.current_symbol.clear();
}
self.emit_text(" ");
} else if off_duration > u * 2.0 {
// Character gap
if !self.current_symbol.is_empty() {
let ch = morse_lookup(&self.current_symbol)
.unwrap_or('?');
self.emit_text(&ch.to_string());
self.current_symbol.clear();
}
}
}
self.tone_on_at = now;
} else if !detected && self.tone_on {
// Tone just turned off
self.tone_on = false;
let on_duration = now - self.tone_on_at;
let u = self.unit_ms();
if on_duration > u * 2.0 {
self.current_symbol.push('-');
} else {
self.current_symbol.push('.');
}
self.tone_off_at = now;
// Collect for auto WPM
self.on_durations.push(on_duration);
if self.on_durations.len() > 30 {
self.on_durations.remove(0);
}
self.auto_detect_wpm();
}
// Flush pending character after long silence
if !self.tone_on && !self.current_symbol.is_empty() && self.tone_off_at > 0.0 {
let silence = now - self.tone_off_at;
if silence > self.unit_ms() * 5.0 {
let ch = morse_lookup(&self.current_symbol).unwrap_or('?');
self.emit_text(&ch.to_string());
self.current_symbol.clear();
}
}
}
fn emit_text(&mut self, text: &str) {
self.events.push(CwEvent {
text: text.to_string(),
wpm: self.wpm,
tone_hz: self.tone_freq,
signal_on: self.tone_on,
});
}
pub fn process_samples(&mut self, samples: &[f32]) -> Vec<CwEvent> {
for &s in samples {
self.sample_buf[self.sample_idx] = s;
self.sample_idx += 1;
self.sample_counter += 1;
if self.sample_idx >= self.window_size {
self.process_window();
self.sample_idx = 0;
}
}
std::mem::take(&mut self.events)
}
pub fn reset(&mut self) {
self.sample_buf.fill(0.0);
self.sample_idx = 0;
self.tone_on = false;
self.tone_on_at = 0.0;
self.tone_off_at = 0.0;
self.current_symbol.clear();
self.sample_counter = 0;
self.wpm = 15;
self.tone_freq = 700;
self.recompute_goertzel(700);
self.tone_stable_bin = -1;
self.tone_stable_count = 0;
self.on_durations.clear();
self.events.clear();
}
}
+6
View File
@@ -0,0 +1,6 @@
// SPDX-FileCopyrightText: 2025 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
pub mod aprs;
pub mod cw;
+28 -2
View File
@@ -4,6 +4,7 @@
mod audio; mod audio;
mod config; mod config;
mod decode;
mod error; mod error;
mod listener; mod listener;
mod plugins; mod plugins;
@@ -336,8 +337,33 @@ async fn main() -> DynResult<()> {
let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256); let (rx_audio_tx, _) = broadcast::channel::<Bytes>(256);
let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64); let (tx_audio_tx, tx_audio_rx) = mpsc::channel::<Bytes>(64);
// PCM tap for server-side decoders
let (pcm_tx, _) = broadcast::channel::<Vec<f32>>(64);
// Decoded messages broadcast
let (decode_tx, _) = broadcast::channel::<trx_core::decode::DecodedMessage>(256);
if cfg.audio.rx_enabled { if cfg.audio.rx_enabled {
let _capture_thread = audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone()); let _capture_thread = audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone()));
// Spawn APRS decoder task
let aprs_pcm_rx = pcm_tx.subscribe();
let aprs_state_rx = _state_rx.clone();
let aprs_decode_tx = decode_tx.clone();
let aprs_sr = cfg.audio.sample_rate;
let aprs_ch = cfg.audio.channels;
tokio::spawn(audio::run_aprs_decoder(
aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx,
));
// Spawn CW decoder task
let cw_pcm_rx = pcm_tx.subscribe();
let cw_state_rx = _state_rx.clone();
let cw_decode_tx = decode_tx.clone();
let cw_sr = cfg.audio.sample_rate;
let cw_ch = cfg.audio.channels;
tokio::spawn(audio::run_cw_decoder(
cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx,
));
} }
if cfg.audio.tx_enabled { if cfg.audio.tx_enabled {
let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx); let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx);
@@ -345,7 +371,7 @@ async fn main() -> DynResult<()> {
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = if let Err(e) =
audio::run_audio_listener(audio_listen, rx_audio_tx, tx_audio_tx, stream_info) audio::run_audio_listener(audio_listen, rx_audio_tx, tx_audio_tx, stream_info, decode_tx)
.await .await
{ {
error!("Audio listener error: {:?}", e); error!("Audio listener error: {:?}", e);