diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 530130f..b9918b9 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -8,15 +8,18 @@ use std::net::SocketAddr; use bytes::Bytes; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, watch}; use tracing::{error, info, warn}; use trx_core::audio::{ - read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, - AUDIO_MSG_TX_FRAME, + read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_APRS_DECODE, + AUDIO_MSG_CW_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, }; +use trx_core::decode::DecodedMessage; +use trx_core::rig::state::{RigMode, RigState}; use crate::config::AudioConfig; +use crate::decode; /// Spawn the audio capture thread. /// @@ -26,6 +29,7 @@ use crate::config::AudioConfig; pub fn spawn_audio_capture( cfg: &AudioConfig, tx: broadcast::Sender, + pcm_tx: Option>>, ) -> std::thread::JoinHandle<()> { let sample_rate = cfg.sample_rate; let channels = cfg.channels as u16; @@ -35,7 +39,7 @@ pub fn spawn_audio_capture( std::thread::spawn(move || { if let Err(e) = - run_capture(sample_rate, channels, frame_duration_ms, bitrate_bps, device_name, tx) + run_capture(sample_rate, channels, frame_duration_ms, bitrate_bps, device_name, tx, pcm_tx) { error!("Audio capture thread error: {}", e); } @@ -49,6 +53,7 @@ fn run_capture( bitrate_bps: u32, device_name: Option, tx: broadcast::Sender, + pcm_tx: Option>>, ) -> Result<(), Box> { use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; @@ -130,6 +135,9 @@ fn run_capture( pcm_buf.extend_from_slice(&samples); while pcm_buf.len() >= frame_samples { let frame: Vec = pcm_buf.drain(..frame_samples).collect(); + if let Some(ref pcm_tx) = pcm_tx { + let _ = pcm_tx.send(frame.clone()); + } match encoder.encode_float(&frame, &mut opus_buf) { Ok(len) => { let packet = Bytes::copy_from_slice(&opus_buf[..len]); @@ -263,12 +271,115 @@ fn run_playback( Ok(()) } +/// Run the APRS decoder task. Only processes PCM when rig mode is PKT. +pub async fn run_aprs_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + state_rx: watch::Receiver, + decode_tx: broadcast::Sender, +) { + info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels); + let mut decoder = decode::aprs::AprsDecoder::new(sample_rate); + let mut was_active = false; + + loop { + match pcm_rx.recv().await { + Ok(frame) => { + let mode = &state_rx.borrow().status.mode; + let active = matches!(mode, RigMode::PKT); + + if !active { + if was_active { + decoder.reset(); + was_active = false; + } + continue; + } + was_active = true; + + // Downmix to mono if stereo + let mono = if channels > 1 { + let num_frames = frame.len() / channels as usize; + let mut mono = Vec::with_capacity(num_frames); + for i in 0..num_frames { + mono.push(frame[i * channels as usize]); + } + mono + } else { + frame + }; + + for pkt in decoder.process_samples(&mono) { + let _ = decode_tx.send(DecodedMessage::Aprs(pkt)); + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("APRS decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } +} + +/// Run the CW decoder task. Only processes PCM when rig mode is CW or CWR. +pub async fn run_cw_decoder( + sample_rate: u32, + channels: u16, + mut pcm_rx: broadcast::Receiver>, + state_rx: watch::Receiver, + decode_tx: broadcast::Sender, +) { + info!("CW decoder started ({}Hz, {} ch)", sample_rate, channels); + let mut decoder = decode::cw::CwDecoder::new(sample_rate); + let mut was_active = false; + + loop { + match pcm_rx.recv().await { + Ok(frame) => { + let mode = &state_rx.borrow().status.mode; + let active = matches!(mode, RigMode::CW | RigMode::CWR); + + if !active { + if was_active { + decoder.reset(); + was_active = false; + } + continue; + } + was_active = true; + + // Downmix to mono if stereo + let mono = if channels > 1 { + let num_frames = frame.len() / channels as usize; + let mut mono = Vec::with_capacity(num_frames); + for i in 0..num_frames { + mono.push(frame[i * channels as usize]); + } + mono + } else { + frame + }; + + for evt in decoder.process_samples(&mono) { + let _ = decode_tx.send(DecodedMessage::Cw(evt)); + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("CW decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } +} + /// Run the audio TCP listener, accepting client connections. pub async fn run_audio_listener( addr: SocketAddr, rx_audio: broadcast::Sender, tx_audio: mpsc::Sender, stream_info: AudioStreamInfo, + decode_tx: broadcast::Sender, ) -> std::io::Result<()> { let listener = TcpListener::bind(addr).await?; info!("Audio listener on {}", addr); @@ -280,9 +391,10 @@ pub async fn run_audio_listener( let rx_audio = rx_audio.clone(); let tx_audio = tx_audio.clone(); let info = stream_info.clone(); + let decode_tx = decode_tx.clone(); tokio::spawn(async move { - if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info).await { + if let Err(e) = handle_audio_client(socket, peer, rx_audio, tx_audio, info, decode_tx).await { warn!("Audio client {} error: {:?}", peer, e); } info!("Audio client {} disconnected", peer); @@ -296,6 +408,7 @@ async fn handle_audio_client( rx_audio: broadcast::Sender, tx_audio: mpsc::Sender, stream_info: AudioStreamInfo, + decode_tx: broadcast::Sender, ) -> std::io::Result<()> { let (reader, writer) = socket.into_split(); let mut reader = tokio::io::BufReader::new(reader); @@ -306,22 +419,47 @@ async fn handle_audio_client( .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; write_audio_msg(&mut writer, AUDIO_MSG_STREAM_INFO, &info_json).await?; - // Spawn RX forwarding task + // Spawn RX + decode forwarding task (shares the writer) let mut rx_sub = rx_audio.subscribe(); + let mut decode_sub = decode_tx.subscribe(); let mut writer_for_rx = writer; let rx_handle = tokio::spawn(async move { loop { - match rx_sub.recv().await { - Ok(packet) => { - if let Err(e) = write_audio_msg(&mut writer_for_rx, AUDIO_MSG_RX_FRAME, &packet).await { - warn!("Audio RX write to {} failed: {}", peer, e); - break; + tokio::select! { + result = rx_sub.recv() => { + match result { + Ok(packet) => { + if let Err(e) = write_audio_msg(&mut writer_for_rx, AUDIO_MSG_RX_FRAME, &packet).await { + warn!("Audio RX write to {} failed: {}", peer, e); + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Audio RX: {} dropped {} frames", peer, n); + } + Err(broadcast::error::RecvError::Closed) => break, } } - Err(broadcast::error::RecvError::Lagged(n)) => { - warn!("Audio RX: {} dropped {} frames", peer, n); + result = decode_sub.recv() => { + match result { + Ok(msg) => { + let msg_type = match &msg { + DecodedMessage::Aprs(_) => AUDIO_MSG_APRS_DECODE, + DecodedMessage::Cw(_) => AUDIO_MSG_CW_DECODE, + }; + if let Ok(json) = serde_json::to_vec(&msg) { + if let Err(e) = write_audio_msg(&mut writer_for_rx, msg_type, &json).await { + warn!("Audio decode write to {} failed: {}", peer, e); + break; + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Audio decode: {} dropped {} messages", peer, n); + } + Err(broadcast::error::RecvError::Closed) => break, + } } - Err(broadcast::error::RecvError::Closed) => break, } } }); diff --git a/src/trx-server/src/decode/aprs.rs b/src/trx-server/src/decode/aprs.rs new file mode 100644 index 0000000..58b485e --- /dev/null +++ b/src/trx-server/src/decode/aprs.rs @@ -0,0 +1,592 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Bell 202 AFSK demodulator + AX.25/APRS decoder. +//! +//! Ported from the browser-side JavaScript implementation. + +use trx_core::decode::AprsPacket; + +// --------------------------------------------------------------------------- +// CRC-16-CCITT +// --------------------------------------------------------------------------- + +const CRC_CCITT_TABLE: [u16; 256] = { + let mut table = [0u16; 256]; + let mut i = 0usize; + while i < 256 { + let mut crc = i as u16; + let mut j = 0; + while j < 8 { + if crc & 1 != 0 { + crc = (crc >> 1) ^ 0x8408; + } else { + crc >>= 1; + } + j += 1; + } + table[i] = crc; + i += 1; + } + table +}; + +fn crc16ccitt(bytes: &[u8]) -> u16 { + let mut crc: u16 = 0xFFFF; + for &b in bytes { + crc = (crc >> 8) ^ CRC_CCITT_TABLE[((crc ^ b as u16) & 0xFF) as usize]; + } + crc ^ 0xFFFF +} + +// --------------------------------------------------------------------------- +// Correlation demodulator (one instance) +// --------------------------------------------------------------------------- + +const BAUD: f32 = 1200.0; +const MARK: f32 = 1200.0; +const SPACE: f32 = 2200.0; +const TWO_PI: f32 = std::f32::consts::TAU; +const PLL_GAIN: f32 = 0.4; + +struct Demodulator { + samples_per_bit: f32, + + // Energy gate + energy_acc: f32, + energy_count: usize, + energy_window: usize, + + // Oscillator phases + mark_phase: f32, + space_phase: f32, + mark_phase_inc: f32, + space_phase_inc: f32, + + // Sliding-window correlation filter + corr_len: usize, + mark_i_buf: Vec, + mark_q_buf: Vec, + space_i_buf: Vec, + space_q_buf: Vec, + corr_idx: usize, + mark_i_sum: f32, + mark_q_sum: f32, + space_i_sum: f32, + space_q_sum: f32, + + // Clock recovery + last_bit: u8, + bit_phase: f32, + + // NRZI + prev_sampled_bit: u8, + + // HDLC + ones: u32, + frame_bits: Vec, + in_frame: bool, + + // Results + frames: Vec, +} + +struct RawFrame { + payload: Vec, + crc_ok: bool, +} + +impl Demodulator { + fn new(sample_rate: u32, window_factor: f32) -> Self { + let sr = sample_rate as f32; + let samples_per_bit = sr / BAUD; + let corr_len = (samples_per_bit * window_factor).round().max(2.0) as usize; + let energy_window = (sr * 0.05).round() as usize; + + Self { + samples_per_bit, + energy_acc: 0.0, + energy_count: 0, + energy_window, + mark_phase: 0.0, + space_phase: 0.0, + mark_phase_inc: TWO_PI * MARK / sr, + space_phase_inc: TWO_PI * SPACE / sr, + corr_len, + mark_i_buf: vec![0.0; corr_len], + mark_q_buf: vec![0.0; corr_len], + space_i_buf: vec![0.0; corr_len], + space_q_buf: vec![0.0; corr_len], + corr_idx: 0, + mark_i_sum: 0.0, + mark_q_sum: 0.0, + space_i_sum: 0.0, + space_q_sum: 0.0, + last_bit: 0, + bit_phase: 0.0, + prev_sampled_bit: 0, + ones: 0, + frame_bits: Vec::new(), + in_frame: false, + frames: Vec::new(), + } + } + + fn reset_state(&mut self) { + self.mark_phase = 0.0; + self.space_phase = 0.0; + self.mark_i_buf.fill(0.0); + self.mark_q_buf.fill(0.0); + self.space_i_buf.fill(0.0); + self.space_q_buf.fill(0.0); + self.corr_idx = 0; + self.mark_i_sum = 0.0; + self.mark_q_sum = 0.0; + self.space_i_sum = 0.0; + self.space_q_sum = 0.0; + self.last_bit = 0; + self.bit_phase = 0.0; + self.prev_sampled_bit = 0; + self.ones = 0; + self.frame_bits.clear(); + self.in_frame = false; + } + + fn process_buffer(&mut self, samples: &[f32]) -> Vec { + for &s in samples { + self.process_sample(s); + } + std::mem::take(&mut self.frames) + } + + fn process_sample(&mut self, s: f32) { + // Energy gate + self.energy_acc += s * s; + self.energy_count += 1; + if self.energy_count >= self.energy_window { + let rms = (self.energy_acc / self.energy_count as f32).sqrt(); + if rms < 0.001 { + self.reset_state(); + } + self.energy_acc = 0.0; + self.energy_count = 0; + } + + // Mix with reference oscillators + let m_i = s * self.mark_phase.cos(); + let m_q = s * self.mark_phase.sin(); + let s_i = s * self.space_phase.cos(); + let s_q = s * self.space_phase.sin(); + self.mark_phase += self.mark_phase_inc; + self.space_phase += self.space_phase_inc; + if self.mark_phase > TWO_PI { + self.mark_phase -= TWO_PI; + } + if self.space_phase > TWO_PI { + self.space_phase -= TWO_PI; + } + + // Sliding-window integration + let idx = self.corr_idx; + self.mark_i_sum += m_i - self.mark_i_buf[idx]; + self.mark_q_sum += m_q - self.mark_q_buf[idx]; + self.space_i_sum += s_i - self.space_i_buf[idx]; + self.space_q_sum += s_q - self.space_q_buf[idx]; + self.mark_i_buf[idx] = m_i; + self.mark_q_buf[idx] = m_q; + self.space_i_buf[idx] = s_i; + self.space_q_buf[idx] = s_q; + self.corr_idx = (idx + 1) % self.corr_len; + + // Compare mark vs space energy + let mark_energy = + self.mark_i_sum * self.mark_i_sum + self.mark_q_sum * self.mark_q_sum; + let space_energy = + self.space_i_sum * self.space_i_sum + self.space_q_sum * self.space_q_sum; + let bit: u8 = if mark_energy > space_energy { 1 } else { 0 }; + + // PLL clock recovery + if bit != self.last_bit { + self.last_bit = bit; + let error = self.bit_phase - self.samples_per_bit / 2.0; + self.bit_phase -= PLL_GAIN * error; + } + + self.bit_phase -= 1.0; + if self.bit_phase <= 0.0 { + self.bit_phase += self.samples_per_bit; + self.process_bit(bit); + } + } + + fn process_bit(&mut self, raw_bit: u8) { + // NRZI decode: no transition = 1, transition = 0 + let decoded_bit: u8 = if raw_bit == self.prev_sampled_bit { + 1 + } else { + 0 + }; + self.prev_sampled_bit = raw_bit; + + if decoded_bit == 1 { + self.ones += 1; + return; + } + + // decoded_bit == 0 + if self.ones >= 7 { + // Abort + self.in_frame = false; + self.frame_bits.clear(); + self.ones = 0; + return; + } + if self.ones == 6 { + // Flag + if self.in_frame && self.frame_bits.len() >= 136 { + if let Some(frame) = self.bits_to_bytes() { + self.frames.push(frame); + } + } + self.frame_bits.clear(); + self.in_frame = true; + self.ones = 0; + return; + } + if self.ones == 5 { + // Bit stuffing — flush 5 ones, discard stuffed zero + if self.in_frame { + for _ in 0..5 { + self.frame_bits.push(1); + } + } + self.ones = 0; + return; + } + + // Normal data + if self.in_frame { + for _ in 0..self.ones { + self.frame_bits.push(1); + } + self.frame_bits.push(0); + } + self.ones = 0; + } + + fn bits_to_bytes(&self) -> Option { + let byte_len = self.frame_bits.len() / 8; + if byte_len < 17 { + return None; + } + let mut bytes = vec![0u8; byte_len]; + for i in 0..byte_len { + let mut b: u8 = 0; + for j in 0..8 { + b |= self.frame_bits[i * 8 + j] << j; + } + bytes[i] = b; + } + + let payload = &bytes[..byte_len - 2]; + let fcs = bytes[byte_len - 2] as u16 | ((bytes[byte_len - 1] as u16) << 8); + let computed = crc16ccitt(payload); + let crc_ok = computed == fcs; + + Some(RawFrame { + payload: payload.to_vec(), + crc_ok, + }) + } +} + +// --------------------------------------------------------------------------- +// AX.25 address decoding +// --------------------------------------------------------------------------- + +struct Ax25Address { + call: String, + ssid: u8, + last: bool, +} + +fn decode_ax25_address(bytes: &[u8], offset: usize) -> Ax25Address { + let mut call = String::with_capacity(6); + for i in 0..6 { + let ch = bytes[offset + i] >> 1; + if ch > 32 { + call.push(ch as char); + } + } + let call = call.trim_end().to_string(); + let ssid = (bytes[offset + 6] >> 1) & 0x0F; + let last = (bytes[offset + 6] & 0x01) == 1; + Ax25Address { call, ssid, last } +} + +struct Ax25Frame { + src: Ax25Address, + dest: Ax25Address, + digis: Vec, + info: Vec, +} + +fn parse_ax25(frame: &[u8]) -> Option { + if frame.len() < 16 { + return None; + } + let dest = decode_ax25_address(frame, 0); + let src = decode_ax25_address(frame, 7); + + let mut offset = 14; + let mut digis = Vec::new(); + let mut last_addr = src.last; + while !last_addr && offset + 7 <= frame.len() { + let digi = decode_ax25_address(frame, offset); + last_addr = digi.last; + digis.push(digi); + offset += 7; + } + + if offset + 2 > frame.len() { + return None; + } + // Skip control + PID bytes + let info = frame[offset + 2..].to_vec(); + + Some(Ax25Frame { + src, + dest, + digis, + info, + }) +} + +// --------------------------------------------------------------------------- +// APRS parser +// --------------------------------------------------------------------------- + +fn format_call(addr: &Ax25Address) -> String { + if addr.ssid != 0 { + format!("{}-{}", addr.call, addr.ssid) + } else { + addr.call.clone() + } +} + +fn parse_aprs(ax25: &Ax25Frame) -> AprsPacket { + let src_call = format_call(&ax25.src); + let dest_call = format_call(&ax25.dest); + let path = ax25 + .digis + .iter() + .map(|d| format_call(d)) + .collect::>() + .join(","); + let info_str = String::from_utf8_lossy(&ax25.info).to_string(); + + let packet_type = if !info_str.is_empty() { + match info_str.as_bytes()[0] { + b'!' | b'=' | b'/' | b'@' => "Position", + b':' => "Message", + b'>' => "Status", + b'T' => "Telemetry", + b';' => "Object", + b')' => "Item", + b'`' | b'\'' => "Mic-E", + _ => "Unknown", + } + } else { + "Unknown" + }; + + let mut lat = None; + let mut lon = None; + let mut symbol_table = None; + let mut symbol_code = None; + + if packet_type == "Position" { + if let Some(pos) = parse_aprs_position(&info_str) { + lat = Some(pos.0); + lon = Some(pos.1); + symbol_table = Some(pos.2.to_string()); + symbol_code = Some(pos.3.to_string()); + } + } + + AprsPacket { + src_call, + dest_call, + path, + info: info_str, + packet_type: packet_type.to_string(), + crc_ok: false, // set by caller + lat, + lon, + symbol_table, + symbol_code, + } +} + +fn parse_aprs_position(info_str: &str) -> Option<(f64, f64, char, char)> { + if info_str.is_empty() { + return None; + } + let bytes = info_str.as_bytes(); + let dt = bytes[0]; + + let pos_str = match dt { + b'!' | b'=' => &info_str[1..], + b'/' | b'@' => { + if info_str.len() < 9 { + return None; + } + &info_str[8..] + } + _ => return None, + }; + + if pos_str.is_empty() { + return None; + } + + let first = pos_str.as_bytes()[0]; + if first < b'0' || first > b'9' { + return parse_aprs_compressed(pos_str); + } + + // Uncompressed: DDMM.MMN/DDDMM.MMEsYYY + if pos_str.len() < 19 { + return None; + } + + let lat_str = &pos_str[..8]; + let sym_table = pos_str.as_bytes()[8] as char; + let lon_str = &pos_str[9..18]; + let sym_code = pos_str.as_bytes()[18] as char; + + let lat = parse_aprs_lat(lat_str)?; + let lon = parse_aprs_lon(lon_str)?; + + Some((lat, lon, sym_table, sym_code)) +} + +fn parse_aprs_compressed(pos_str: &str) -> Option<(f64, f64, char, char)> { + if pos_str.len() < 10 { + return None; + } + let bytes = pos_str.as_bytes(); + let sym_table = bytes[0] as char; + + let mut lat_val: u32 = 0; + let mut lon_val: u32 = 0; + for i in 0..4 { + let lc = bytes[1 + i] as i32 - 33; + let xc = bytes[5 + i] as i32 - 33; + if lc < 0 || lc > 90 || xc < 0 || xc > 90 { + return None; + } + lat_val = lat_val * 91 + lc as u32; + lon_val = lon_val * 91 + xc as u32; + } + + let lat = 90.0 - lat_val as f64 / 380926.0; + let lon = -180.0 + lon_val as f64 / 190463.0; + + if !(-90.0..=90.0).contains(&lat) || !(-180.0..=180.0).contains(&lon) { + return None; + } + + let sym_code = bytes[9] as char; + let lat = (lat * 1e6).round() / 1e6; + let lon = (lon * 1e6).round() / 1e6; + + Some((lat, lon, sym_table, sym_code)) +} + +fn parse_aprs_lat(s: &str) -> Option { + if s.len() < 8 { + return None; + } + let deg: f64 = s[..2].parse().ok()?; + let min: f64 = s[2..7].parse().ok()?; + let ns = s.as_bytes()[7]; + let mut lat = deg + min / 60.0; + match ns { + b'S' | b's' => lat = -lat, + b'N' | b'n' => {} + _ => return None, + } + Some((lat * 1e6).round() / 1e6) +} + +fn parse_aprs_lon(s: &str) -> Option { + if s.len() < 9 { + return None; + } + let deg: f64 = s[..3].parse().ok()?; + let min: f64 = s[3..8].parse().ok()?; + let ew = s.as_bytes()[8]; + let mut lon = deg + min / 60.0; + match ew { + b'W' | b'w' => lon = -lon, + b'E' | b'e' => {} + _ => return None, + } + Some((lon * 1e6).round() / 1e6) +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +pub struct AprsDecoder { + demodulators: Vec, +} + +impl AprsDecoder { + pub fn new(sample_rate: u32) -> Self { + Self { + demodulators: vec![ + Demodulator::new(sample_rate, 1.0), + Demodulator::new(sample_rate, 0.5), + ], + } + } + + pub fn process_samples(&mut self, samples: &[f32]) -> Vec { + let mut seen = std::collections::HashSet::new(); + let mut results = Vec::new(); + + for demod in &mut self.demodulators { + for frame in demod.process_buffer(samples) { + // Dedup by address prefix + payload length + let key_len = frame.payload.len().min(14); + let mut key = Vec::with_capacity(key_len + 4); + key.extend_from_slice(&frame.payload[..key_len]); + key.extend_from_slice(&(frame.payload.len() as u32).to_le_bytes()); + if !seen.insert(key) { + continue; + } + + if let Some(ax25) = parse_ax25(&frame.payload) { + let mut pkt = parse_aprs(&ax25); + pkt.crc_ok = frame.crc_ok; + results.push(pkt); + } + } + } + + results + } + + pub fn reset(&mut self) { + for demod in &mut self.demodulators { + demod.reset_state(); + demod.energy_acc = 0.0; + demod.energy_count = 0; + demod.frames.clear(); + } + } +} diff --git a/src/trx-server/src/decode/cw.rs b/src/trx-server/src/decode/cw.rs new file mode 100644 index 0000000..5231097 --- /dev/null +++ b/src/trx-server/src/decode/cw.rs @@ -0,0 +1,405 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! Goertzel-based CW (Morse code) decoder. +//! +//! Ported from the browser-side JavaScript implementation. + +use trx_core::decode::CwEvent; + +// ITU Morse code lookup +fn morse_lookup(code: &str) -> Option { + match code { + ".-" => Some('A'), + "-..." => Some('B'), + "-.-." => Some('C'), + "-.." => Some('D'), + "." => Some('E'), + "..-." => Some('F'), + "--." => Some('G'), + "...." => Some('H'), + ".." => Some('I'), + ".---" => Some('J'), + "-.-" => Some('K'), + ".-.." => Some('L'), + "--" => Some('M'), + "-." => Some('N'), + "---" => Some('O'), + ".--." => Some('P'), + "--.-" => Some('Q'), + ".-." => Some('R'), + "..." => Some('S'), + "-" => Some('T'), + "..-" => Some('U'), + "...-" => Some('V'), + ".--" => Some('W'), + "-..-" => Some('X'), + "-.--" => Some('Y'), + "--.." => Some('Z'), + "-----" => Some('0'), + ".----" => Some('1'), + "..---" => Some('2'), + "...--" => Some('3'), + "....-" => Some('4'), + "....." => Some('5'), + "-...." => Some('6'), + "--..." => Some('7'), + "---.." => Some('8'), + "----." => Some('9'), + ".-.-.-" => Some('.'), + "--..--" => Some(','), + "..--.." => Some('?'), + ".----." => Some('\''), + "-.-.--" => Some('!'), + "-..-." => Some('/'), + "-.--." => Some('('), + "-.--.-" => Some(')'), + ".-..." => Some('&'), + "---..." => Some(':'), + "-.-.-." => Some(';'), + "-...-" => Some('='), + ".-.-." => Some('+'), + "-....-" => Some('-'), + "..--.-" => Some('_'), + ".-..-." => Some('"'), + "...-..-" => Some('$'), + ".--.-." => Some('@'), + _ => None, + } +} + +// --------------------------------------------------------------------------- +// Goertzel detector +// --------------------------------------------------------------------------- + +fn goertzel_energy(buf: &[f32], coeff: f32) -> f32 { + let mut s1: f32 = 0.0; + let mut s2: f32 = 0.0; + for &sample in buf { + let s0 = coeff * s1 - s2 + sample; + s2 = s1; + s1 = s0; + } + let n2 = (buf.len() * buf.len()) as f32; + (s1 * s1 + s2 * s2 - coeff * s1 * s2) / n2 +} + +// --------------------------------------------------------------------------- +// Tone scan bins +// --------------------------------------------------------------------------- + +const TONE_SCAN_LOW: u32 = 300; +const TONE_SCAN_HIGH: u32 = 1200; +const TONE_SCAN_STEP: u32 = 25; +const TONE_STABLE_NEEDED: u32 = 3; +const THRESHOLD: f32 = 0.05; + +struct ToneScanBin { + freq: u32, + coeff: f32, +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +pub struct CwDecoder { + sample_rate: u32, + window_size: usize, + sample_buf: Vec, + sample_idx: usize, + + // Goertzel parameters + tone_freq: u32, + coeff: f32, + + // Tone state + tone_on: bool, + tone_on_at: f64, + tone_off_at: f64, + current_symbol: String, + sample_counter: u64, + + // WPM + wpm: u32, + + // Auto tone detection + tone_scan_bins: Vec, + tone_stable_bin: i32, + tone_stable_count: u32, + + // Auto WPM detection + on_durations: Vec, + + // Results + events: Vec, +} + +impl CwDecoder { + pub fn new(sample_rate: u32) -> Self { + let window_ms = 50; + let window_size = (sample_rate as usize * window_ms) / 1000; + let default_tone = 700u32; + let k = (default_tone as f32 * window_size as f32 / sample_rate as f32).round(); + let omega = (2.0 * std::f32::consts::PI * k) / window_size as f32; + let coeff = 2.0 * omega.cos(); + + // Build scan bins + let mut tone_scan_bins = Vec::new(); + let mut f = TONE_SCAN_LOW; + while f <= TONE_SCAN_HIGH { + let bk = + (f as f32 * window_size as f32 / sample_rate as f32).round(); + let b_omega = (2.0 * std::f32::consts::PI * bk) / window_size as f32; + tone_scan_bins.push(ToneScanBin { + freq: f, + coeff: 2.0 * b_omega.cos(), + }); + f += TONE_SCAN_STEP; + } + + Self { + sample_rate, + window_size, + sample_buf: vec![0.0f32; window_size], + sample_idx: 0, + tone_freq: default_tone, + coeff, + tone_on: false, + tone_on_at: 0.0, + tone_off_at: 0.0, + current_symbol: String::new(), + sample_counter: 0, + wpm: 15, + tone_scan_bins, + tone_stable_bin: -1, + tone_stable_count: 0, + on_durations: Vec::new(), + events: Vec::new(), + } + } + + fn recompute_goertzel(&mut self, new_freq: u32) { + self.tone_freq = new_freq; + let k = (new_freq as f32 * self.window_size as f32 / self.sample_rate as f32) + .round(); + let omega = (2.0 * std::f32::consts::PI * k) / self.window_size as f32; + self.coeff = 2.0 * omega.cos(); + } + + fn unit_ms(&self) -> f64 { + 1200.0 / self.wpm as f64 + } + + fn now_ms(&self) -> f64 { + self.sample_counter as f64 * 1000.0 / self.sample_rate as f64 + } + + fn goertzel_detect(&self) -> bool { + let tone_energy = goertzel_energy(&self.sample_buf, self.coeff); + let mut total_energy: f32 = 0.0; + for &s in &self.sample_buf { + total_energy += s * s; + } + let avg_energy = total_energy / self.sample_buf.len() as f32; + if avg_energy < 1e-10 { + return false; + } + (tone_energy / avg_energy) > THRESHOLD + } + + fn auto_detect_tone(&mut self) { + let mut total_energy: f32 = 0.0; + for &s in &self.sample_buf { + total_energy += s * s; + } + let avg_energy = total_energy / self.sample_buf.len() as f32; + if avg_energy < 1e-10 { + return; + } + + let mut best_idx: i32 = -1; + let mut best_ratio: f32 = 0.0; + for (i, bin) in self.tone_scan_bins.iter().enumerate() { + let e = goertzel_energy(&self.sample_buf, bin.coeff); + let ratio = e / avg_energy; + if ratio > best_ratio { + best_ratio = ratio; + best_idx = i as i32; + } + } + + if best_ratio < THRESHOLD || best_idx < 0 { + self.tone_stable_count = 0; + self.tone_stable_bin = -1; + return; + } + + if self.tone_stable_bin >= 0 + && (best_idx - self.tone_stable_bin).unsigned_abs() <= 1 + { + self.tone_stable_count += 1; + } else { + self.tone_stable_bin = best_idx; + self.tone_stable_count = 1; + } + + if self.tone_stable_count >= TONE_STABLE_NEEDED { + let detected_freq = self.tone_scan_bins[self.tone_stable_bin as usize].freq; + if (detected_freq as i32 - self.tone_freq as i32).unsigned_abs() + > TONE_SCAN_STEP + { + self.recompute_goertzel(detected_freq); + } + } + } + + fn auto_detect_wpm(&mut self) { + if self.on_durations.len() < 8 { + return; + } + + let mut sorted: Vec = self.on_durations.clone(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + + let mut best_boundary = 1usize; + let mut best_score = f64::INFINITY; + for i in 1..sorted.len() { + let c1 = &sorted[..i]; + let c2 = &sorted[i..]; + let mean1: f64 = c1.iter().sum::() / c1.len() as f64; + let mean2: f64 = c2.iter().sum::() / c2.len() as f64; + let mut score: f64 = 0.0; + for &v in c1 { + score += (v - mean1) * (v - mean1); + } + for &v in c2 { + score += (v - mean2) * (v - mean2); + } + if score < best_score { + best_score = score; + best_boundary = i; + } + } + + let dit_cluster = &sorted[..best_boundary]; + if dit_cluster.is_empty() { + return; + } + let dit_ms = dit_cluster[dit_cluster.len() / 2]; + if dit_ms < 10.0 { + return; + } + + let new_wpm = (1200.0 / dit_ms).round() as u32; + let new_wpm = new_wpm.clamp(5, 40); + if new_wpm != self.wpm { + self.wpm = new_wpm; + } + } + + fn process_window(&mut self) { + // Auto tone detection + self.auto_detect_tone(); + + let detected = self.goertzel_detect(); + let now = self.now_ms(); + + // Emit signal state event on transitions + if detected && !self.tone_on { + // Tone just turned on + self.tone_on = true; + let off_duration = now - self.tone_off_at; + if self.tone_off_at > 0.0 { + let u = self.unit_ms(); + if off_duration > u * 5.0 { + // Word gap + if !self.current_symbol.is_empty() { + let ch = morse_lookup(&self.current_symbol) + .unwrap_or('?'); + self.emit_text(&ch.to_string()); + self.current_symbol.clear(); + } + self.emit_text(" "); + } else if off_duration > u * 2.0 { + // Character gap + if !self.current_symbol.is_empty() { + let ch = morse_lookup(&self.current_symbol) + .unwrap_or('?'); + self.emit_text(&ch.to_string()); + self.current_symbol.clear(); + } + } + } + self.tone_on_at = now; + } else if !detected && self.tone_on { + // Tone just turned off + self.tone_on = false; + let on_duration = now - self.tone_on_at; + let u = self.unit_ms(); + if on_duration > u * 2.0 { + self.current_symbol.push('-'); + } else { + self.current_symbol.push('.'); + } + self.tone_off_at = now; + + // Collect for auto WPM + self.on_durations.push(on_duration); + if self.on_durations.len() > 30 { + self.on_durations.remove(0); + } + self.auto_detect_wpm(); + } + + // Flush pending character after long silence + if !self.tone_on && !self.current_symbol.is_empty() && self.tone_off_at > 0.0 { + let silence = now - self.tone_off_at; + if silence > self.unit_ms() * 5.0 { + let ch = morse_lookup(&self.current_symbol).unwrap_or('?'); + self.emit_text(&ch.to_string()); + self.current_symbol.clear(); + } + } + } + + fn emit_text(&mut self, text: &str) { + self.events.push(CwEvent { + text: text.to_string(), + wpm: self.wpm, + tone_hz: self.tone_freq, + signal_on: self.tone_on, + }); + } + + pub fn process_samples(&mut self, samples: &[f32]) -> Vec { + for &s in samples { + self.sample_buf[self.sample_idx] = s; + self.sample_idx += 1; + self.sample_counter += 1; + if self.sample_idx >= self.window_size { + self.process_window(); + self.sample_idx = 0; + } + } + std::mem::take(&mut self.events) + } + + pub fn reset(&mut self) { + self.sample_buf.fill(0.0); + self.sample_idx = 0; + self.tone_on = false; + self.tone_on_at = 0.0; + self.tone_off_at = 0.0; + self.current_symbol.clear(); + self.sample_counter = 0; + self.wpm = 15; + self.tone_freq = 700; + self.recompute_goertzel(700); + self.tone_stable_bin = -1; + self.tone_stable_count = 0; + self.on_durations.clear(); + self.events.clear(); + } +} diff --git a/src/trx-server/src/decode/mod.rs b/src/trx-server/src/decode/mod.rs new file mode 100644 index 0000000..447e4eb --- /dev/null +++ b/src/trx-server/src/decode/mod.rs @@ -0,0 +1,6 @@ +// SPDX-FileCopyrightText: 2025 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +pub mod aprs; +pub mod cw; diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index f218a29..90360d9 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -4,6 +4,7 @@ mod audio; mod config; +mod decode; mod error; mod listener; mod plugins; @@ -336,8 +337,33 @@ async fn main() -> DynResult<()> { let (rx_audio_tx, _) = broadcast::channel::(256); let (tx_audio_tx, tx_audio_rx) = mpsc::channel::(64); + // PCM tap for server-side decoders + let (pcm_tx, _) = broadcast::channel::>(64); + // Decoded messages broadcast + let (decode_tx, _) = broadcast::channel::(256); + if cfg.audio.rx_enabled { - let _capture_thread = audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone()); + let _capture_thread = audio::spawn_audio_capture(&cfg.audio, rx_audio_tx.clone(), Some(pcm_tx.clone())); + + // Spawn APRS decoder task + let aprs_pcm_rx = pcm_tx.subscribe(); + let aprs_state_rx = _state_rx.clone(); + let aprs_decode_tx = decode_tx.clone(); + let aprs_sr = cfg.audio.sample_rate; + let aprs_ch = cfg.audio.channels; + tokio::spawn(audio::run_aprs_decoder( + aprs_sr, aprs_ch as u16, aprs_pcm_rx, aprs_state_rx, aprs_decode_tx, + )); + + // Spawn CW decoder task + let cw_pcm_rx = pcm_tx.subscribe(); + let cw_state_rx = _state_rx.clone(); + let cw_decode_tx = decode_tx.clone(); + let cw_sr = cfg.audio.sample_rate; + let cw_ch = cfg.audio.channels; + tokio::spawn(audio::run_cw_decoder( + cw_sr, cw_ch as u16, cw_pcm_rx, cw_state_rx, cw_decode_tx, + )); } if cfg.audio.tx_enabled { let _playback_thread = audio::spawn_audio_playback(&cfg.audio, tx_audio_rx); @@ -345,7 +371,7 @@ async fn main() -> DynResult<()> { tokio::spawn(async move { if let Err(e) = - audio::run_audio_listener(audio_listen, rx_audio_tx, tx_audio_tx, stream_info) + audio::run_audio_listener(audio_listen, rx_audio_tx, tx_audio_tx, stream_info, decode_tx) .await { error!("Audio listener error: {:?}", e);