From fffc4c6b90d83c418cdef858ea40c70ed62cbe00 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Fri, 27 Feb 2026 23:57:46 +0100 Subject: [PATCH] [feat](trx-rs): add WFM RDS and playback controls Co-authored-by: OpenAI Codex Signed-off-by: Stan Grams --- Cargo.lock | 8 + Cargo.toml | 1 + src/decoders/trx-rds/Cargo.toml | 11 + src/decoders/trx-rds/src/lib.rs | 432 ++++++++++++++++++ .../trx-frontend-http/assets/web/app.js | 33 +- .../trx-frontend-http/assets/web/index.html | 18 +- .../trx-frontend-http/assets/web/style.css | 24 +- .../trx-frontend/trx-frontend-http/src/api.rs | 14 + src/trx-core/src/lib.rs | 2 +- src/trx-core/src/rig/command.rs | 1 + src/trx-core/src/rig/controller/handlers.rs | 1 + src/trx-core/src/rig/mod.rs | 10 + src/trx-core/src/rig/state.rs | 22 + src/trx-protocol/src/codec.rs | 3 + src/trx-protocol/src/mapping.rs | 6 + src/trx-protocol/src/types.rs | 1 + src/trx-server/src/rig_task.rs | 10 + .../trx-backend-soapysdr/Cargo.toml | 1 + .../trx-backend-soapysdr/src/demod.rs | 24 +- .../trx-backend-soapysdr/src/dsp.rs | 28 +- .../trx-backend-soapysdr/src/lib.rs | 30 ++ 21 files changed, 659 insertions(+), 21 deletions(-) create mode 100644 src/decoders/trx-rds/Cargo.toml create mode 100644 src/decoders/trx-rds/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 5ade2b5..83ed796 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2432,6 +2432,7 @@ dependencies = [ "tokio", "tracing", "trx-core", + "trx-rds", ] [[package]] @@ -2555,6 +2556,13 @@ dependencies = [ "trx-core", ] +[[package]] +name = "trx-rds" +version = "0.1.0" +dependencies = [ + "trx-core", +] + [[package]] name = "trx-server" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 680b02a..2898dc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "src/decoders/trx-cw", "src/decoders/trx-decode-log", "src/decoders/trx-ft8", + "src/decoders/trx-rds", "src/decoders/trx-wspr", "src/trx-core", "src/trx-protocol", diff --git a/src/decoders/trx-rds/Cargo.toml b/src/decoders/trx-rds/Cargo.toml new file mode 100644 index 0000000..c00835d --- /dev/null +++ b/src/decoders/trx-rds/Cargo.toml @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2026 Stanislaw Grams +# +# SPDX-License-Identifier: BSD-2-Clause + +[package] +name = "trx-rds" +version = "0.1.0" +edition = "2021" + +[dependencies] +trx-core = { path = "../../trx-core" } diff --git a/src/decoders/trx-rds/src/lib.rs b/src/decoders/trx-rds/src/lib.rs new file mode 100644 index 0000000..a2af60e --- /dev/null +++ b/src/decoders/trx-rds/src/lib.rs @@ -0,0 +1,432 @@ +// SPDX-FileCopyrightText: 2026 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +use std::f32::consts::TAU; + +use trx_core::rig::state::RdsData; + +const RDS_SUBCARRIER_HZ: f32 = 57_000.0; +const RDS_SYMBOL_RATE: f32 = 1_187.5; +const RDS_POLY: u16 = 0x1B9; +const SEARCH_REG_MASK: u32 = (1 << 26) - 1; +const PHASE_CANDIDATES: usize = 8; + +const OFFSET_A: u16 = 0x0FC; +const OFFSET_B: u16 = 0x198; +const OFFSET_C: u16 = 0x168; +const OFFSET_CP: u16 = 0x350; +const OFFSET_D: u16 = 0x1B4; + +#[derive(Debug, Clone)] +struct OnePoleLowPass { + alpha: f32, + y: f32, +} + +impl OnePoleLowPass { + fn new(sample_rate: f32, cutoff_hz: f32) -> Self { + let sr = sample_rate.max(1.0); + let cutoff = cutoff_hz.clamp(1.0, sr * 0.49); + let dt = 1.0 / sr; + let rc = 1.0 / (2.0 * std::f32::consts::PI * cutoff); + let alpha = dt / (rc + dt); + Self { alpha, y: 0.0 } + } + + fn process(&mut self, x: f32) -> f32 { + self.y += self.alpha * (x - self.y); + self.y + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BlockKind { + A, + B, + C, + CPrime, + D, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ExpectBlock { + B, + C, + D, +} + +#[derive(Debug, Clone)] +struct Candidate { + clock_phase: f32, + clock_inc: f32, + sym_i_acc: f32, + sym_q_acc: f32, + sym_count: u16, + prev_sym: Option<(f32, f32)>, + search_reg: u32, + search_bits: u8, + locked: bool, + expect: ExpectBlock, + block_reg: u32, + block_bits: u8, + block_a: u16, + block_b: u16, + score: u32, + state: RdsData, + ps_bytes: [u8; 8], + ps_seen: [bool; 4], +} + +impl Candidate { + fn new(sample_rate: f32, phase_offset: f32) -> Self { + Self { + clock_phase: phase_offset, + clock_inc: RDS_SYMBOL_RATE / sample_rate.max(1.0), + sym_i_acc: 0.0, + sym_q_acc: 0.0, + sym_count: 0, + prev_sym: None, + search_reg: 0, + search_bits: 0, + locked: false, + expect: ExpectBlock::B, + block_reg: 0, + block_bits: 0, + block_a: 0, + block_b: 0, + score: 0, + state: RdsData::default(), + ps_bytes: [b' '; 8], + ps_seen: [false; 4], + } + } + + fn process_sample(&mut self, i: f32, q: f32) -> Option { + self.sym_i_acc += i; + self.sym_q_acc += q; + self.sym_count = self.sym_count.saturating_add(1); + self.clock_phase += self.clock_inc; + if self.clock_phase < 1.0 { + return None; + } + self.clock_phase -= 1.0; + + let count = f32::from(self.sym_count.max(1)); + let symbol = (self.sym_i_acc / count, self.sym_q_acc / count); + self.sym_i_acc = 0.0; + self.sym_q_acc = 0.0; + self.sym_count = 0; + + let update = if let Some((prev_i, prev_q)) = self.prev_sym { + let dot = symbol.0 * prev_i + symbol.1 * prev_q; + self.push_bit((dot < 0.0) as u8) + } else { + None + }; + self.prev_sym = Some(symbol); + update + } + + fn push_bit(&mut self, bit: u8) -> Option { + if self.locked { + self.block_reg = ((self.block_reg << 1) | u32::from(bit)) & SEARCH_REG_MASK; + self.block_bits = self.block_bits.saturating_add(1); + if self.block_bits < 26 { + return None; + } + let word = self.block_reg; + self.block_reg = 0; + self.block_bits = 0; + return self.consume_locked_block(word); + } + + self.search_reg = ((self.search_reg << 1) | u32::from(bit)) & SEARCH_REG_MASK; + self.search_bits = self.search_bits.saturating_add(1).min(26); + if self.search_bits < 26 { + return None; + } + + let (data, kind) = decode_block(self.search_reg)?; + if kind != BlockKind::A { + return None; + } + + self.locked = true; + self.expect = ExpectBlock::B; + self.block_reg = 0; + self.block_bits = 0; + self.block_a = data; + self.state.pi = Some(data); + None + } + + fn consume_locked_block(&mut self, word: u32) -> Option { + let expected = self.expect; + let Some((data, kind)) = decode_block(word) else { + self.drop_lock(word); + return None; + }; + + match (expected, kind) { + (ExpectBlock::B, BlockKind::B) => { + self.block_b = data; + self.expect = ExpectBlock::C; + None + } + (ExpectBlock::C, BlockKind::C | BlockKind::CPrime) => { + self.expect = ExpectBlock::D; + None + } + (ExpectBlock::D, BlockKind::D) => { + self.locked = false; + self.search_bits = 0; + self.search_reg = 0; + self.process_group(self.block_a, self.block_b, data) + } + (_, BlockKind::A) => { + self.locked = true; + self.expect = ExpectBlock::B; + self.block_reg = 0; + self.block_bits = 0; + self.block_a = data; + self.state.pi = Some(data); + None + } + _ => { + self.drop_lock(word); + None + } + } + } + + fn drop_lock(&mut self, word: u32) { + self.locked = false; + self.expect = ExpectBlock::B; + self.block_reg = 0; + self.block_bits = 0; + self.search_reg = word; + self.search_bits = 26; + if let Some((data, kind)) = decode_block(word) { + if kind == BlockKind::A { + self.locked = true; + self.search_reg = 0; + self.search_bits = 0; + self.block_a = data; + self.state.pi = Some(data); + } + } + } + + fn process_group(&mut self, block_a: u16, block_b: u16, block_d: u16) -> Option { + let mut changed = false; + if self.state.pi != Some(block_a) { + self.state.pi = Some(block_a); + changed = true; + } + + let pty = ((block_b >> 5) & 0x1f) as u8; + if self.state.pty != Some(pty) { + self.state.pty = Some(pty); + self.state.pty_name = Some(pty_name(pty).to_string()); + changed = true; + } + + let group_type = ((block_b >> 12) & 0x0f) as u8; + if group_type == 0 { + let segment = usize::from((block_b & 0x0003) as u8); + let [b0, b1] = block_d.to_be_bytes(); + self.ps_bytes[segment * 2] = sanitize_text_byte(b0); + self.ps_bytes[segment * 2 + 1] = sanitize_text_byte(b1); + self.ps_seen[segment] = true; + if self.ps_seen.iter().all(|seen| *seen) { + let ps = String::from_utf8_lossy(&self.ps_bytes).trim_end().to_string(); + if !ps.is_empty() && self.state.program_service.as_deref() != Some(ps.as_str()) { + self.state.program_service = Some(ps); + changed = true; + } + } + } + + self.score = self.score.saturating_add(1); + changed.then(|| self.state.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct RdsDecoder { + carrier_phase: f32, + carrier_inc: f32, + i_lp: OnePoleLowPass, + q_lp: OnePoleLowPass, + candidates: Vec, + best_score: u32, + best_state: Option, +} + +impl RdsDecoder { + pub fn new(sample_rate: u32) -> Self { + let sample_rate_f = sample_rate.max(1) as f32; + let mut candidates = Vec::with_capacity(PHASE_CANDIDATES); + for idx in 0..PHASE_CANDIDATES { + candidates.push(Candidate::new( + sample_rate_f, + idx as f32 / PHASE_CANDIDATES as f32, + )); + } + Self { + carrier_phase: 0.0, + carrier_inc: TAU * RDS_SUBCARRIER_HZ / sample_rate_f, + i_lp: OnePoleLowPass::new(sample_rate_f, 3_000.0), + q_lp: OnePoleLowPass::new(sample_rate_f, 3_000.0), + candidates, + best_score: 0, + best_state: None, + } + } + + pub fn process_samples(&mut self, samples: &[f32]) -> Option<&RdsData> { + for &sample in samples { + let (sin_p, cos_p) = self.carrier_phase.sin_cos(); + self.carrier_phase = (self.carrier_phase + self.carrier_inc).rem_euclid(TAU); + let mixed_i = self.i_lp.process(sample * cos_p * 2.0); + let mixed_q = self.q_lp.process(sample * -sin_p * 2.0); + + for candidate in &mut self.candidates { + if let Some(update) = candidate.process_sample(mixed_i, mixed_q) { + if candidate.score >= self.best_score { + self.best_score = candidate.score; + self.best_state = Some(update); + } + } + } + } + self.best_state.as_ref() + } + + pub fn snapshot(&self) -> Option { + self.best_state.clone() + } +} + +fn sanitize_text_byte(byte: u8) -> u8 { + if (0x20..=0x7e).contains(&byte) { + byte + } else { + b' ' + } +} + +fn decode_block(word: u32) -> Option<(u16, BlockKind)> { + let data = (word >> 10) as u16; + let check = (word & 0x03ff) as u16; + let syndrome = crc10(data) ^ check; + let kind = match syndrome { + OFFSET_A => BlockKind::A, + OFFSET_B => BlockKind::B, + OFFSET_C => BlockKind::C, + OFFSET_CP => BlockKind::CPrime, + OFFSET_D => BlockKind::D, + _ => return None, + }; + Some((data, kind)) +} + +fn crc10(data: u16) -> u16 { + let mut reg = u32::from(data) << 10; + let poly = u32::from(RDS_POLY); + for shift in (10..=25).rev() { + if (reg & (1 << shift)) != 0 { + reg ^= poly << (shift - 10); + } + } + (reg & 0x03ff) as u16 +} + +fn pty_name(pty: u8) -> &'static str { + match pty { + 0 => "None", + 1 => "News", + 2 => "Current Affairs", + 3 => "Information", + 4 => "Sport", + 5 => "Education", + 6 => "Drama", + 7 => "Culture", + 8 => "Science", + 9 => "Varied", + 10 => "Pop Music", + 11 => "Rock Music", + 12 => "Easy Listening", + 13 => "Light Classical", + 14 => "Serious Classical", + 15 => "Other Music", + 16 => "Weather", + 17 => "Finance", + 18 => "Children's", + 19 => "Social Affairs", + 20 => "Religion", + 21 => "Phone In", + 22 => "Travel", + 23 => "Leisure", + 24 => "Jazz Music", + 25 => "Country Music", + 26 => "National Music", + 27 => "Oldies Music", + 28 => "Folk Music", + 29 => "Documentary", + 30 => "Alarm Test", + _ => "Alarm", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn encode_block(data: u16, offset: u16) -> u32 { + (u32::from(data) << 10) | u32::from(crc10(data) ^ offset) + } + + #[test] + fn decode_block_recognizes_valid_offsets() { + let block = encode_block(0x1234, OFFSET_A); + let (data, kind) = decode_block(block).expect("valid block"); + assert_eq!(data, 0x1234); + assert_eq!(kind, BlockKind::A); + } + + #[test] + fn decoder_emits_ps_and_pty_from_group_0a() { + let mut candidate = Candidate::new(240_000.0, 0.0); + let pi = 0x52ab; + let block_a = encode_block(pi, OFFSET_A); + let block_b = encode_block((10 << 5) | 0, OFFSET_B); + let block_d = encode_block(u16::from_be_bytes(*b"AB"), OFFSET_D); + + for bit_idx in (0..26).rev() { + let bit = ((block_a >> bit_idx) & 1) as u8; + let _ = candidate.push_bit(bit); + } + for bit_idx in (0..26).rev() { + let bit = ((block_b >> bit_idx) & 1) as u8; + let _ = candidate.push_bit(bit); + } + let filler = encode_block(0, OFFSET_C); + for bit_idx in (0..26).rev() { + let bit = ((filler >> bit_idx) & 1) as u8; + let _ = candidate.push_bit(bit); + } + let mut last = None; + for bit_idx in (0..26).rev() { + let bit = ((block_d >> bit_idx) & 1) as u8; + last = candidate.push_bit(bit); + } + + assert!(last.is_some()); + let state = last.unwrap(); + assert_eq!(state.pi, Some(pi)); + assert_eq!(state.pty, Some(10)); + assert_eq!(state.pty_name.as_deref(), Some("Pop Music")); + } +} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js index 70a9da4..68950fc 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/app.js @@ -889,6 +889,9 @@ function render(update) { if (update.filter && typeof update.filter.bandwidth_hz === "number") { currentBandwidthHz = update.filter.bandwidth_hz; syncBandwidthInput(currentBandwidthHz); + if (wfmDeemphasisEl && typeof update.filter.wfm_deemphasis_us === "number") { + wfmDeemphasisEl.value = String(update.filter.wfm_deemphasis_us); + } } if (update.status && update.status.freq && typeof update.status.freq.hz === "number") { lastFreqHz = update.status.freq.hz; @@ -904,7 +907,7 @@ function render(update) { if (update.status && update.status.mode) { const mode = normalizeMode(update.status.mode); modeEl.value = mode ? mode.toUpperCase() : ""; - updateWfmAudioModeControl(); + updateWfmControls(); // When filter panel is active (SDR backend), update the BW slider range // to match the new mode — but only if the server hasn't already sent a // filter state that overrides it. @@ -1475,6 +1478,7 @@ async function applyModeFromPicker() { showHint("Mode missing", 1500); return; } + updateWfmControls(); modeEl.disabled = true; showHint("Setting mode…"); try { @@ -2052,7 +2056,8 @@ const txAudioBtn = document.getElementById("tx-audio-btn"); const audioStatus = document.getElementById("audio-status"); const audioLevelFill = document.getElementById("audio-level-fill"); const audioRow = document.getElementById("audio-row"); -const wfmAudioModeWrap = document.getElementById("wfm-audio-mode-wrap"); +const wfmControlsCol = document.getElementById("wfm-controls-col"); +const wfmDeemphasisEl = document.getElementById("wfm-deemphasis"); const wfmAudioModeEl = document.getElementById("wfm-audio-mode"); // Hide audio row if audio is not configured on the server @@ -2080,6 +2085,8 @@ let txTimeoutTimer = null; let txTimeoutRemaining = 0; let txTimeoutInterval = null; const hasWebCodecs = typeof AudioDecoder !== "undefined" && typeof AudioEncoder !== "undefined"; +const MAX_RX_BUFFER_SECS = 0.25; +const TARGET_RX_BUFFER_SECS = 0.04; if (wfmAudioModeEl) { wfmAudioModeEl.value = loadSetting("wfmAudioMode", "stereo"); @@ -2087,12 +2094,16 @@ if (wfmAudioModeEl) { saveSetting("wfmAudioMode", wfmAudioModeEl.value); }); } +if (wfmDeemphasisEl) { + wfmDeemphasisEl.addEventListener("change", () => { + postPath(`/set_wfm_deemphasis?us=${encodeURIComponent(wfmDeemphasisEl.value)}`).catch(() => {}); + }); +} -function updateWfmAudioModeControl() { - if (!wfmAudioModeWrap) return; +function updateWfmControls() { + if (!wfmControlsCol) return; const mode = (modeEl && modeEl.value ? modeEl.value : "").toUpperCase(); - const channels = (streamInfo && streamInfo.channels) || 1; - wfmAudioModeWrap.style.display = mode === "WFM" && channels >= 2 ? "" : "none"; + wfmControlsCol.style.display = mode === "WFM" ? "" : "none"; } // Show compatibility warning for non-Chromium browsers @@ -2148,8 +2159,9 @@ function startRxAudio() { // Stream info JSON try { streamInfo = JSON.parse(evt.data); - updateWfmAudioModeControl(); + updateWfmControls(); audioCtx = new AudioContext({ sampleRate: streamInfo.sample_rate || 48000 }); + audioCtx.resume().catch(() => {}); rxGainNode = audioCtx.createGain(); rxGainNode.gain.value = rxVolSlider.value / 100; rxGainNode.connect(audioCtx.destination); @@ -2214,6 +2226,9 @@ function startRxAudio() { src.buffer = ab; src.connect(rxGainNode); const now = audioCtx.currentTime; + if (nextPlayTime && nextPlayTime - now > MAX_RX_BUFFER_SECS) { + nextPlayTime = now + TARGET_RX_BUFFER_SECS; + } const schedTime = Math.max(now, (nextPlayTime || now)); src.start(schedTime); nextPlayTime = schedTime + ab.duration; @@ -2249,7 +2264,7 @@ function startRxAudio() { if (txActive) { stopTxAudio(); } rxActive = false; streamInfo = null; - updateWfmAudioModeControl(); + updateWfmControls(); rxAudioBtn.style.borderColor = ""; rxAudioBtn.style.color = ""; audioStatus.textContent = "Off"; @@ -2272,7 +2287,7 @@ function stopRxAudio() { streamInfo = null; if (audioWs) { audioWs.close(); audioWs = null; } if (audioCtx) { audioCtx.close(); audioCtx = null; } - updateWfmAudioModeControl(); + updateWfmControls(); rxGainNode = null; if (opusDecoder) { try { opusDecoder.close(); } catch(e) {} diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html index 29721a1..6a39fad 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/index.html @@ -117,6 +117,23 @@ +
Transmit / Power
@@ -168,7 +185,6 @@ -
diff --git a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css index 3ed7b8f..846592e 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css +++ b/src/trx-client/trx-frontend/trx-frontend-http/assets/web/style.css @@ -79,7 +79,7 @@ input.status-input, select.status-input { width: 100%; padding: 0.45rem 0.5rem; #freq { font-family: 'DSEG14 Classic', monospace; font-size: 2rem; padding: 0.5rem 0.6rem; letter-spacing: 0.05em; text-align: center; } .controls-row { display: grid; - grid-template-columns: 1fr auto 1fr; + grid-template-columns: minmax(0, 1fr) auto auto minmax(0, 1fr); gap: 1rem; align-items: start; } @@ -114,6 +114,26 @@ input.status-input, select.status-input { width: 100%; padding: 0.45rem 0.5rem; width: auto; align-items: center; } +.controls-col-wfm.label-below-col .label { + justify-content: flex-start; +} +.wfm-controls-inline { + gap: 0.6rem; + justify-content: flex-start; +} +.wfm-control { + display: flex; + align-items: center; + gap: 0.35rem; + color: var(--text-muted); + font-size: 0.85rem; + white-space: nowrap; +} +.wfm-control .status-input { + min-width: 4.6rem; + width: auto; + font-size: 0.9rem; +} .controls-col-center::after { content: ""; display: block; @@ -583,9 +603,11 @@ button:focus-visible, input:focus-visible, select:focus-visible { .header-rig-switch { width: 100%; justify-content: flex-end; } .header-rig-switch select { min-width: 6.5rem; } .controls-row { grid-template-columns: 1fr auto; } + .controls-col-wfm { grid-column: 1 / -1; } .controls-col-power { grid-column: 1 / -1; } .controls-col.label-below-col .inline, .controls-col.label-below-col .btn-grid { margin-top: 0; } + .wfm-controls-inline { flex-wrap: wrap; } .ft8-controls { flex-wrap: wrap; } #ft8-decode-toggle-btn, #wspr-decode-toggle-btn { white-space: nowrap; } .jog-container { flex-wrap: wrap; } diff --git a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs index 362696d..14166b1 100644 --- a/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs +++ b/src/trx-client/trx-frontend/trx-frontend-http/src/api.rs @@ -468,6 +468,19 @@ pub async fn set_fir_taps( send_command(&rig_tx, RigCommand::SetFirTaps(query.taps)).await } +#[derive(serde::Deserialize)] +pub struct WfmDeemphasisQuery { + pub us: u32, +} + +#[post("/set_wfm_deemphasis")] +pub async fn set_wfm_deemphasis( + query: web::Query, + rig_tx: web::Data>, +) -> Result { + send_command(&rig_tx, RigCommand::SetWfmDeemphasis(query.us)).await +} + #[post("/toggle_aprs_decode")] pub async fn toggle_aprs_decode( state: web::Data>, @@ -679,6 +692,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(set_tx_limit) .service(set_bandwidth) .service(set_fir_taps) + .service(set_wfm_deemphasis) .service(toggle_aprs_decode) .service(toggle_cw_decode) .service(set_cw_auto) diff --git a/src/trx-core/src/lib.rs b/src/trx-core/src/lib.rs index 4bf4663..32acd2b 100644 --- a/src/trx-core/src/lib.rs +++ b/src/trx-core/src/lib.rs @@ -13,5 +13,5 @@ pub type DynResult = Result>; pub use rig::command::RigCommand; pub use rig::request::RigRequest; pub use rig::response::{RigError, RigResult}; -pub use rig::state::{RigFilterState, RigMode, RigSnapshot, RigState}; +pub use rig::state::{RdsData, RigFilterState, RigMode, RigSnapshot, RigState}; pub use rig::AudioSource; diff --git a/src/trx-core/src/rig/command.rs b/src/trx-core/src/rig/command.rs index 360e8e4..f9dde32 100644 --- a/src/trx-core/src/rig/command.rs +++ b/src/trx-core/src/rig/command.rs @@ -33,5 +33,6 @@ pub enum RigCommand { ResetWsprDecoder, SetBandwidth(u32), SetFirTaps(u32), + SetWfmDeemphasis(u32), GetSpectrum, } diff --git a/src/trx-core/src/rig/controller/handlers.rs b/src/trx-core/src/rig/controller/handlers.rs index 7e4bcab..55b1eaa 100644 --- a/src/trx-core/src/rig/controller/handlers.rs +++ b/src/trx-core/src/rig/controller/handlers.rs @@ -516,6 +516,7 @@ pub fn command_from_rig_command(cmd: RigCommand) -> Box { | RigCommand::ResetWsprDecoder | RigCommand::SetBandwidth(_) | RigCommand::SetFirTaps(_) + | RigCommand::SetWfmDeemphasis(_) | RigCommand::GetSpectrum => Box::new(GetSnapshotCommand), } } diff --git a/src/trx-core/src/rig/mod.rs b/src/trx-core/src/rig/mod.rs index f15f281..1d89dc2 100644 --- a/src/trx-core/src/rig/mod.rs +++ b/src/trx-core/src/rig/mod.rs @@ -155,6 +155,16 @@ pub trait RigCat: Rig + Send { ))) } + fn set_wfm_deemphasis<'a>( + &'a mut self, + _deemphasis_us: u32, + ) -> Pin> + Send + 'a>> { + Box::pin(std::future::ready(Err( + Box::new(response::RigError::not_supported("set_wfm_deemphasis")) + as Box, + ))) + } + /// Return the current filter state if this backend supports filter controls. fn filter_state(&self) -> Option { None diff --git a/src/trx-core/src/rig/state.rs b/src/trx-core/src/rig/state.rs index df0c1c2..290a3e0 100644 --- a/src/trx-core/src/rig/state.rs +++ b/src/trx-core/src/rig/state.rs @@ -269,6 +269,12 @@ pub struct RigFilterState { pub bandwidth_hz: u32, pub fir_taps: u32, pub cw_center_hz: u32, + #[serde(default = "default_wfm_deemphasis_us")] + pub wfm_deemphasis_us: u32, +} + +fn default_wfm_deemphasis_us() -> u32 { + 75 } /// Spectrum data from SDR backends (FFT magnitude over the full capture bandwidth). @@ -280,6 +286,22 @@ pub struct SpectrumData { pub center_hz: u64, /// SDR capture sample rate in Hz; the displayed span is ±sample_rate/2. pub sample_rate: u32, + /// Decoded Radio Data System state, when available for WFM. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub rds: Option, +} + +/// Live RDS metadata decoded from a WFM broadcast. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct RdsData { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pi: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub program_service: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pty: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pty_name: Option, } /// Read-only projection of state shared with clients. diff --git a/src/trx-protocol/src/codec.rs b/src/trx-protocol/src/codec.rs index 5a16f91..cb17699 100644 --- a/src/trx-protocol/src/codec.rs +++ b/src/trx-protocol/src/codec.rs @@ -297,6 +297,7 @@ mod tests { bandwidth_hz: 3000, fir_taps: 64, cw_center_hz: 700, + wfm_deemphasis_us: 75, }), ..minimal_snapshot() }) @@ -332,6 +333,7 @@ mod tests { bandwidth_hz: 12000, fir_taps: 128, cw_center_hz: 700, + wfm_deemphasis_us: 50, }), ..minimal_snapshot() }; @@ -340,6 +342,7 @@ mod tests { let f = decoded.filter.expect("filter should round-trip"); assert_eq!(f.bandwidth_hz, 12000); assert_eq!(f.fir_taps, 128); + assert_eq!(f.wfm_deemphasis_us, 50); } fn minimal_snapshot() -> trx_core::rig::state::RigSnapshot { diff --git a/src/trx-protocol/src/mapping.rs b/src/trx-protocol/src/mapping.rs index f4c219d..a5df583 100644 --- a/src/trx-protocol/src/mapping.rs +++ b/src/trx-protocol/src/mapping.rs @@ -48,6 +48,9 @@ pub fn client_command_to_rig(cmd: ClientCommand) -> RigCommand { ClientCommand::ResetWsprDecoder => RigCommand::ResetWsprDecoder, ClientCommand::SetBandwidth { bandwidth_hz } => RigCommand::SetBandwidth(bandwidth_hz), ClientCommand::SetFirTaps { taps } => RigCommand::SetFirTaps(taps), + ClientCommand::SetWfmDeemphasis { deemphasis_us } => { + RigCommand::SetWfmDeemphasis(deemphasis_us) + } ClientCommand::GetSpectrum => RigCommand::GetSpectrum, } } @@ -89,6 +92,9 @@ pub fn rig_command_to_client(cmd: RigCommand) -> ClientCommand { RigCommand::ResetWsprDecoder => ClientCommand::ResetWsprDecoder, RigCommand::SetBandwidth(bandwidth_hz) => ClientCommand::SetBandwidth { bandwidth_hz }, RigCommand::SetFirTaps(taps) => ClientCommand::SetFirTaps { taps }, + RigCommand::SetWfmDeemphasis(deemphasis_us) => { + ClientCommand::SetWfmDeemphasis { deemphasis_us } + } RigCommand::GetSpectrum => ClientCommand::GetSpectrum, } } diff --git a/src/trx-protocol/src/types.rs b/src/trx-protocol/src/types.rs index eec9d52..7a4678c 100644 --- a/src/trx-protocol/src/types.rs +++ b/src/trx-protocol/src/types.rs @@ -38,6 +38,7 @@ pub enum ClientCommand { ResetWsprDecoder, SetBandwidth { bandwidth_hz: u32 }, SetFirTaps { taps: u32 }, + SetWfmDeemphasis { deemphasis_us: u32 }, GetSpectrum, } diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 540268f..40f218d 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -449,6 +449,16 @@ async fn process_command( let _ = ctx.state_tx.send(ctx.state.clone()); return snapshot_from(ctx.state); } + RigCommand::SetWfmDeemphasis(deemphasis_us) => { + if let Err(e) = ctx.rig.set_wfm_deemphasis(deemphasis_us).await { + return Err(RigError::communication(format!("set_wfm_deemphasis: {e}"))); + } + if let Some(f) = ctx.state.filter.as_mut() { + f.wfm_deemphasis_us = deemphasis_us; + } + let _ = ctx.state_tx.send(ctx.state.clone()); + return snapshot_from(ctx.state); + } RigCommand::SetCenterFreq(freq) => { if let Err(e) = ctx.rig.set_center_freq(freq).await { return Err(RigError::communication(format!("set_center_freq: {e}"))); diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml b/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml index ad5d4c1..7f04edd 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/Cargo.toml @@ -10,6 +10,7 @@ license = "BSD-2-Clause" [dependencies] trx-core = { path = "../../../trx-core" } +trx-rds = { path = "../../../decoders/trx-rds" } tokio = { workspace = true, features = ["sync", "rt"] } serde = { workspace = true } tracing = { workspace = true } diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs index 61e480c..7aa22a5 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/demod.rs @@ -3,7 +3,8 @@ // SPDX-License-Identifier: BSD-2-Clause use num_complex::Complex; -use trx_core::rig::state::RigMode; +use trx_core::rig::state::{RdsData, RigMode}; +use trx_rds::RdsDecoder; #[derive(Debug, Clone)] struct OnePoleLowPass { @@ -50,6 +51,7 @@ impl Deemphasis { #[derive(Debug, Clone)] pub struct WfmStereoDecoder { output_channels: usize, + rds_decoder: RdsDecoder, pilot_phase: f32, pilot_freq: f32, pilot_freq_err: f32, @@ -65,11 +67,18 @@ pub struct WfmStereoDecoder { } impl WfmStereoDecoder { - pub fn new(composite_rate: u32, audio_rate: u32, output_channels: usize) -> Self { + pub fn new( + composite_rate: u32, + audio_rate: u32, + output_channels: usize, + deemphasis_us: u32, + ) -> Self { let composite_rate_f = composite_rate.max(1) as f32; let output_decim = (composite_rate / audio_rate.max(1)).max(1) as usize; + let deemphasis_us = deemphasis_us as f32; Self { output_channels: output_channels.max(1), + rds_decoder: RdsDecoder::new(composite_rate), pilot_phase: 0.0, pilot_freq: 2.0 * std::f32::consts::PI * 19_000.0 / composite_rate_f, pilot_freq_err: 0.0, @@ -77,9 +86,9 @@ impl WfmStereoDecoder { pilot_q_lp: OnePoleLowPass::new(composite_rate_f, 400.0), sum_lp: OnePoleLowPass::new(composite_rate_f, 15_000.0), diff_lp: OnePoleLowPass::new(composite_rate_f, 15_000.0), - deemph_m: Deemphasis::new(audio_rate.max(1) as f32, 75.0), - deemph_l: Deemphasis::new(audio_rate.max(1) as f32, 75.0), - deemph_r: Deemphasis::new(audio_rate.max(1) as f32, 75.0), + deemph_m: Deemphasis::new(audio_rate.max(1) as f32, deemphasis_us), + deemph_l: Deemphasis::new(audio_rate.max(1) as f32, deemphasis_us), + deemph_r: Deemphasis::new(audio_rate.max(1) as f32, deemphasis_us), output_decim, output_counter: 0, } @@ -90,6 +99,7 @@ impl WfmStereoDecoder { if composite.is_empty() { return Vec::new(); } + let _ = self.rds_decoder.process_samples(&composite); let mut output = Vec::with_capacity( (composite.len() / self.output_decim.max(1)) * self.output_channels.max(1), @@ -129,6 +139,10 @@ impl WfmStereoDecoder { output } + + pub fn rds_data(&self) -> Option { + self.rds_decoder.snapshot() + } } /// Selects the demodulation algorithm for a channel. diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs index 3f6fc4c..febaed3 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/dsp.rs @@ -18,7 +18,7 @@ use num_complex::Complex; use rustfft::num_complex::Complex as FftComplex; use rustfft::{Fft, FftPlanner}; use tokio::sync::broadcast; -use trx_core::rig::state::RigMode; +use trx_core::rig::state::{RdsData, RigMode}; use crate::demod::{Demodulator, WfmStereoDecoder}; @@ -266,6 +266,8 @@ pub struct ChannelDsp { audio_bandwidth_hz: u32, /// FIR tap count used when rebuilding filters. fir_taps: usize, + /// WFM deemphasis time constant in microseconds. + wfm_deemphasis_us: u32, /// Decimation factor: `sdr_sample_rate / audio_sample_rate`. pub decim_factor: usize, /// Number of PCM channels emitted in each frame. @@ -338,6 +340,7 @@ impl ChannelDsp { channel_sample_rate, self.audio_sample_rate, self.output_channels, + self.wfm_deemphasis_us, )) } else { None @@ -354,6 +357,7 @@ impl ChannelDsp { output_channels: usize, frame_duration_ms: u16, audio_bandwidth_hz: u32, + wfm_deemphasis_us: u32, fir_taps: usize, pcm_tx: broadcast::Sender>, ) -> Self { @@ -390,6 +394,7 @@ impl ChannelDsp { audio_sample_rate, audio_bandwidth_hz, fir_taps: taps, + wfm_deemphasis_us, decim_factor, output_channels, frame_buf: Vec::with_capacity(frame_size + output_channels), @@ -403,6 +408,7 @@ impl ChannelDsp { channel_sample_rate, audio_sample_rate, output_channels, + wfm_deemphasis_us, )) } else { None @@ -425,6 +431,15 @@ impl ChannelDsp { self.rebuild_filters(); } + pub fn set_wfm_deemphasis(&mut self, deemphasis_us: u32) { + self.wfm_deemphasis_us = deemphasis_us; + self.rebuild_filters(); + } + + pub fn rds_data(&self) -> Option { + self.wfm_decoder.as_ref().and_then(WfmStereoDecoder::rds_data) + } + /// Process a block of raw IQ samples through the full DSP chain. /// /// 1. **Batch mixer**: compute the full LO signal for the block at once, @@ -521,6 +536,7 @@ impl SdrPipeline { audio_sample_rate: u32, output_channels: usize, frame_duration_ms: u16, + wfm_deemphasis_us: u32, channels: &[(f64, RigMode, u32, usize)], ) -> Self { const IQ_BROADCAST_CAPACITY: usize = 64; @@ -541,6 +557,7 @@ impl SdrPipeline { output_channels, frame_duration_ms, audio_bandwidth_hz, + wfm_deemphasis_us, fir_taps, pcm_tx.clone(), ); @@ -760,7 +777,8 @@ mod tests { #[test] fn channel_dsp_processes_silence() { let (pcm_tx, _pcm_rx) = broadcast::channel::>(8); - let mut dsp = ChannelDsp::new(0.0, &RigMode::USB, 48_000, 8_000, 1, 20, 3000, 31, pcm_tx); + let mut dsp = + ChannelDsp::new(0.0, &RigMode::USB, 48_000, 8_000, 1, 20, 3000, 75, 31, pcm_tx); let block = vec![Complex::new(0.0_f32, 0.0_f32); 4096]; dsp.process_block(&block); } @@ -768,7 +786,8 @@ mod tests { #[test] fn channel_dsp_set_mode() { let (pcm_tx, _) = broadcast::channel::>(8); - let mut dsp = ChannelDsp::new(0.0, &RigMode::USB, 48_000, 8_000, 1, 20, 3000, 31, pcm_tx); + let mut dsp = + ChannelDsp::new(0.0, &RigMode::USB, 48_000, 8_000, 1, 20, 3000, 75, 31, pcm_tx); assert_eq!(dsp.demodulator, Demodulator::Usb); dsp.set_mode(&RigMode::FM); assert_eq!(dsp.demodulator, Demodulator::Fm); @@ -782,6 +801,7 @@ mod tests { 48_000, 1, 20, + 75, &[(200_000.0, RigMode::USB, 3000, 64)], ); assert_eq!(pipeline.pcm_senders.len(), 1); @@ -790,7 +810,7 @@ mod tests { #[test] fn pipeline_empty_channels() { - let pipeline = SdrPipeline::start(Box::new(MockIqSource), 1_920_000, 48_000, 1, 20, &[]); + let pipeline = SdrPipeline::start(Box::new(MockIqSource), 1_920_000, 48_000, 1, 20, 75, &[]); assert_eq!(pipeline.pcm_senders.len(), 0); assert_eq!(pipeline.channel_dsps.len(), 0); } diff --git a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs index ec6c0df..561dcb7 100644 --- a/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs +++ b/src/trx-server/trx-backend/trx-backend-soapysdr/src/lib.rs @@ -37,6 +37,8 @@ pub struct SoapySdrRig { center_hz: i64, /// Used to send hardware retune commands to the IQ read loop. retune_cmd: Arc>>, + /// Current WFM deemphasis setting in microseconds. + wfm_deemphasis_us: u32, } impl SoapySdrRig { @@ -111,6 +113,7 @@ impl SoapySdrRig { audio_sample_rate, audio_channels, frame_duration_ms, + 75, channels, ); @@ -177,6 +180,7 @@ impl SoapySdrRig { center_offset_hz, center_hz: hardware_center_hz, retune_cmd, + wfm_deemphasis_us: 75, }) } @@ -295,6 +299,25 @@ impl RigCat for SoapySdrRig { }) } + fn set_wfm_deemphasis<'a>( + &'a mut self, + deemphasis_us: u32, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let deemphasis_us = match deemphasis_us { + 50 | 75 => deemphasis_us, + other => { + return Err(format!("unsupported WFM deemphasis {}", other).into()); + } + }; + self.wfm_deemphasis_us = deemphasis_us; + if let Some(dsp_arc) = self.pipeline.channel_dsps.get(self.primary_channel_idx) { + dsp_arc.lock().unwrap().set_wfm_deemphasis(deemphasis_us); + } + Ok(()) + }) + } + fn get_signal_strength<'a>( &'a mut self, ) -> Pin> + Send + 'a>> { @@ -426,15 +449,22 @@ impl RigCat for SoapySdrRig { bandwidth_hz: self.bandwidth_hz, fir_taps: self.fir_taps, cw_center_hz: 700, + wfm_deemphasis_us: self.wfm_deemphasis_us, }) } fn get_spectrum(&self) -> Option { let bins = self.spectrum_buf.lock().ok()?.clone()?; + let rds = self + .pipeline + .channel_dsps + .get(self.primary_channel_idx) + .and_then(|dsp| dsp.lock().ok().and_then(|d| d.rds_data())); Some(SpectrumData { bins, center_hz: self.center_hz.max(0) as u64, sample_rate: self.pipeline.sdr_sample_rate, + rds, }) }