diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index acaeb9e..166e272 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -315,58 +315,91 @@ pub async fn run_aprs_decoder( sample_rate: u32, channels: u16, mut pcm_rx: broadcast::Receiver>, - state_rx: watch::Receiver, + mut state_rx: watch::Receiver, decode_tx: broadcast::Sender, ) { info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels); let mut decoder = decode::aprs::AprsDecoder::new(sample_rate); let mut was_active = false; let mut last_reset_seq: u64 = 0; + let mut active = matches!(state_rx.borrow().status.mode, RigMode::PKT); loop { - match pcm_rx.recv().await { - Ok(frame) => { - let state = state_rx.borrow().clone(); - let mode = &state.status.mode; - let active = matches!(mode, RigMode::PKT); - - // Check for reset request - if state.aprs_decode_reset_seq != last_reset_seq { - last_reset_seq = state.aprs_decode_reset_seq; - decoder.reset(); - info!("APRS decoder reset (seq={})", last_reset_seq); - } - - if !active { - if was_active { + if !active { + match state_rx.changed().await { + Ok(()) => { + let state = state_rx.borrow(); + active = matches!(state.status.mode, RigMode::PKT); + if active { + pcm_rx = pcm_rx.resubscribe(); + } + if state.aprs_decode_reset_seq != last_reset_seq { + last_reset_seq = state.aprs_decode_reset_seq; decoder.reset(); - was_active = false; + info!("APRS decoder reset (seq={})", last_reset_seq); } - continue; } - was_active = true; + Err(_) => break, + } + continue; + } - // Downmix to mono if stereo - let mono = if channels > 1 { - let num_frames = frame.len() / channels as usize; - let mut mono = Vec::with_capacity(num_frames); - for i in 0..num_frames { - mono.push(frame[i * channels as usize]); + tokio::select! { + recv = pcm_rx.recv() => { + match recv { + Ok(frame) => { + let state = state_rx.borrow(); + if state.aprs_decode_reset_seq != last_reset_seq { + last_reset_seq = state.aprs_decode_reset_seq; + decoder.reset(); + info!("APRS decoder reset (seq={})", last_reset_seq); + } + + // Downmix to mono if stereo + let mono = if channels > 1 { + let num_frames = frame.len() / channels as usize; + let mut mono = Vec::with_capacity(num_frames); + for i in 0..num_frames { + mono.push(frame[i * channels as usize]); + } + mono + } else { + frame + }; + + was_active = true; + for pkt in decoder.process_samples(&mono) { + record_aprs_packet(pkt.clone()); + let _ = decode_tx.send(DecodedMessage::Aprs(pkt)); + } } - mono - } else { - frame - }; - - for pkt in decoder.process_samples(&mono) { - record_aprs_packet(pkt.clone()); - let _ = decode_tx.send(DecodedMessage::Aprs(pkt)); + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("APRS decoder: dropped {} PCM frames", n); + } + Err(broadcast::error::RecvError::Closed) => break, } } - Err(broadcast::error::RecvError::Lagged(n)) => { - warn!("APRS decoder: dropped {} PCM frames", n); + changed = state_rx.changed() => { + match changed { + Ok(()) => { + let state = state_rx.borrow(); + active = matches!(state.status.mode, RigMode::PKT); + if state.aprs_decode_reset_seq != last_reset_seq { + last_reset_seq = state.aprs_decode_reset_seq; + decoder.reset(); + info!("APRS decoder reset (seq={})", last_reset_seq); + } + if !active && was_active { + decoder.reset(); + was_active = false; + } + if active { + pcm_rx = pcm_rx.resubscribe(); + } + } + Err(_) => break, + } } - Err(broadcast::error::RecvError::Closed) => break, } } }