diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 3bb6edc..ec771e8 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -23,25 +23,23 @@ use trx_ais::AisDecoder; use trx_aprs::AprsDecoder; use trx_core::audio::{ parse_vchan_uuid_msg, read_audio_msg, write_audio_msg, write_vchan_audio_frame, - write_vchan_uuid_msg, AudioStreamInfo, - AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, - AUDIO_MSG_FT4_DECODE, - AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, - AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, - AUDIO_MSG_VCHAN_BW, AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, - AUDIO_MSG_VCHAN_REMOVE, + write_vchan_uuid_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, + AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT2_DECODE, AUDIO_MSG_FT4_DECODE, AUDIO_MSG_FT8_DECODE, + AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, + AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VCHAN_ALLOCATED, AUDIO_MSG_VCHAN_BW, + AUDIO_MSG_VCHAN_DESTROYED, AUDIO_MSG_VCHAN_FREQ, AUDIO_MSG_VCHAN_MODE, AUDIO_MSG_VCHAN_REMOVE, AUDIO_MSG_VCHAN_SUB, AUDIO_MSG_VCHAN_UNSUB, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; -use trx_core::vchan::SharedVChanManager; -use uuid::Uuid; use trx_core::decode::{ AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, }; use trx_core::rig::state::{RigMode, RigState}; +use trx_core::vchan::SharedVChanManager; use trx_cw::CwDecoder; use trx_ft8::Ft8Decoder; use trx_vdes::VdesDecoder; use trx_wspr::WsprDecoder; +use uuid::Uuid; use crate::config::AudioConfig; use trx_decode_log::DecoderLoggers; @@ -1053,6 +1051,8 @@ pub async fn run_aprs_decoder( last_reset_seq = reset_seq; decoder.reset(); info!("APRS decoder reset (seq={})", last_reset_seq); + pcm_rx = pcm_rx.resubscribe(); + continue; } // Downmix to mono if stereo @@ -1070,6 +1070,14 @@ pub async fn run_aprs_decoder( was_active = true; let packets = tokio::task::block_in_place(|| decoder.process_samples(&mono)); + let latest_reset_seq = state_rx.borrow().aprs_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + decoder.reset(); + info!("APRS decoder reset (seq={})", last_reset_seq); + pcm_rx = pcm_rx.resubscribe(); + continue; + } for mut pkt in packets { if let Some(logger) = decode_logs.as_ref() { logger.log_aprs(&pkt); @@ -1124,7 +1132,10 @@ pub async fn run_hf_aprs_decoder( decode_logs: Option>, histories: Arc, ) { - info!("HF APRS decoder started ({}Hz, {} ch)", sample_rate, channels); + info!( + "HF APRS decoder started ({}Hz, {} ch)", + sample_rate, channels + ); let mut decoder = AprsDecoder::new_hf(sample_rate); let mut was_active = false; let mut last_reset_seq: u64 = 0; @@ -1162,6 +1173,8 @@ pub async fn run_hf_aprs_decoder( last_reset_seq = reset_seq; decoder.reset(); info!("HF APRS decoder reset (seq={})", last_reset_seq); + pcm_rx = pcm_rx.resubscribe(); + continue; } let mut mono = downmix_if_needed(frame, channels); @@ -1169,6 +1182,14 @@ pub async fn run_hf_aprs_decoder( was_active = true; let packets = tokio::task::block_in_place(|| decoder.process_samples(&mono)); + let latest_reset_seq = state_rx.borrow().hf_aprs_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + decoder.reset(); + info!("HF APRS decoder reset (seq={})", last_reset_seq); + pcm_rx = pcm_rx.resubscribe(); + continue; + } for mut pkt in packets { if let Some(logger) = decode_logs.as_ref() { logger.log_aprs(&pkt); @@ -1241,10 +1262,7 @@ pub async fn run_ais_decoder( let mut decoder_a = AisDecoder::new(sample_rate); let mut decoder_b = AisDecoder::new(sample_rate); let mut was_active = false; - let mut active = matches!( - state_rx.borrow().status.mode, - RigMode::AIS - ); + let mut active = matches!(state_rx.borrow().status.mode, RigMode::AIS); loop { if !active { @@ -1338,10 +1356,7 @@ pub async fn run_vdes_decoder( info!("VDES decoder started ({}Hz complex baseband)", sample_rate); let mut decoder = VdesDecoder::new(sample_rate); let mut was_active = false; - let mut active = matches!( - state_rx.borrow().status.mode, - RigMode::VDES - ); + let mut active = matches!(state_rx.borrow().status.mode, RigMode::VDES); loop { if !active { @@ -1486,6 +1501,8 @@ pub async fn run_cw_decoder( last_reset_seq = reset_seq; decoder.reset(); info!("CW decoder reset (seq={})", last_reset_seq); + pcm_rx = pcm_rx.resubscribe(); + continue; } if !process_enabled { if was_active { @@ -1509,6 +1526,14 @@ pub async fn run_cw_decoder( }; was_active = true; let events = tokio::task::block_in_place(|| decoder.process_samples(&mono)); + let latest_reset_seq = state_rx.borrow().cw_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + decoder.reset(); + info!("CW decoder reset (seq={})", last_reset_seq); + pcm_rx = pcm_rx.resubscribe(); + continue; + } for evt in events { if let Some(logger) = decode_logs.as_ref() { logger.log_cw(&evt); @@ -1686,6 +1711,8 @@ pub async fn run_ft8_decoder( last_reset_seq = reset_seq; decoder.reset(); ft8_buf.clear(); + pcm_rx = pcm_rx.resubscribe(); + continue; } let mut mono = downmix_mono(frame, channels); @@ -1702,6 +1729,14 @@ pub async fn run_ft8_decoder( decoder.process_block(&block); decoder.decode_if_ready(100) }); + let latest_reset_seq = state_rx.borrow().ft8_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + decoder.reset(); + ft8_buf.clear(); + pcm_rx = pcm_rx.resubscribe(); + continue; + } if !results.is_empty() { for res in results { let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { @@ -1831,6 +1866,8 @@ pub async fn run_ft4_decoder( last_reset_seq = reset_seq; decoder.reset(); ft4_buf.clear(); + pcm_rx = pcm_rx.resubscribe(); + continue; } let mut mono = downmix_mono(frame, channels); @@ -1847,6 +1884,14 @@ pub async fn run_ft4_decoder( decoder.process_block(&block); decoder.decode_if_ready(100) }); + let latest_reset_seq = state_rx.borrow().ft4_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + decoder.reset(); + ft4_buf.clear(); + pcm_rx = pcm_rx.resubscribe(); + continue; + } if !results.is_empty() { for res in results { let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { @@ -1965,6 +2010,8 @@ pub async fn run_ft2_decoder( ft2_buf.clear(); pending_decode_samples = 0; recent_decodes.clear(); + pcm_rx = pcm_rx.resubscribe(); + continue; } let mut mono = downmix_mono(frame, channels); @@ -1984,6 +2031,16 @@ pub async fn run_ft2_decoder( let results = tokio::task::block_in_place(|| { decode_ft2_window(&mut decoder, &ft2_buf, 100) }); + let latest_reset_seq = state_rx.borrow().ft2_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + decoder.reset(); + ft2_buf.clear(); + pending_decode_samples = 0; + recent_decodes.clear(); + pcm_rx = pcm_rx.resubscribe(); + continue; + } if !results.is_empty() { for res in results { let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { @@ -2109,13 +2166,33 @@ pub async fn run_wspr_decoder( Err(_) => 0, }; let slot = now / slot_len_s; + let reset_seq = { + let state = state_rx.borrow(); + state.wspr_decode_reset_seq + }; + if reset_seq != last_reset_seq { + last_reset_seq = reset_seq; + slot_buf.clear(); + last_slot = slot; + pcm_rx = pcm_rx.resubscribe(); + continue; + } if last_slot == -1 { last_slot = slot; } else if slot != last_slot { let base_freq = state_rx.borrow().status.freq.hz; - match tokio::task::block_in_place(|| { + let decode_results = tokio::task::block_in_place(|| { decoder.decode_slot(&slot_buf, Some(base_freq)) - }) { + }); + let latest_reset_seq = state_rx.borrow().wspr_decode_reset_seq; + if latest_reset_seq != reset_seq { + last_reset_seq = latest_reset_seq; + slot_buf.clear(); + last_slot = slot; + pcm_rx = pcm_rx.resubscribe(); + continue; + } + match decode_results { Ok(results) => { for res in results { let ts_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { @@ -2141,15 +2218,13 @@ pub async fn run_wspr_decoder( slot_buf.clear(); last_slot = slot; } - - let reset_seq = { - let state = state_rx.borrow(); - state.wspr_decode_reset_seq - }; - if reset_seq != last_reset_seq { - last_reset_seq = reset_seq; + let latest_reset_seq = state_rx.borrow().wspr_decode_reset_seq; + if latest_reset_seq != last_reset_seq { + last_reset_seq = latest_reset_seq; slot_buf.clear(); last_slot = slot; + pcm_rx = pcm_rx.resubscribe(); + continue; } let mut mono = downmix_mono(frame, channels); @@ -2348,8 +2423,7 @@ async fn run_background_ft8_decoder( loop { match pcm_rx.recv().await { Ok(frame) => { - let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) - { + let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { Ok(dur) => dur.as_secs() as i64, Err(_) => 0, }; @@ -2427,11 +2501,10 @@ async fn run_background_ft4_decoder( match pcm_rx.recv().await { Ok(frame) => { let now_ms = - match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) - { - Ok(dur) => dur.as_millis() as i64, - Err(_) => 0, - }; + match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(dur) => dur.as_millis() as i64, + Err(_) => 0, + }; // FT4 slot period is 7.5s let slot = now_ms / 7_500; if slot != last_slot { @@ -2540,11 +2613,7 @@ async fn run_background_ft2_decoder( }, message: res.text, }; - if !should_emit_ft2_decode( - &mut recent_decodes, - &msg.message, - msg.freq_hz, - ) { + if !should_emit_ft2_decode(&mut recent_decodes, &msg.message, msg.freq_hz) { continue; } let _ = decode_tx.send(DecodedMessage::Ft2(msg)); @@ -2584,8 +2653,7 @@ async fn run_background_wspr_decoder( loop { match pcm_rx.recv().await { Ok(frame) => { - let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) - { + let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { Ok(dur) => dur.as_secs() as i64, Err(_) => 0, }; @@ -2755,15 +2823,51 @@ async fn handle_audio_client( }; } - push_history!(histories.snapshot_ais_history(), DecodedMessage::Ais, AUDIO_MSG_AIS_DECODE); - push_history!(histories.snapshot_vdes_history(), DecodedMessage::Vdes, AUDIO_MSG_VDES_DECODE); - push_history!(histories.snapshot_aprs_history(), DecodedMessage::Aprs, AUDIO_MSG_APRS_DECODE); - push_history!(histories.snapshot_hf_aprs_history(), DecodedMessage::HfAprs, AUDIO_MSG_HF_APRS_DECODE); - push_history!(histories.snapshot_ft8_history(), DecodedMessage::Ft8, AUDIO_MSG_FT8_DECODE); - push_history!(histories.snapshot_ft4_history(), DecodedMessage::Ft4, AUDIO_MSG_FT4_DECODE); - push_history!(histories.snapshot_ft2_history(), DecodedMessage::Ft2, AUDIO_MSG_FT2_DECODE); - push_history!(histories.snapshot_wspr_history(), DecodedMessage::Wspr, AUDIO_MSG_WSPR_DECODE); - push_history!(histories.snapshot_cw_history(), DecodedMessage::Cw, AUDIO_MSG_CW_DECODE); + push_history!( + histories.snapshot_ais_history(), + DecodedMessage::Ais, + AUDIO_MSG_AIS_DECODE + ); + push_history!( + histories.snapshot_vdes_history(), + DecodedMessage::Vdes, + AUDIO_MSG_VDES_DECODE + ); + push_history!( + histories.snapshot_aprs_history(), + DecodedMessage::Aprs, + AUDIO_MSG_APRS_DECODE + ); + push_history!( + histories.snapshot_hf_aprs_history(), + DecodedMessage::HfAprs, + AUDIO_MSG_HF_APRS_DECODE + ); + push_history!( + histories.snapshot_ft8_history(), + DecodedMessage::Ft8, + AUDIO_MSG_FT8_DECODE + ); + push_history!( + histories.snapshot_ft4_history(), + DecodedMessage::Ft4, + AUDIO_MSG_FT4_DECODE + ); + push_history!( + histories.snapshot_ft2_history(), + DecodedMessage::Ft2, + AUDIO_MSG_FT2_DECODE + ); + push_history!( + histories.snapshot_wspr_history(), + DecodedMessage::Wspr, + AUDIO_MSG_WSPR_DECODE + ); + push_history!( + histories.snapshot_cw_history(), + DecodedMessage::Cw, + AUDIO_MSG_CW_DECODE + ); (blob, count) }; @@ -2773,10 +2877,7 @@ async fn handle_audio_client( // well (~10-20x) so this dramatically reduces both transfer size and // the time the client spends waiting for data. let compressed = { - let mut enc = GzEncoder::new( - Vec::with_capacity(blob.len() / 8), - Compression::fast(), - ); + let mut enc = GzEncoder::new(Vec::with_capacity(blob.len() / 8), Compression::fast()); enc.write_all(&blob) .and_then(|_| enc.finish()) .unwrap_or(blob.clone()) @@ -2807,7 +2908,7 @@ async fn handle_audio_client( let (bg_decode_tx, mut bg_decode_rx) = broadcast::channel::(128); let opus_sample_rate = stream_info.sample_rate; - let opus_channels = stream_info.channels; + let opus_channels = stream_info.channels; // Subscribe to server-side channel destruction events (SDR rigs only). let mut destroyed_rx: Option> = @@ -3160,9 +3261,7 @@ async fn handle_audio_client( // Payload: JSON { "uuid": "...", "freq_hz": N, "mode": "..." } match serde_json::from_slice::(&payload) { Ok(v) => { - let uuid = v["uuid"] - .as_str() - .and_then(|s| s.parse::().ok()); + let uuid = v["uuid"].as_str().and_then(|s| s.parse::().ok()); let freq_hz = v["freq_hz"].as_u64(); let mode_str = v["mode"].as_str().unwrap_or("USB"); let hidden = v["hidden"].as_bool().unwrap_or(false); @@ -3184,9 +3283,16 @@ async fn handle_audio_client( }; match ensure_result { Ok(pcm_rx) => { - if let Some(bandwidth_hz) = bandwidth_hz.filter(|bw| *bw > 0) { - if let Err(e) = mgr.set_channel_bandwidth(uuid, bandwidth_hz) { - warn!("Audio vchan SUB bandwidth apply failed: {}", e); + if let Some(bandwidth_hz) = + bandwidth_hz.filter(|bw| *bw > 0) + { + if let Err(e) = + mgr.set_channel_bandwidth(uuid, bandwidth_hz) + { + warn!( + "Audio vchan SUB bandwidth apply failed: {}", + e + ); } } let _ = vchan_cmd_tx @@ -3194,10 +3300,11 @@ async fn handle_audio_client( uuid, pcm_rx, send_audio: !hidden, - background_decode: (!decoder_kinds.is_empty()).then_some(BackgroundDecodeSpec { - base_freq_hz: freq_hz, - decoder_kinds, - }), + background_decode: (!decoder_kinds.is_empty()) + .then_some(BackgroundDecodeSpec { + base_freq_hz: freq_hz, + decoder_kinds, + }), }) .await; } @@ -3211,14 +3318,12 @@ async fn handle_audio_client( } } } - Ok((AUDIO_MSG_VCHAN_UNSUB, payload)) => { - match parse_vchan_uuid_msg(&payload) { - Ok(uuid) => { - let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await; - } - Err(e) => warn!("Audio vchan UNSUB: bad payload: {}", e), + Ok((AUDIO_MSG_VCHAN_UNSUB, payload)) => match parse_vchan_uuid_msg(&payload) { + Ok(uuid) => { + let _ = vchan_cmd_tx.send(VChanCmd::Unsubscribe(uuid)).await; } - } + Err(e) => warn!("Audio vchan UNSUB: bad payload: {}", e), + }, Ok((AUDIO_MSG_VCHAN_FREQ, payload)) => { if let Some(ref mgr) = vchan_manager { if let Ok(v) = serde_json::from_slice::(&payload) { @@ -3236,7 +3341,8 @@ async fn handle_audio_client( Ok((AUDIO_MSG_VCHAN_MODE, payload)) => { if let Some(ref mgr) = vchan_manager { if let Ok(v) = serde_json::from_slice::(&payload) { - if let Some(uuid) = v["uuid"].as_str().and_then(|s| s.parse::().ok()) { + if let Some(uuid) = v["uuid"].as_str().and_then(|s| s.parse::().ok()) + { let mode = trx_protocol::codec::parse_mode( v["mode"].as_str().unwrap_or("USB"), ); @@ -3280,7 +3386,10 @@ async fn handle_audio_client( } } Ok((msg_type, _)) => { - warn!("Audio: unexpected message type {:#04x} from {}", msg_type, peer); + warn!( + "Audio: unexpected message type {:#04x} from {}", + msg_type, peer + ); } Err(_) => break, } diff --git a/src/trx-server/src/rig_task.rs b/src/trx-server/src/rig_task.rs index 609730f..1d16d10 100644 --- a/src/trx-server/src/rig_task.rs +++ b/src/trx-server/src/rig_task.rs @@ -671,7 +671,9 @@ async fn process_command( // Apply state updates based on command result match cmd_result { CommandResult::FreqUpdated(freq) => { + let prev_freq_hz = ctx.state.status.freq.hz; ctx.state.apply_freq(freq); + invalidate_main_decoder_windows_on_freq_change(ctx.state, prev_freq_hz); *ctx.poll_pause_until = Some(Instant::now() + Duration::from_millis(200)); } CommandResult::ModeUpdated(mode) => { @@ -786,7 +788,9 @@ async fn refresh_state_from_cat(rig: &mut Box, state: &mut RigState) let (freq, mode, vfo) = rig.get_status().await?; state.filter = rig.filter_state(); state.control.enabled = Some(true); + let prev_freq_hz = state.status.freq.hz; state.apply_freq(freq); + invalidate_main_decoder_windows_on_freq_change(state, prev_freq_hz); state.apply_mode(mode); state.status.vfo = vfo; @@ -923,9 +927,7 @@ fn map_signal_strength(mode: &RigMode, raw: u8) -> i32 { // FT-817 returns 0-15 for signal strength // Map to approximate dBm / S-units match mode { - RigMode::FM | RigMode::WFM | RigMode::AIS | RigMode::VDES => { - -120 + (raw as i32 * 6) - } + RigMode::FM | RigMode::WFM | RigMode::AIS | RigMode::VDES => -120 + (raw as i32 * 6), _ => -127 + (raw as i32 * 6), } } @@ -937,6 +939,19 @@ fn snapshot_from(state: &RigState) -> RigResult { .ok_or_else(|| RigError::invalid_state("Rig info unavailable")) } +fn invalidate_main_decoder_windows_on_freq_change(state: &mut RigState, prev_freq_hz: u64) { + if state.status.freq.hz == prev_freq_hz { + return; + } + state.aprs_decode_reset_seq += 1; + state.hf_aprs_decode_reset_seq += 1; + state.cw_decode_reset_seq += 1; + state.ft8_decode_reset_seq += 1; + state.ft4_decode_reset_seq += 1; + state.ft2_decode_reset_seq += 1; + state.wspr_decode_reset_seq += 1; +} + fn sync_machine_state(machine: &mut RigStateMachine, state: &RigState) { let desired = desired_machine_state(state); match (machine.state().clone(), &desired) { @@ -1069,3 +1084,50 @@ fn tx_meter_parts(tx: Option<&RigTxStatus>) -> (Option, Option, Option