[fix](trx-frontend-http): recover from poisoned locks and document ordering
Replace all .unwrap() on RwLock/Mutex acquisitions with .unwrap_or_else(|e| e.into_inner()) to gracefully recover from poisoned locks instead of panicking. Add lock ordering documentation to the module header to prevent deadlocks. https://claude.ai/code/session_01XzurkeuUmamBuhQwxVy7T4 Signed-off-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -14,6 +14,20 @@
|
|||||||
//! registry is the source of truth for metadata (freq/mode per channel) and
|
//! registry is the source of truth for metadata (freq/mode per channel) and
|
||||||
//! drives `SetFreq`/`SetMode` commands to the server when a tab selects or
|
//! drives `SetFreq`/`SetMode` commands to the server when a tab selects or
|
||||||
//! tunes a channel.
|
//! tunes a channel.
|
||||||
|
//!
|
||||||
|
//! # Lock ordering
|
||||||
|
//!
|
||||||
|
//! [`ClientChannelManager`] owns several synchronisation primitives. To
|
||||||
|
//! prevent deadlocks, all code in this module acquires them in the following
|
||||||
|
//! fixed order:
|
||||||
|
//!
|
||||||
|
//! 1. `rigs` (RwLock) — primary channel data
|
||||||
|
//! 2. `sessions` (RwLock) — session-to-channel mapping
|
||||||
|
//! 3. `audio_cmd` / `rig_vchan_audio_cmd` (Mutex / RwLock) — fire-and-forget command senders
|
||||||
|
//!
|
||||||
|
//! **Rule**: always `drop(rigs)` before acquiring `sessions` or `audio_cmd`.
|
||||||
|
//! The command senders are independent of the first two and may be acquired
|
||||||
|
//! at any point provided no higher-priority lock is held.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
@@ -132,7 +146,7 @@ impl ClientChannelManager {
|
|||||||
|
|
||||||
/// Wire the global audio-command sender as fallback.
|
/// Wire the global audio-command sender as fallback.
|
||||||
pub fn set_audio_cmd(&self, tx: mpsc::Sender<VChanAudioCmd>) {
|
pub fn set_audio_cmd(&self, tx: mpsc::Sender<VChanAudioCmd>) {
|
||||||
*self.audio_cmd.lock().unwrap() = Some(tx);
|
*self.audio_cmd.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fire-and-forget: send a `VChanAudioCmd`, routing to the per-rig sender
|
/// Fire-and-forget: send a `VChanAudioCmd`, routing to the per-rig sender
|
||||||
@@ -146,7 +160,7 @@ impl ClientChannelManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Fall back to global sender.
|
// Fall back to global sender.
|
||||||
if let Some(tx) = self.audio_cmd.lock().unwrap().as_ref() {
|
if let Some(tx) = self.audio_cmd.lock().unwrap_or_else(|e| e.into_inner()).as_ref() {
|
||||||
let _ = tx.try_send(cmd);
|
let _ = tx.try_send(cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -177,7 +191,7 @@ impl ClientChannelManager {
|
|||||||
/// Ensure channel 0 exists for `rig_id`. Call this when the SSE stream
|
/// Ensure channel 0 exists for `rig_id`. Call this when the SSE stream
|
||||||
/// first delivers rig state so the primary channel reflects the current freq.
|
/// first delivers rig state so the primary channel reflects the current freq.
|
||||||
pub fn init_rig(&self, rig_id: &str, freq_hz: u64, mode: &str) {
|
pub fn init_rig(&self, rig_id: &str, freq_hz: u64, mode: &str) {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.entry(rig_id.to_string()).or_default();
|
let channels = rigs.entry(rig_id.to_string()).or_default();
|
||||||
if channels.is_empty() {
|
if channels.is_empty() {
|
||||||
channels.push(InternalChannel {
|
channels.push(InternalChannel {
|
||||||
@@ -195,7 +209,7 @@ impl ClientChannelManager {
|
|||||||
|
|
||||||
/// Update channel 0's freq/mode when the server pushes a new rig state.
|
/// Update channel 0's freq/mode when the server pushes a new rig state.
|
||||||
pub fn update_primary(&self, rig_id: &str, freq_hz: u64, mode: &str) {
|
pub fn update_primary(&self, rig_id: &str, freq_hz: u64, mode: &str) {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
if let Some(channels) = rigs.get_mut(rig_id) {
|
if let Some(channels) = rigs.get_mut(rig_id) {
|
||||||
if let Some(ch) = channels.first_mut() {
|
if let Some(ch) = channels.first_mut() {
|
||||||
if ch.freq_hz != freq_hz || ch.mode != mode {
|
if ch.freq_hz != freq_hz || ch.mode != mode {
|
||||||
@@ -209,7 +223,7 @@ impl ClientChannelManager {
|
|||||||
|
|
||||||
/// List all channels for a rig (returns empty vec if rig unknown).
|
/// List all channels for a rig (returns empty vec if rig unknown).
|
||||||
pub fn channels(&self, rig_id: &str) -> Vec<ClientChannel> {
|
pub fn channels(&self, rig_id: &str) -> Vec<ClientChannel> {
|
||||||
let rigs = self.rigs.read().unwrap();
|
let rigs = self.rigs.read().unwrap_or_else(|e| e.into_inner());
|
||||||
rigs.get(rig_id)
|
rigs.get(rig_id)
|
||||||
.map(|chs| {
|
.map(|chs| {
|
||||||
chs.iter()
|
chs.iter()
|
||||||
@@ -238,7 +252,7 @@ impl ClientChannelManager {
|
|||||||
freq_hz: u64,
|
freq_hz: u64,
|
||||||
mode: &str,
|
mode: &str,
|
||||||
) -> Result<ClientChannel, VChanClientError> {
|
) -> Result<ClientChannel, VChanClientError> {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.entry(rig_id.to_string()).or_default();
|
let channels = rigs.entry(rig_id.to_string()).or_default();
|
||||||
|
|
||||||
if channels.len() >= self.max_channels {
|
if channels.len() >= self.max_channels {
|
||||||
@@ -276,7 +290,7 @@ impl ClientChannelManager {
|
|||||||
drop(rigs);
|
drop(rigs);
|
||||||
self.sessions
|
self.sessions
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
.insert(session_id, (rig_id.to_string(), id));
|
.insert(session_id, (rig_id.to_string(), id));
|
||||||
|
|
||||||
// Request server-side DSP channel + audio subscription.
|
// Request server-side DSP channel + audio subscription.
|
||||||
@@ -305,7 +319,7 @@ impl ClientChannelManager {
|
|||||||
// Release previous subscription on this rig.
|
// Release previous subscription on this rig.
|
||||||
self.release_session_on_rig(session_id, rig_id);
|
self.release_session_on_rig(session_id, rig_id);
|
||||||
|
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.get_mut(rig_id)?;
|
let channels = rigs.get_mut(rig_id)?;
|
||||||
let (idx, ch) = channels
|
let (idx, ch) = channels
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
@@ -330,7 +344,7 @@ impl ClientChannelManager {
|
|||||||
drop(rigs);
|
drop(rigs);
|
||||||
self.sessions
|
self.sessions
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
.insert(session_id, (rig_id.to_string(), channel_id));
|
.insert(session_id, (rig_id.to_string(), channel_id));
|
||||||
|
|
||||||
Some(snapshot)
|
Some(snapshot)
|
||||||
@@ -340,7 +354,7 @@ impl ClientChannelManager {
|
|||||||
/// Auto-removes non-permanent channels that reach 0 subscribers.
|
/// Auto-removes non-permanent channels that reach 0 subscribers.
|
||||||
pub fn release_session(&self, session_id: Uuid) {
|
pub fn release_session(&self, session_id: Uuid) {
|
||||||
let mapping = {
|
let mapping = {
|
||||||
let mut sessions = self.sessions.write().unwrap();
|
let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
|
||||||
sessions.remove(&session_id)
|
sessions.remove(&session_id)
|
||||||
};
|
};
|
||||||
if let Some((rig_id, _)) = mapping {
|
if let Some((rig_id, _)) = mapping {
|
||||||
@@ -349,7 +363,7 @@ impl ClientChannelManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn release_session_on_rig(&self, session_id: Uuid, rig_id: &str) {
|
fn release_session_on_rig(&self, session_id: Uuid, rig_id: &str) {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let Some(channels) = rigs.get_mut(rig_id) else {
|
let Some(channels) = rigs.get_mut(rig_id) else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
@@ -386,7 +400,7 @@ impl ClientChannelManager {
|
|||||||
|
|
||||||
/// Explicitly delete a channel by UUID (any session may do this).
|
/// Explicitly delete a channel by UUID (any session may do this).
|
||||||
pub fn delete_channel(&self, rig_id: &str, channel_id: Uuid) -> Result<(), VChanClientError> {
|
pub fn delete_channel(&self, rig_id: &str, channel_id: Uuid) -> Result<(), VChanClientError> {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
||||||
let pos = channels
|
let pos = channels
|
||||||
.iter()
|
.iter()
|
||||||
@@ -401,7 +415,7 @@ impl ClientChannelManager {
|
|||||||
self.broadcast_change(rig_id, channels);
|
self.broadcast_change(rig_id, channels);
|
||||||
drop(rigs);
|
drop(rigs);
|
||||||
|
|
||||||
let mut sessions = self.sessions.write().unwrap();
|
let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
|
||||||
for sid in evicted {
|
for sid in evicted {
|
||||||
sessions.remove(&sid);
|
sessions.remove(&sid);
|
||||||
}
|
}
|
||||||
@@ -419,7 +433,7 @@ impl ClientChannelManager {
|
|||||||
let evicted_sessions: Vec<Uuid>;
|
let evicted_sessions: Vec<Uuid>;
|
||||||
let rig_id_opt: Option<String>;
|
let rig_id_opt: Option<String>;
|
||||||
{
|
{
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let mut found = false;
|
let mut found = false;
|
||||||
let mut evicted = Vec::new();
|
let mut evicted = Vec::new();
|
||||||
let mut found_rig = None;
|
let mut found_rig = None;
|
||||||
@@ -439,7 +453,7 @@ impl ClientChannelManager {
|
|||||||
}
|
}
|
||||||
// Clean up session → channel mapping for sessions that were subscribed.
|
// Clean up session → channel mapping for sessions that were subscribed.
|
||||||
if rig_id_opt.is_some() {
|
if rig_id_opt.is_some() {
|
||||||
let mut sessions = self.sessions.write().unwrap();
|
let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
|
||||||
for sid in evicted_sessions {
|
for sid in evicted_sessions {
|
||||||
if matches!(sessions.get(&sid), Some((_, ch)) if *ch == channel_id) {
|
if matches!(sessions.get(&sid), Some((_, ch)) if *ch == channel_id) {
|
||||||
sessions.remove(&sid);
|
sessions.remove(&sid);
|
||||||
@@ -455,7 +469,7 @@ impl ClientChannelManager {
|
|||||||
channel_id: Uuid,
|
channel_id: Uuid,
|
||||||
freq_hz: u64,
|
freq_hz: u64,
|
||||||
) -> Result<(), VChanClientError> {
|
) -> Result<(), VChanClientError> {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
||||||
let ch = channels
|
let ch = channels
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
@@ -480,7 +494,7 @@ impl ClientChannelManager {
|
|||||||
channel_id: Uuid,
|
channel_id: Uuid,
|
||||||
mode: &str,
|
mode: &str,
|
||||||
) -> Result<(), VChanClientError> {
|
) -> Result<(), VChanClientError> {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
||||||
let ch = channels
|
let ch = channels
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
@@ -505,7 +519,7 @@ impl ClientChannelManager {
|
|||||||
channel_id: Uuid,
|
channel_id: Uuid,
|
||||||
bandwidth_hz: u32,
|
bandwidth_hz: u32,
|
||||||
) -> Result<(), VChanClientError> {
|
) -> Result<(), VChanClientError> {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
let channels = rigs.get_mut(rig_id).ok_or(VChanClientError::NotFound)?;
|
||||||
let ch = channels
|
let ch = channels
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
@@ -526,12 +540,12 @@ impl ClientChannelManager {
|
|||||||
|
|
||||||
/// Return the channel a session is currently subscribed to.
|
/// Return the channel a session is currently subscribed to.
|
||||||
pub fn session_channel(&self, session_id: Uuid) -> Option<(String, Uuid)> {
|
pub fn session_channel(&self, session_id: Uuid) -> Option<(String, Uuid)> {
|
||||||
self.sessions.read().unwrap().get(&session_id).cloned()
|
self.sessions.read().unwrap_or_else(|e| e.into_inner()).get(&session_id).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the selected channel's tune metadata.
|
/// Return the selected channel's tune metadata.
|
||||||
pub fn selected_channel(&self, rig_id: &str, channel_id: Uuid) -> Option<SelectedChannel> {
|
pub fn selected_channel(&self, rig_id: &str, channel_id: Uuid) -> Option<SelectedChannel> {
|
||||||
let rigs = self.rigs.read().unwrap();
|
let rigs = self.rigs.read().unwrap_or_else(|e| e.into_inner());
|
||||||
let channels = rigs.get(rig_id)?;
|
let channels = rigs.get(rig_id)?;
|
||||||
let channel = channels.iter().find(|channel| channel.id == channel_id)?;
|
let channel = channels.iter().find(|channel| channel.id == channel_id)?;
|
||||||
Some(SelectedChannel {
|
Some(SelectedChannel {
|
||||||
@@ -554,7 +568,7 @@ impl ClientChannelManager {
|
|||||||
rig_id: &str,
|
rig_id: &str,
|
||||||
desired: &[(String, u64, String, u32, Vec<String>)],
|
desired: &[(String, u64, String, u32, Vec<String>)],
|
||||||
) {
|
) {
|
||||||
let mut rigs = self.rigs.write().unwrap();
|
let mut rigs = self.rigs.write().unwrap_or_else(|e| e.into_inner());
|
||||||
let Some(channels) = rigs.get_mut(rig_id) else {
|
let Some(channels) = rigs.get_mut(rig_id) else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user