[debug](trx-server): trace listener and rig task activity
Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -35,6 +35,7 @@ use crate::rig_handle::RigHandle;
|
|||||||
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
||||||
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
||||||
|
const LISTENER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
/// Run the JSON TCP listener, accepting client connections.
|
/// Run the JSON TCP listener, accepting client connections.
|
||||||
///
|
///
|
||||||
@@ -50,6 +51,7 @@ pub async fn run_listener(
|
|||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
info!("Listening on {}", addr);
|
info!("Listening on {}", addr);
|
||||||
|
let mut heartbeat = time::interval(LISTENER_HEARTBEAT_INTERVAL);
|
||||||
|
|
||||||
let validator = Arc::new(SimpleTokenValidator::new(auth_tokens));
|
let validator = Arc::new(SimpleTokenValidator::new(auth_tokens));
|
||||||
|
|
||||||
@@ -69,6 +71,14 @@ pub async fn run_listener(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
_ = heartbeat.tick() => {
|
||||||
|
info!(
|
||||||
|
"Listener heartbeat: addr={}, rigs={}, default_rig_id={}",
|
||||||
|
addr,
|
||||||
|
rigs.len(),
|
||||||
|
default_rig_id
|
||||||
|
);
|
||||||
|
}
|
||||||
changed = shutdown_rx.changed() => {
|
changed = shutdown_rx.changed() => {
|
||||||
match changed {
|
match changed {
|
||||||
Ok(()) if *shutdown_rx.borrow() => {
|
Ok(()) if *shutdown_rx.borrow() => {
|
||||||
@@ -278,6 +288,17 @@ async fn handle_client(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
let rig_cmd = mapping::client_command_to_rig(envelope.cmd);
|
||||||
|
let rig_cmd_label = format!("{:?}", rig_cmd);
|
||||||
|
let log_request = !matches!(rig_cmd, RigCommand::GetSpectrum | RigCommand::GetSnapshot);
|
||||||
|
let request_started = time::Instant::now();
|
||||||
|
if log_request {
|
||||||
|
info!(
|
||||||
|
"Client {} request start: rig_id='{}' cmd={}",
|
||||||
|
addr,
|
||||||
|
target_rig_id,
|
||||||
|
rig_cmd_label
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Fast path: serve GetSnapshot directly from the watch channel
|
// Fast path: serve GetSnapshot directly from the watch channel
|
||||||
// so clients get a response even while the rig task is initializing.
|
// so clients get a response even while the rig task is initializing.
|
||||||
@@ -362,6 +383,15 @@ async fn handle_client(
|
|||||||
}
|
}
|
||||||
} {
|
} {
|
||||||
Ok(Ok(snapshot)) => {
|
Ok(Ok(snapshot)) => {
|
||||||
|
if log_request {
|
||||||
|
info!(
|
||||||
|
"Client {} request ok: rig_id='{}' cmd={} elapsed={:?}",
|
||||||
|
addr,
|
||||||
|
target_rig_id,
|
||||||
|
rig_cmd_label,
|
||||||
|
request_started.elapsed()
|
||||||
|
);
|
||||||
|
}
|
||||||
let resp = ClientResponse {
|
let resp = ClientResponse {
|
||||||
success: true,
|
success: true,
|
||||||
rig_id: Some(target_rig_id.clone()),
|
rig_id: Some(target_rig_id.clone()),
|
||||||
@@ -372,6 +402,16 @@ async fn handle_client(
|
|||||||
send_response(&mut writer, &resp).await?;
|
send_response(&mut writer, &resp).await?;
|
||||||
}
|
}
|
||||||
Ok(Err(err)) => {
|
Ok(Err(err)) => {
|
||||||
|
if log_request {
|
||||||
|
warn!(
|
||||||
|
"Client {} request rig error: rig_id='{}' cmd={} elapsed={:?}: {}",
|
||||||
|
addr,
|
||||||
|
target_rig_id,
|
||||||
|
rig_cmd_label,
|
||||||
|
request_started.elapsed(),
|
||||||
|
err.message
|
||||||
|
);
|
||||||
|
}
|
||||||
let resp = ClientResponse {
|
let resp = ClientResponse {
|
||||||
success: false,
|
success: false,
|
||||||
rig_id: Some(target_rig_id.clone()),
|
rig_id: Some(target_rig_id.clone()),
|
||||||
@@ -382,6 +422,16 @@ async fn handle_client(
|
|||||||
send_response(&mut writer, &resp).await?;
|
send_response(&mut writer, &resp).await?;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
if log_request {
|
||||||
|
error!(
|
||||||
|
"Client {} request response channel error: rig_id='{}' cmd={} elapsed={:?}: {:?}",
|
||||||
|
addr,
|
||||||
|
target_rig_id,
|
||||||
|
rig_cmd_label,
|
||||||
|
request_started.elapsed(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
error!("Rig response oneshot recv error: {:?}", e);
|
error!("Rig response oneshot recv error: {:?}", e);
|
||||||
let resp = ClientResponse {
|
let resp = ClientResponse {
|
||||||
success: false,
|
success: false,
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ use crate::error::is_invalid_bcd_error;
|
|||||||
|
|
||||||
const POLL_REFRESH_TIMEOUT: Duration = Duration::from_secs(8);
|
const POLL_REFRESH_TIMEOUT: Duration = Duration::from_secs(8);
|
||||||
const COMMAND_EXEC_TIMEOUT: Duration = Duration::from_secs(10);
|
const COMMAND_EXEC_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
const RIG_TASK_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
/// Configuration for the rig task.
|
/// Configuration for the rig task.
|
||||||
pub struct RigTaskConfig {
|
pub struct RigTaskConfig {
|
||||||
@@ -237,6 +238,7 @@ pub async fn run_rig_task(
|
|||||||
let mut current_poll_duration = polling.interval(state.status.tx_en);
|
let mut current_poll_duration = polling.interval(state.status.tx_en);
|
||||||
let mut poll_sleep: std::pin::Pin<Box<tokio::time::Sleep>> =
|
let mut poll_sleep: std::pin::Pin<Box<tokio::time::Sleep>> =
|
||||||
Box::pin(tokio::time::sleep(current_poll_duration));
|
Box::pin(tokio::time::sleep(current_poll_duration));
|
||||||
|
let mut heartbeat = time::interval(RIG_TASK_HEARTBEAT_INTERVAL);
|
||||||
loop {
|
loop {
|
||||||
// Update sleep duration if tx_en state changed
|
// Update sleep duration if tx_en state changed
|
||||||
let new_duration = polling.interval(state.status.tx_en);
|
let new_duration = polling.interval(state.status.tx_en);
|
||||||
@@ -311,6 +313,19 @@ pub async fn run_rig_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
_ = heartbeat.tick() => {
|
||||||
|
info!(
|
||||||
|
"[{}] rig_task heartbeat: initialized={}, freq_hz={}, mode={:?}, tx_en={}, poll_paused={}",
|
||||||
|
config.rig_id,
|
||||||
|
state.initialized,
|
||||||
|
state.status.freq.hz,
|
||||||
|
state.status.mode,
|
||||||
|
state.status.tx_en,
|
||||||
|
poll_pause_until
|
||||||
|
.map(|until| Instant::now() < until)
|
||||||
|
.unwrap_or(false)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
maybe_req = rx.recv() => {
|
maybe_req = rx.recv() => {
|
||||||
let Some(first_req) = maybe_req else { break; };
|
let Some(first_req) = maybe_req else { break; };
|
||||||
@@ -374,6 +389,9 @@ pub async fn run_rig_task(
|
|||||||
let cmd_label = format!("{:?}", cmd);
|
let cmd_label = format!("{:?}", cmd);
|
||||||
let log_command = !matches!(&cmd, RigCommand::GetSpectrum);
|
let log_command = !matches!(&cmd, RigCommand::GetSpectrum);
|
||||||
let started = Instant::now();
|
let started = Instant::now();
|
||||||
|
if log_command {
|
||||||
|
info!("[{}] rig command start: {}", config.rig_id, cmd_label);
|
||||||
|
}
|
||||||
|
|
||||||
let mut cmd_ctx = CommandExecContext {
|
let mut cmd_ctx = CommandExecContext {
|
||||||
rig: &mut rig,
|
rig: &mut rig,
|
||||||
|
|||||||
Reference in New Issue
Block a user