[fix](trx-server): gate aprs decode strictly by mode
Co-authored-by: Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
+69
-36
@@ -315,58 +315,91 @@ pub async fn run_aprs_decoder(
|
|||||||
sample_rate: u32,
|
sample_rate: u32,
|
||||||
channels: u16,
|
channels: u16,
|
||||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||||
state_rx: watch::Receiver<RigState>,
|
mut state_rx: watch::Receiver<RigState>,
|
||||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||||
) {
|
) {
|
||||||
info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels);
|
info!("APRS decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||||
let mut decoder = decode::aprs::AprsDecoder::new(sample_rate);
|
let mut decoder = decode::aprs::AprsDecoder::new(sample_rate);
|
||||||
let mut was_active = false;
|
let mut was_active = false;
|
||||||
let mut last_reset_seq: u64 = 0;
|
let mut last_reset_seq: u64 = 0;
|
||||||
|
let mut active = matches!(state_rx.borrow().status.mode, RigMode::PKT);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match pcm_rx.recv().await {
|
if !active {
|
||||||
Ok(frame) => {
|
match state_rx.changed().await {
|
||||||
let state = state_rx.borrow().clone();
|
Ok(()) => {
|
||||||
let mode = &state.status.mode;
|
let state = state_rx.borrow();
|
||||||
let active = matches!(mode, RigMode::PKT);
|
active = matches!(state.status.mode, RigMode::PKT);
|
||||||
|
if active {
|
||||||
// Check for reset request
|
pcm_rx = pcm_rx.resubscribe();
|
||||||
if state.aprs_decode_reset_seq != last_reset_seq {
|
}
|
||||||
last_reset_seq = state.aprs_decode_reset_seq;
|
if state.aprs_decode_reset_seq != last_reset_seq {
|
||||||
decoder.reset();
|
last_reset_seq = state.aprs_decode_reset_seq;
|
||||||
info!("APRS decoder reset (seq={})", last_reset_seq);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !active {
|
|
||||||
if was_active {
|
|
||||||
decoder.reset();
|
decoder.reset();
|
||||||
was_active = false;
|
info!("APRS decoder reset (seq={})", last_reset_seq);
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
was_active = true;
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Downmix to mono if stereo
|
tokio::select! {
|
||||||
let mono = if channels > 1 {
|
recv = pcm_rx.recv() => {
|
||||||
let num_frames = frame.len() / channels as usize;
|
match recv {
|
||||||
let mut mono = Vec::with_capacity(num_frames);
|
Ok(frame) => {
|
||||||
for i in 0..num_frames {
|
let state = state_rx.borrow();
|
||||||
mono.push(frame[i * channels as usize]);
|
if state.aprs_decode_reset_seq != last_reset_seq {
|
||||||
|
last_reset_seq = state.aprs_decode_reset_seq;
|
||||||
|
decoder.reset();
|
||||||
|
info!("APRS decoder reset (seq={})", last_reset_seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Downmix to mono if stereo
|
||||||
|
let mono = if channels > 1 {
|
||||||
|
let num_frames = frame.len() / channels as usize;
|
||||||
|
let mut mono = Vec::with_capacity(num_frames);
|
||||||
|
for i in 0..num_frames {
|
||||||
|
mono.push(frame[i * channels as usize]);
|
||||||
|
}
|
||||||
|
mono
|
||||||
|
} else {
|
||||||
|
frame
|
||||||
|
};
|
||||||
|
|
||||||
|
was_active = true;
|
||||||
|
for pkt in decoder.process_samples(&mono) {
|
||||||
|
record_aprs_packet(pkt.clone());
|
||||||
|
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
mono
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
} else {
|
warn!("APRS decoder: dropped {} PCM frames", n);
|
||||||
frame
|
}
|
||||||
};
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
|
|
||||||
for pkt in decoder.process_samples(&mono) {
|
|
||||||
record_aprs_packet(pkt.clone());
|
|
||||||
let _ = decode_tx.send(DecodedMessage::Aprs(pkt));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
changed = state_rx.changed() => {
|
||||||
warn!("APRS decoder: dropped {} PCM frames", n);
|
match changed {
|
||||||
|
Ok(()) => {
|
||||||
|
let state = state_rx.borrow();
|
||||||
|
active = matches!(state.status.mode, RigMode::PKT);
|
||||||
|
if state.aprs_decode_reset_seq != last_reset_seq {
|
||||||
|
last_reset_seq = state.aprs_decode_reset_seq;
|
||||||
|
decoder.reset();
|
||||||
|
info!("APRS decoder reset (seq={})", last_reset_seq);
|
||||||
|
}
|
||||||
|
if !active && was_active {
|
||||||
|
decoder.reset();
|
||||||
|
was_active = false;
|
||||||
|
}
|
||||||
|
if active {
|
||||||
|
pcm_rx = pcm_rx.resubscribe();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Closed) => break,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user