[chore](trx-server): remove temporary hang tracing
Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -35,8 +35,6 @@ use crate::rig_handle::RigHandle;
|
||||
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
||||
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
||||
const LISTENER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Run the JSON TCP listener, accepting client connections.
|
||||
///
|
||||
/// `rigs` is a shared map from rig_id → `RigHandle`. The first entry (by
|
||||
@@ -51,8 +49,6 @@ pub async fn run_listener(
|
||||
) -> std::io::Result<()> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
info!("Listening on {}", addr);
|
||||
let mut heartbeat = time::interval(LISTENER_HEARTBEAT_INTERVAL);
|
||||
|
||||
let validator = Arc::new(SimpleTokenValidator::new(auth_tokens));
|
||||
|
||||
loop {
|
||||
@@ -71,14 +67,6 @@ pub async fn run_listener(
|
||||
}
|
||||
});
|
||||
}
|
||||
_ = heartbeat.tick() => {
|
||||
info!(
|
||||
"Listener heartbeat: addr={}, rigs={}, default_rig_id={}",
|
||||
addr,
|
||||
rigs.len(),
|
||||
default_rig_id
|
||||
);
|
||||
}
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
@@ -288,18 +276,6 @@ async fn handle_client(
|
||||
};
|
||||
|
||||
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
||||
let rig_cmd_label = format!("{:?}", rig_cmd);
|
||||
let log_request = !matches!(rig_cmd, RigCommand::GetSpectrum | RigCommand::GetSnapshot);
|
||||
let request_started = time::Instant::now();
|
||||
if log_request {
|
||||
info!(
|
||||
"Client {} request start: rig_id='{}' cmd={}",
|
||||
addr,
|
||||
target_rig_id,
|
||||
rig_cmd_label
|
||||
);
|
||||
}
|
||||
|
||||
// Fast path: serve GetSnapshot directly from the watch channel
|
||||
// so clients get a response even while the rig task is initializing.
|
||||
if matches!(rig_cmd, RigCommand::GetSnapshot) {
|
||||
@@ -383,15 +359,6 @@ async fn handle_client(
|
||||
}
|
||||
} {
|
||||
Ok(Ok(snapshot)) => {
|
||||
if log_request {
|
||||
info!(
|
||||
"Client {} request ok: rig_id='{}' cmd={} elapsed={:?}",
|
||||
addr,
|
||||
target_rig_id,
|
||||
rig_cmd_label,
|
||||
request_started.elapsed()
|
||||
);
|
||||
}
|
||||
let resp = ClientResponse {
|
||||
success: true,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
@@ -402,16 +369,6 @@ async fn handle_client(
|
||||
send_response(&mut writer, &resp).await?;
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
if log_request {
|
||||
warn!(
|
||||
"Client {} request rig error: rig_id='{}' cmd={} elapsed={:?}: {}",
|
||||
addr,
|
||||
target_rig_id,
|
||||
rig_cmd_label,
|
||||
request_started.elapsed(),
|
||||
err.message
|
||||
);
|
||||
}
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
rig_id: Some(target_rig_id.clone()),
|
||||
@@ -422,16 +379,6 @@ async fn handle_client(
|
||||
send_response(&mut writer, &resp).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
if log_request {
|
||||
error!(
|
||||
"Client {} request response channel error: rig_id='{}' cmd={} elapsed={:?}: {:?}",
|
||||
addr,
|
||||
target_rig_id,
|
||||
rig_cmd_label,
|
||||
request_started.elapsed(),
|
||||
e
|
||||
);
|
||||
}
|
||||
error!("Rig response oneshot recv error: {:?}", e);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
|
||||
@@ -29,8 +29,6 @@ use crate::error::is_invalid_bcd_error;
|
||||
|
||||
const POLL_REFRESH_TIMEOUT: Duration = Duration::from_secs(8);
|
||||
const COMMAND_EXEC_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const RIG_TASK_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Configuration for the rig task.
|
||||
pub struct RigTaskConfig {
|
||||
pub registry: Arc<RegistrationContext>,
|
||||
@@ -238,7 +236,6 @@ pub async fn run_rig_task(
|
||||
let mut current_poll_duration = polling.interval(state.status.tx_en);
|
||||
let mut poll_sleep: std::pin::Pin<Box<tokio::time::Sleep>> =
|
||||
Box::pin(tokio::time::sleep(current_poll_duration));
|
||||
let mut heartbeat = time::interval(RIG_TASK_HEARTBEAT_INTERVAL);
|
||||
loop {
|
||||
// Update sleep duration if tx_en state changed
|
||||
let new_duration = polling.interval(state.status.tx_en);
|
||||
@@ -260,7 +257,6 @@ pub async fn run_rig_task(
|
||||
}
|
||||
_ = &mut poll_sleep => {
|
||||
poll_sleep = Box::pin(tokio::time::sleep(current_poll_duration));
|
||||
let poll_started = Instant::now();
|
||||
// Check if polling is paused
|
||||
if let Some(until) = poll_pause_until {
|
||||
if Instant::now() < until {
|
||||
@@ -284,19 +280,9 @@ pub async fn run_rig_task(
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => {
|
||||
info!(
|
||||
"[{}] rig poll refresh ok in {:?}; syncing state",
|
||||
config.rig_id,
|
||||
poll_started.elapsed()
|
||||
);
|
||||
let old_machine_state = machine.state().clone();
|
||||
sync_machine_state(&mut machine, &state);
|
||||
let new_machine_state = machine.state().clone();
|
||||
info!(
|
||||
"[{}] rig poll state sync done in {:?}; emitting changes",
|
||||
config.rig_id,
|
||||
poll_started.elapsed()
|
||||
);
|
||||
emit_state_changes(
|
||||
&emitter,
|
||||
&old_state,
|
||||
@@ -304,17 +290,7 @@ pub async fn run_rig_task(
|
||||
&old_machine_state,
|
||||
&new_machine_state,
|
||||
);
|
||||
info!(
|
||||
"[{}] rig poll emit done in {:?}; publishing watch state",
|
||||
config.rig_id,
|
||||
poll_started.elapsed()
|
||||
);
|
||||
let _ = state_tx.send(state.clone());
|
||||
info!(
|
||||
"[{}] rig poll publish done in {:?}",
|
||||
config.rig_id,
|
||||
poll_started.elapsed()
|
||||
);
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!("CAT polling error: {:?}", e);
|
||||
@@ -334,20 +310,6 @@ pub async fn run_rig_task(
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = heartbeat.tick() => {
|
||||
info!(
|
||||
"[{}] rig_task heartbeat: initialized={}, freq_hz={}, mode={:?}, tx_en={}, poll_paused={}",
|
||||
config.rig_id,
|
||||
state.initialized,
|
||||
state.status.freq.hz,
|
||||
state.status.mode,
|
||||
state.status.tx_en,
|
||||
poll_pause_until
|
||||
.map(|until| Instant::now() < until)
|
||||
.unwrap_or(false)
|
||||
);
|
||||
}
|
||||
|
||||
maybe_req = rx.recv() => {
|
||||
let Some(first_req) = maybe_req else { break; };
|
||||
|
||||
@@ -356,12 +318,6 @@ pub async fn run_rig_task(
|
||||
while let Ok(next) = rx.try_recv() {
|
||||
batch.push(next);
|
||||
}
|
||||
info!(
|
||||
"[{}] rig request batch received: size={}, first_cmd={:?}",
|
||||
config.rig_id,
|
||||
batch.len(),
|
||||
batch.last().map(|req| &req.cmd)
|
||||
);
|
||||
|
||||
// Process each request
|
||||
while let Some(RigRequest { cmd, respond_to, .. }) = batch.pop() {
|
||||
@@ -416,9 +372,6 @@ pub async fn run_rig_task(
|
||||
let cmd_label = format!("{:?}", cmd);
|
||||
let log_command = !matches!(&cmd, RigCommand::GetSpectrum);
|
||||
let started = Instant::now();
|
||||
if log_command {
|
||||
info!("[{}] rig command start: {}", config.rig_id, cmd_label);
|
||||
}
|
||||
|
||||
let mut cmd_ctx = CommandExecContext {
|
||||
rig: &mut rig,
|
||||
@@ -448,15 +401,7 @@ pub async fn run_rig_task(
|
||||
}
|
||||
};
|
||||
|
||||
let response_status = if result.is_ok() { "ok" } else { "err" };
|
||||
let _ = respond_to.send(result);
|
||||
info!(
|
||||
"[{}] rig command response sent: {} status={} elapsed={:?}",
|
||||
config.rig_id,
|
||||
cmd_label,
|
||||
response_status,
|
||||
started.elapsed()
|
||||
);
|
||||
|
||||
if log_command {
|
||||
let elapsed = started.elapsed();
|
||||
@@ -808,13 +753,6 @@ async fn refresh_state_with_retry(
|
||||
|
||||
/// Read current state from the rig via CAT.
|
||||
async fn refresh_state_from_cat(rig: &mut Box<dyn RigCat>, state: &mut RigState) -> DynResult<()> {
|
||||
let started = std::time::Instant::now();
|
||||
info!(
|
||||
"CAT refresh start: freq_hz={}, mode={:?}, tx_en={}",
|
||||
state.status.freq.hz,
|
||||
state.status.mode,
|
||||
state.status.tx_en
|
||||
);
|
||||
let (freq, mode, vfo) = rig.get_status().await?;
|
||||
state.filter = rig.filter_state();
|
||||
state.control.enabled = Some(true);
|
||||
@@ -858,13 +796,6 @@ async fn refresh_state_from_cat(rig: &mut Box<dyn RigCat>, state: &mut RigState)
|
||||
}
|
||||
|
||||
state.status.lock = Some(state.control.lock.unwrap_or(false));
|
||||
info!(
|
||||
"CAT refresh done in {:?}: freq_hz={}, mode={:?}, tx_en={}",
|
||||
started.elapsed(),
|
||||
state.status.freq.hz,
|
||||
state.status.mode,
|
||||
state.status.tx_en
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use num_complex::Complex;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::warn;
|
||||
use trx_core::rig::state::RigMode;
|
||||
|
||||
pub use self::channel::{ChannelDsp, VirtualSquelchConfig};
|
||||
@@ -288,39 +289,12 @@ fn iq_read_loop(
|
||||
let mut zero_read_streak: u32 = 0;
|
||||
let mut overflow_log_window_start: Option<Instant> = None;
|
||||
let mut overflow_log_suppressed: u32 = 0;
|
||||
let mut retune_seq: u64 = 0;
|
||||
let mut last_applied_retune_hz: Option<f64> = None;
|
||||
|
||||
loop {
|
||||
// Apply any pending hardware retune before the next read.
|
||||
if let Ok(mut cmd) = retune_cmd.try_lock() {
|
||||
if let Some(hz) = cmd.take() {
|
||||
retune_seq = retune_seq.wrapping_add(1);
|
||||
let started = Instant::now();
|
||||
tracing::warn!(
|
||||
"SDR retune request #{} starting: target={:.0} Hz, last_applied={}",
|
||||
retune_seq,
|
||||
hz,
|
||||
last_applied_retune_hz
|
||||
.map(|value| format!("{value:.0} Hz"))
|
||||
.unwrap_or_else(|| "none".to_string())
|
||||
);
|
||||
if let Err(e) = source.set_center_freq(hz) {
|
||||
tracing::warn!(
|
||||
"SDR retune request #{} failed after {:?}: target={:.0} Hz: {}",
|
||||
retune_seq,
|
||||
started.elapsed(),
|
||||
hz,
|
||||
e
|
||||
);
|
||||
} else {
|
||||
last_applied_retune_hz = Some(hz);
|
||||
tracing::info!(
|
||||
"SDR retune request #{} applied in {:?}: {:.0} Hz",
|
||||
retune_seq,
|
||||
started.elapsed(),
|
||||
hz
|
||||
);
|
||||
warn!("set_center_freq failed ({}): {}", hz, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,8 +10,6 @@ pub mod vchan_impl;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
|
||||
use trx_core::radio::freq::{Band, Freq};
|
||||
use trx_core::rig::response::RigError;
|
||||
use trx_core::rig::state::{RigFilterState, SpectrumData, VchanRdsEntry, WfmDenoiseLevel};
|
||||
@@ -452,13 +450,6 @@ impl RigCat for SoapySdrRig {
|
||||
freq: Freq,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = DynResult<()>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let started = Instant::now();
|
||||
tracing::info!(
|
||||
"SoapySdrRig::set_freq start: target={} Hz, current_center={} Hz, mode={:?}",
|
||||
freq.hz,
|
||||
self.center_hz,
|
||||
self.mode
|
||||
);
|
||||
tracing::debug!("SoapySdrRig: set_freq -> {} Hz", freq.hz);
|
||||
let freq_changed = self.freq.hz != freq.hz;
|
||||
self.freq = freq;
|
||||
@@ -496,12 +487,6 @@ impl RigCat for SoapySdrRig {
|
||||
}
|
||||
}
|
||||
self.update_ais_channel_offsets();
|
||||
tracing::info!(
|
||||
"SoapySdrRig::set_freq done in {:?}: dial={} Hz, center={} Hz",
|
||||
started.elapsed(),
|
||||
self.freq.hz,
|
||||
self.center_hz
|
||||
);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
@@ -511,12 +496,6 @@ impl RigCat for SoapySdrRig {
|
||||
freq: Freq,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = DynResult<()>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let started = Instant::now();
|
||||
tracing::info!(
|
||||
"SoapySdrRig::set_center_freq start: target={} Hz, current_center={} Hz",
|
||||
freq.hz,
|
||||
self.center_hz
|
||||
);
|
||||
tracing::debug!("SoapySdrRig: set_center_freq -> {} Hz", freq.hz);
|
||||
self.center_hz = freq.hz as i64;
|
||||
if let Ok(mut cmd) = self.retune_cmd.lock() {
|
||||
@@ -531,11 +510,6 @@ impl RigCat for SoapySdrRig {
|
||||
}
|
||||
}
|
||||
self.update_ais_channel_offsets();
|
||||
tracing::info!(
|
||||
"SoapySdrRig::set_center_freq done in {:?}: center={} Hz",
|
||||
started.elapsed(),
|
||||
self.center_hz
|
||||
);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
@@ -545,12 +519,6 @@ impl RigCat for SoapySdrRig {
|
||||
mode: RigMode,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = DynResult<()>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let started = Instant::now();
|
||||
tracing::info!(
|
||||
"SoapySdrRig::set_mode start: target={:?}, current={:?}",
|
||||
mode,
|
||||
self.mode
|
||||
);
|
||||
tracing::debug!("SoapySdrRig: set_mode -> {:?}", mode);
|
||||
self.mode = mode.clone();
|
||||
self.bandwidth_hz = Self::default_bandwidth_for_mode(&mode);
|
||||
@@ -565,12 +533,6 @@ impl RigCat for SoapySdrRig {
|
||||
}
|
||||
self.apply_ais_channel_activity();
|
||||
self.apply_ais_channel_filters();
|
||||
tracing::info!(
|
||||
"SoapySdrRig::set_mode done in {:?}: mode={:?}, bandwidth_hz={}",
|
||||
started.elapsed(),
|
||||
self.mode,
|
||||
self.bandwidth_hz
|
||||
);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user