From c5320ca2fb02b78395037c8aa0502a3fa3d17375 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 26 Mar 2026 06:26:17 +0000 Subject: [PATCH] [refactor](trx-server): add AtomicUsize counter and recover from poisoned locks Add an AtomicUsize total_count field to DecoderHistories, maintained by record/prune/clear methods, so estimated_total_count() avoids 9 separate mutex acquisitions. Also replace audio ring buffer .unwrap() calls with .unwrap_or_else(|e| e.into_inner()) to recover from poisoned locks. https://claude.ai/code/session_01XzurkeuUmamBuhQwxVy7T4 Signed-off-by: Claude --- src/trx-server/src/audio.rs | 121 ++++++++++++++++++++++++++---------- 1 file changed, 89 insertions(+), 32 deletions(-) diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 829d5e5..187ed4d 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -6,7 +6,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -141,7 +141,7 @@ impl StreamErrorLogger { let mut state = self .state .lock() - .expect("stream error logger mutex poisoned"); + .unwrap_or_else(|e| e.into_inner()); // First occurrence or changed error class: log as error once. if state.last_kind != Some(kind) { @@ -210,6 +210,9 @@ pub struct DecoderHistories { pub ft4: Mutex>, pub ft2: Mutex>, pub wspr: Mutex>, + /// Approximate total entry count across all decoders, maintained + /// atomically so `estimated_total_count()` avoids 9 lock acquisitions. + total_count: AtomicUsize, } impl DecoderHistories { @@ -224,9 +227,21 @@ impl DecoderHistories { ft4: Mutex::new(VecDeque::new()), ft2: Mutex::new(VecDeque::new()), wspr: Mutex::new(VecDeque::new()), + total_count: AtomicUsize::new(0), }) } + /// Adjust the atomic total count after a record/prune/clear operation. + fn adjust_total_count(&self, old_len: usize, new_len: usize) { + if new_len > old_len { + self.total_count + .fetch_add(new_len - old_len, Ordering::Relaxed); + } else if old_len > new_len { + self.total_count + .fetch_sub(old_len - new_len, Ordering::Relaxed); + } + } + // --- AIS --- fn prune_ais(history: &mut VecDeque<(Instant, AisMessage)>) { @@ -245,13 +260,17 @@ impl DecoderHistories { msg.ts_ms = Some(current_timestamp_ms()); } let mut h = self.ais.lock().expect("ais history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), msg)); Self::prune_ais(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_ais_history(&self) -> Vec { let mut h = self.ais.lock().expect("ais history mutex poisoned"); + let before = h.len(); Self::prune_ais(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, msg)| msg.clone()).collect() } @@ -273,13 +292,17 @@ impl DecoderHistories { msg.ts_ms = Some(current_timestamp_ms()); } let mut h = self.vdes.lock().expect("vdes history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), msg)); Self::prune_vdes(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_vdes_history(&self) -> Vec { let mut h = self.vdes.lock().expect("vdes history mutex poisoned"); + let before = h.len(); Self::prune_vdes(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, msg)| msg.clone()).collect() } @@ -304,21 +327,25 @@ impl DecoderHistories { pkt.ts_ms = Some(current_timestamp_ms()); } let mut h = self.aprs.lock().expect("aprs history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), pkt)); Self::prune_aprs(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_aprs_history(&self) -> Vec { let mut h = self.aprs.lock().expect("aprs history mutex poisoned"); + let before = h.len(); Self::prune_aprs(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, pkt)| pkt.clone()).collect() } pub fn clear_aprs_history(&self) { - self.aprs - .lock() - .expect("aprs history mutex poisoned") - .clear(); + let mut h = self.aprs.lock().expect("aprs history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } // --- HF APRS --- @@ -342,21 +369,25 @@ impl DecoderHistories { pkt.ts_ms = Some(current_timestamp_ms()); } let mut h = self.hf_aprs.lock().expect("hf_aprs history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), pkt)); Self::prune_hf_aprs(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_hf_aprs_history(&self) -> Vec { let mut h = self.hf_aprs.lock().expect("hf_aprs history mutex poisoned"); + let before = h.len(); Self::prune_hf_aprs(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, pkt)| pkt.clone()).collect() } pub fn clear_hf_aprs_history(&self) { - self.hf_aprs - .lock() - .expect("hf_aprs history mutex poisoned") - .clear(); + let mut h = self.hf_aprs.lock().expect("hf_aprs history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } // --- CW --- @@ -374,18 +405,25 @@ impl DecoderHistories { pub fn record_cw_event(&self, evt: CwEvent) { let mut h = self.cw.lock().expect("cw history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), evt)); Self::prune_cw(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_cw_history(&self) -> Vec { let mut h = self.cw.lock().expect("cw history mutex poisoned"); + let before = h.len(); Self::prune_cw(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, evt)| evt.clone()).collect() } pub fn clear_cw_history(&self) { - self.cw.lock().expect("cw history mutex poisoned").clear(); + let mut h = self.cw.lock().expect("cw history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } // --- FT8 --- @@ -403,18 +441,25 @@ impl DecoderHistories { pub fn record_ft8_message(&self, msg: Ft8Message) { let mut h = self.ft8.lock().expect("ft8 history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), msg)); Self::prune_ft8(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_ft8_history(&self) -> Vec { let mut h = self.ft8.lock().expect("ft8 history mutex poisoned"); + let before = h.len(); Self::prune_ft8(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, msg)| msg.clone()).collect() } pub fn clear_ft8_history(&self) { - self.ft8.lock().expect("ft8 history mutex poisoned").clear(); + let mut h = self.ft8.lock().expect("ft8 history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } // --- FT4 --- @@ -432,18 +477,25 @@ impl DecoderHistories { pub fn record_ft4_message(&self, msg: Ft8Message) { let mut h = self.ft4.lock().expect("ft4 history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), msg)); Self::prune_ft4(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_ft4_history(&self) -> Vec { let mut h = self.ft4.lock().expect("ft4 history mutex poisoned"); + let before = h.len(); Self::prune_ft4(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, msg)| msg.clone()).collect() } pub fn clear_ft4_history(&self) { - self.ft4.lock().expect("ft4 history mutex poisoned").clear(); + let mut h = self.ft4.lock().expect("ft4 history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } // --- FT2 --- @@ -461,18 +513,25 @@ impl DecoderHistories { pub fn record_ft2_message(&self, msg: Ft8Message) { let mut h = self.ft2.lock().expect("ft2 history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), msg)); Self::prune_ft2(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_ft2_history(&self) -> Vec { let mut h = self.ft2.lock().expect("ft2 history mutex poisoned"); + let before = h.len(); Self::prune_ft2(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, msg)| msg.clone()).collect() } pub fn clear_ft2_history(&self) { - self.ft2.lock().expect("ft2 history mutex poisoned").clear(); + let mut h = self.ft2.lock().expect("ft2 history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } // --- WSPR --- @@ -490,36 +549,34 @@ impl DecoderHistories { pub fn record_wspr_message(&self, msg: WsprMessage) { let mut h = self.wspr.lock().expect("wspr history mutex poisoned"); + let before = h.len(); h.push_back((Instant::now(), msg)); Self::prune_wspr(&mut h); + self.adjust_total_count(before, h.len()); } pub fn snapshot_wspr_history(&self) -> Vec { let mut h = self.wspr.lock().expect("wspr history mutex poisoned"); + let before = h.len(); Self::prune_wspr(&mut h); + self.adjust_total_count(before, h.len()); h.iter().map(|(_, msg)| msg.clone()).collect() } pub fn clear_wspr_history(&self) { - self.wspr - .lock() - .expect("wspr history mutex poisoned") - .clear(); + let mut h = self.wspr.lock().expect("wspr history mutex poisoned"); + let before = h.len(); + h.clear(); + self.adjust_total_count(before, 0); } /// Returns a quick (non-pruning) estimate of the total number of history /// entries across all decoders, used for pre-allocating the replay blob. + /// + /// Uses an `AtomicUsize` counter maintained by record/prune/clear methods, + /// avoiding 9 separate mutex acquisitions. pub fn estimated_total_count(&self) -> usize { - let ais = self.ais.lock().map(|h| h.len()).unwrap_or(0); - let vdes = self.vdes.lock().map(|h| h.len()).unwrap_or(0); - let aprs = self.aprs.lock().map(|h| h.len()).unwrap_or(0); - let hf_aprs = self.hf_aprs.lock().map(|h| h.len()).unwrap_or(0); - let cw = self.cw.lock().map(|h| h.len()).unwrap_or(0); - let ft8 = self.ft8.lock().map(|h| h.len()).unwrap_or(0); - let ft4 = self.ft4.lock().map(|h| h.len()).unwrap_or(0); - let ft2 = self.ft2.lock().map(|h| h.len()).unwrap_or(0); - let wspr = self.wspr.lock().map(|h| h.len()).unwrap_or(0); - ais + vdes + aprs + hf_aprs + cw + ft8 + ft4 + ft2 + wspr + self.total_count.load(Ordering::Relaxed) } } @@ -878,7 +935,7 @@ fn run_playback( { let ring = ring.clone(); move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - let mut ring = ring.lock().unwrap(); + let mut ring = ring.lock().unwrap_or_else(|e| e.into_inner()); for sample in data.iter_mut() { *sample = ring.pop_front().unwrap_or(0.0); } @@ -951,7 +1008,7 @@ fn run_playback( match decoder.decode_float(&packet, &mut pcm_buf, false) { Ok(decoded) => { - let mut ring = ring_writer.lock().unwrap(); + let mut ring = ring_writer.lock().unwrap_or_else(|e| e.into_inner()); ring.extend(&pcm_buf[..decoded * channels as usize]); } Err(e) => { @@ -966,7 +1023,7 @@ fn run_playback( if rx.is_empty() { let _ = stream.pause(); playing = false; - ring_writer.lock().unwrap().clear(); + ring_writer.lock().unwrap_or_else(|e| e.into_inner()).clear(); info!("Audio playback: paused (idle)"); if channel_closed { return Ok(()); @@ -994,7 +1051,7 @@ fn run_playback( let _ = stream.pause(); playing = false; } - ring_writer.lock().unwrap().clear(); + ring_writer.lock().unwrap_or_else(|e| e.into_inner()).clear(); if channel_closed { return Ok(());