[fix](trx-server): gate cw decode strictly by mode
Co-authored-by: Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
+68
-34
@@ -409,56 +409,90 @@ pub async fn run_cw_decoder(
|
||||
sample_rate: u32,
|
||||
channels: u16,
|
||||
mut pcm_rx: broadcast::Receiver<Vec<f32>>,
|
||||
state_rx: watch::Receiver<RigState>,
|
||||
mut state_rx: watch::Receiver<RigState>,
|
||||
decode_tx: broadcast::Sender<DecodedMessage>,
|
||||
) {
|
||||
info!("CW decoder started ({}Hz, {} ch)", sample_rate, channels);
|
||||
let mut decoder = decode::cw::CwDecoder::new(sample_rate);
|
||||
let mut was_active = false;
|
||||
let mut last_reset_seq: u64 = 0;
|
||||
let mut active = matches!(state_rx.borrow().status.mode, RigMode::CW | RigMode::CWR);
|
||||
|
||||
loop {
|
||||
match pcm_rx.recv().await {
|
||||
Ok(frame) => {
|
||||
let state = state_rx.borrow().clone();
|
||||
let active = true;
|
||||
|
||||
// Check for reset request
|
||||
if state.cw_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.cw_decode_reset_seq;
|
||||
decoder.reset();
|
||||
info!("CW decoder reset (seq={})", last_reset_seq);
|
||||
}
|
||||
|
||||
if !active {
|
||||
if was_active {
|
||||
if !active {
|
||||
match state_rx.changed().await {
|
||||
Ok(()) => {
|
||||
let state = state_rx.borrow();
|
||||
active = matches!(state.status.mode, RigMode::CW | RigMode::CWR);
|
||||
if active {
|
||||
pcm_rx = pcm_rx.resubscribe();
|
||||
}
|
||||
if state.cw_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.cw_decode_reset_seq;
|
||||
decoder.reset();
|
||||
was_active = false;
|
||||
info!("CW decoder reset (seq={})", last_reset_seq);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
was_active = true;
|
||||
Err(_) => break,
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Downmix to mono if stereo
|
||||
let mono = if channels > 1 {
|
||||
let num_frames = frame.len() / channels as usize;
|
||||
let mut mono = Vec::with_capacity(num_frames);
|
||||
for i in 0..num_frames {
|
||||
mono.push(frame[i * channels as usize]);
|
||||
tokio::select! {
|
||||
recv = pcm_rx.recv() => {
|
||||
match recv {
|
||||
Ok(frame) => {
|
||||
let state = state_rx.borrow();
|
||||
if state.cw_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.cw_decode_reset_seq;
|
||||
decoder.reset();
|
||||
info!("CW decoder reset (seq={})", last_reset_seq);
|
||||
}
|
||||
|
||||
// Downmix to mono if stereo
|
||||
let mono = if channels > 1 {
|
||||
let num_frames = frame.len() / channels as usize;
|
||||
let mut mono = Vec::with_capacity(num_frames);
|
||||
for i in 0..num_frames {
|
||||
mono.push(frame[i * channels as usize]);
|
||||
}
|
||||
mono
|
||||
} else {
|
||||
frame
|
||||
};
|
||||
|
||||
was_active = true;
|
||||
for evt in decoder.process_samples(&mono) {
|
||||
let _ = decode_tx.send(DecodedMessage::Cw(evt));
|
||||
}
|
||||
}
|
||||
mono
|
||||
} else {
|
||||
frame
|
||||
};
|
||||
|
||||
for evt in decoder.process_samples(&mono) {
|
||||
let _ = decode_tx.send(DecodedMessage::Cw(evt));
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("CW decoder: dropped {} PCM frames", n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("CW decoder: dropped {} PCM frames", n);
|
||||
changed = state_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) => {
|
||||
let state = state_rx.borrow();
|
||||
active = matches!(state.status.mode, RigMode::CW | RigMode::CWR);
|
||||
if state.cw_decode_reset_seq != last_reset_seq {
|
||||
last_reset_seq = state.cw_decode_reset_seq;
|
||||
decoder.reset();
|
||||
info!("CW decoder reset (seq={})", last_reset_seq);
|
||||
}
|
||||
if !active && was_active {
|
||||
decoder.reset();
|
||||
was_active = false;
|
||||
}
|
||||
if active {
|
||||
pcm_rx = pcm_rx.resubscribe();
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user