[refactor](trx-server): add AtomicUsize counter and recover from poisoned locks

Add an AtomicUsize total_count field to DecoderHistories, maintained by
record/prune/clear methods, so estimated_total_count() avoids 9 separate
mutex acquisitions. Also replace audio ring buffer .unwrap() calls with
.unwrap_or_else(|e| e.into_inner()) to recover from poisoned locks.

https://claude.ai/code/session_01XzurkeuUmamBuhQwxVy7T4
Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-26 06:26:17 +00:00
committed by Stan Grams
parent d42c803f91
commit c5320ca2fb
+89 -32
View File
@@ -6,7 +6,7 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -141,7 +141,7 @@ impl StreamErrorLogger {
let mut state = self
.state
.lock()
.expect("stream error logger mutex poisoned");
.unwrap_or_else(|e| e.into_inner());
// First occurrence or changed error class: log as error once.
if state.last_kind != Some(kind) {
@@ -210,6 +210,9 @@ pub struct DecoderHistories {
pub ft4: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub ft2: Mutex<VecDeque<(Instant, Ft8Message)>>,
pub wspr: Mutex<VecDeque<(Instant, WsprMessage)>>,
/// Approximate total entry count across all decoders, maintained
/// atomically so `estimated_total_count()` avoids 9 lock acquisitions.
total_count: AtomicUsize,
}
impl DecoderHistories {
@@ -224,9 +227,21 @@ impl DecoderHistories {
ft4: Mutex::new(VecDeque::new()),
ft2: Mutex::new(VecDeque::new()),
wspr: Mutex::new(VecDeque::new()),
total_count: AtomicUsize::new(0),
})
}
/// Adjust the atomic total count after a record/prune/clear operation.
fn adjust_total_count(&self, old_len: usize, new_len: usize) {
if new_len > old_len {
self.total_count
.fetch_add(new_len - old_len, Ordering::Relaxed);
} else if old_len > new_len {
self.total_count
.fetch_sub(old_len - new_len, Ordering::Relaxed);
}
}
// --- AIS ---
fn prune_ais(history: &mut VecDeque<(Instant, AisMessage)>) {
@@ -245,13 +260,17 @@ impl DecoderHistories {
msg.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.ais.lock().expect("ais history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ais(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ais_history(&self) -> Vec<AisMessage> {
let mut h = self.ais.lock().expect("ais history mutex poisoned");
let before = h.len();
Self::prune_ais(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, msg)| msg.clone()).collect()
}
@@ -273,13 +292,17 @@ impl DecoderHistories {
msg.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.vdes.lock().expect("vdes history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_vdes(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_vdes_history(&self) -> Vec<VdesMessage> {
let mut h = self.vdes.lock().expect("vdes history mutex poisoned");
let before = h.len();
Self::prune_vdes(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, msg)| msg.clone()).collect()
}
@@ -304,21 +327,25 @@ impl DecoderHistories {
pkt.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.aprs.lock().expect("aprs history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), pkt));
Self::prune_aprs(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_aprs_history(&self) -> Vec<AprsPacket> {
let mut h = self.aprs.lock().expect("aprs history mutex poisoned");
let before = h.len();
Self::prune_aprs(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, pkt)| pkt.clone()).collect()
}
pub fn clear_aprs_history(&self) {
self.aprs
.lock()
.expect("aprs history mutex poisoned")
.clear();
let mut h = self.aprs.lock().expect("aprs history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- HF APRS ---
@@ -342,21 +369,25 @@ impl DecoderHistories {
pkt.ts_ms = Some(current_timestamp_ms());
}
let mut h = self.hf_aprs.lock().expect("hf_aprs history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), pkt));
Self::prune_hf_aprs(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_hf_aprs_history(&self) -> Vec<AprsPacket> {
let mut h = self.hf_aprs.lock().expect("hf_aprs history mutex poisoned");
let before = h.len();
Self::prune_hf_aprs(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, pkt)| pkt.clone()).collect()
}
pub fn clear_hf_aprs_history(&self) {
self.hf_aprs
.lock()
.expect("hf_aprs history mutex poisoned")
.clear();
let mut h = self.hf_aprs.lock().expect("hf_aprs history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- CW ---
@@ -374,18 +405,25 @@ impl DecoderHistories {
pub fn record_cw_event(&self, evt: CwEvent) {
let mut h = self.cw.lock().expect("cw history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), evt));
Self::prune_cw(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_cw_history(&self) -> Vec<CwEvent> {
let mut h = self.cw.lock().expect("cw history mutex poisoned");
let before = h.len();
Self::prune_cw(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, evt)| evt.clone()).collect()
}
pub fn clear_cw_history(&self) {
self.cw.lock().expect("cw history mutex poisoned").clear();
let mut h = self.cw.lock().expect("cw history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- FT8 ---
@@ -403,18 +441,25 @@ impl DecoderHistories {
pub fn record_ft8_message(&self, msg: Ft8Message) {
let mut h = self.ft8.lock().expect("ft8 history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ft8(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ft8_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft8.lock().expect("ft8 history mutex poisoned");
let before = h.len();
Self::prune_ft8(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_ft8_history(&self) {
self.ft8.lock().expect("ft8 history mutex poisoned").clear();
let mut h = self.ft8.lock().expect("ft8 history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- FT4 ---
@@ -432,18 +477,25 @@ impl DecoderHistories {
pub fn record_ft4_message(&self, msg: Ft8Message) {
let mut h = self.ft4.lock().expect("ft4 history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ft4(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ft4_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft4.lock().expect("ft4 history mutex poisoned");
let before = h.len();
Self::prune_ft4(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_ft4_history(&self) {
self.ft4.lock().expect("ft4 history mutex poisoned").clear();
let mut h = self.ft4.lock().expect("ft4 history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- FT2 ---
@@ -461,18 +513,25 @@ impl DecoderHistories {
pub fn record_ft2_message(&self, msg: Ft8Message) {
let mut h = self.ft2.lock().expect("ft2 history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_ft2(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_ft2_history(&self) -> Vec<Ft8Message> {
let mut h = self.ft2.lock().expect("ft2 history mutex poisoned");
let before = h.len();
Self::prune_ft2(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_ft2_history(&self) {
self.ft2.lock().expect("ft2 history mutex poisoned").clear();
let mut h = self.ft2.lock().expect("ft2 history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
// --- WSPR ---
@@ -490,36 +549,34 @@ impl DecoderHistories {
pub fn record_wspr_message(&self, msg: WsprMessage) {
let mut h = self.wspr.lock().expect("wspr history mutex poisoned");
let before = h.len();
h.push_back((Instant::now(), msg));
Self::prune_wspr(&mut h);
self.adjust_total_count(before, h.len());
}
pub fn snapshot_wspr_history(&self) -> Vec<WsprMessage> {
let mut h = self.wspr.lock().expect("wspr history mutex poisoned");
let before = h.len();
Self::prune_wspr(&mut h);
self.adjust_total_count(before, h.len());
h.iter().map(|(_, msg)| msg.clone()).collect()
}
pub fn clear_wspr_history(&self) {
self.wspr
.lock()
.expect("wspr history mutex poisoned")
.clear();
let mut h = self.wspr.lock().expect("wspr history mutex poisoned");
let before = h.len();
h.clear();
self.adjust_total_count(before, 0);
}
/// Returns a quick (non-pruning) estimate of the total number of history
/// entries across all decoders, used for pre-allocating the replay blob.
///
/// Uses an `AtomicUsize` counter maintained by record/prune/clear methods,
/// avoiding 9 separate mutex acquisitions.
pub fn estimated_total_count(&self) -> usize {
let ais = self.ais.lock().map(|h| h.len()).unwrap_or(0);
let vdes = self.vdes.lock().map(|h| h.len()).unwrap_or(0);
let aprs = self.aprs.lock().map(|h| h.len()).unwrap_or(0);
let hf_aprs = self.hf_aprs.lock().map(|h| h.len()).unwrap_or(0);
let cw = self.cw.lock().map(|h| h.len()).unwrap_or(0);
let ft8 = self.ft8.lock().map(|h| h.len()).unwrap_or(0);
let ft4 = self.ft4.lock().map(|h| h.len()).unwrap_or(0);
let ft2 = self.ft2.lock().map(|h| h.len()).unwrap_or(0);
let wspr = self.wspr.lock().map(|h| h.len()).unwrap_or(0);
ais + vdes + aprs + hf_aprs + cw + ft8 + ft4 + ft2 + wspr
self.total_count.load(Ordering::Relaxed)
}
}
@@ -878,7 +935,7 @@ fn run_playback(
{
let ring = ring.clone();
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let mut ring = ring.lock().unwrap();
let mut ring = ring.lock().unwrap_or_else(|e| e.into_inner());
for sample in data.iter_mut() {
*sample = ring.pop_front().unwrap_or(0.0);
}
@@ -951,7 +1008,7 @@ fn run_playback(
match decoder.decode_float(&packet, &mut pcm_buf, false) {
Ok(decoded) => {
let mut ring = ring_writer.lock().unwrap();
let mut ring = ring_writer.lock().unwrap_or_else(|e| e.into_inner());
ring.extend(&pcm_buf[..decoded * channels as usize]);
}
Err(e) => {
@@ -966,7 +1023,7 @@ fn run_playback(
if rx.is_empty() {
let _ = stream.pause();
playing = false;
ring_writer.lock().unwrap().clear();
ring_writer.lock().unwrap_or_else(|e| e.into_inner()).clear();
info!("Audio playback: paused (idle)");
if channel_closed {
return Ok(());
@@ -994,7 +1051,7 @@ fn run_playback(
let _ = stream.pause();
playing = false;
}
ring_writer.lock().unwrap().clear();
ring_writer.lock().unwrap_or_else(|e| e.into_inner()).clear();
if channel_closed {
return Ok(());