[feat](trx-server): serve GetSnapshot from watch channel in listener
Add fast path in the TCP listener to serve GetSnapshot requests directly from the state watch channel, so clients get a response even while the rig task is initializing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
@@ -12,14 +12,14 @@ use std::net::SocketAddr;
|
|||||||
|
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot, watch};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
use trx_core::client::ClientEnvelope;
|
use trx_core::client::ClientEnvelope;
|
||||||
use trx_core::radio::freq::Freq;
|
use trx_core::radio::freq::Freq;
|
||||||
use trx_core::rig::command::RigCommand;
|
use trx_core::rig::command::RigCommand;
|
||||||
use trx_core::rig::request::RigRequest;
|
use trx_core::rig::request::RigRequest;
|
||||||
use trx_core::rig::state::RigMode;
|
use trx_core::rig::state::{RigMode, RigState};
|
||||||
use trx_core::{ClientCommand, ClientResponse};
|
use trx_core::{ClientCommand, ClientResponse};
|
||||||
|
|
||||||
/// Run the JSON TCP listener, accepting client connections.
|
/// Run the JSON TCP listener, accepting client connections.
|
||||||
@@ -27,6 +27,7 @@ pub async fn run_listener(
|
|||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
rig_tx: mpsc::Sender<RigRequest>,
|
rig_tx: mpsc::Sender<RigRequest>,
|
||||||
auth_tokens: HashSet<String>,
|
auth_tokens: HashSet<String>,
|
||||||
|
state_rx: watch::Receiver<RigState>,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
info!("Listening on {}", addr);
|
info!("Listening on {}", addr);
|
||||||
@@ -37,8 +38,9 @@ pub async fn run_listener(
|
|||||||
|
|
||||||
let tx = rig_tx.clone();
|
let tx = rig_tx.clone();
|
||||||
let tokens = auth_tokens.clone();
|
let tokens = auth_tokens.clone();
|
||||||
|
let srx = state_rx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_client(socket, peer, tx, &tokens).await {
|
if let Err(e) = handle_client(socket, peer, tx, &tokens, srx).await {
|
||||||
error!("Client {} error: {:?}", peer, e);
|
error!("Client {} error: {:?}", peer, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -50,6 +52,7 @@ async fn handle_client(
|
|||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
tx: mpsc::Sender<RigRequest>,
|
tx: mpsc::Sender<RigRequest>,
|
||||||
auth_tokens: &HashSet<String>,
|
auth_tokens: &HashSet<String>,
|
||||||
|
state_rx: watch::Receiver<RigState>,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let (reader, mut writer) = socket.into_split();
|
let (reader, mut writer) = socket.into_split();
|
||||||
let mut reader = BufReader::new(reader);
|
let mut reader = BufReader::new(reader);
|
||||||
@@ -98,6 +101,23 @@ async fn handle_client(
|
|||||||
|
|
||||||
let rig_cmd = map_command(envelope.cmd);
|
let rig_cmd = map_command(envelope.cmd);
|
||||||
|
|
||||||
|
// Fast path: serve GetSnapshot directly from the watch channel
|
||||||
|
// so clients get a response even while the rig task is initializing.
|
||||||
|
if matches!(rig_cmd, RigCommand::GetSnapshot) {
|
||||||
|
let state = state_rx.borrow().clone();
|
||||||
|
if let Some(snapshot) = state.snapshot() {
|
||||||
|
let resp = ClientResponse {
|
||||||
|
success: true,
|
||||||
|
state: Some(snapshot),
|
||||||
|
error: None,
|
||||||
|
};
|
||||||
|
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||||
|
writer.write_all(resp_line.as_bytes()).await?;
|
||||||
|
writer.flush().await?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let (resp_tx, resp_rx) = oneshot::channel();
|
let (resp_tx, resp_rx) = oneshot::channel();
|
||||||
let req = RigRequest {
|
let req = RigRequest {
|
||||||
cmd: rig_cmd,
|
cmd: rig_cmd,
|
||||||
|
|||||||
@@ -305,8 +305,9 @@ async fn main() -> DynResult<()> {
|
|||||||
.cloned()
|
.cloned()
|
||||||
.collect();
|
.collect();
|
||||||
let rig_tx = tx.clone();
|
let rig_tx = tx.clone();
|
||||||
|
let state_rx_listener = _state_rx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = listener::run_listener(listen_addr, rig_tx, auth_tokens).await {
|
if let Err(e) = listener::run_listener(listen_addr, rig_tx, auth_tokens, state_rx_listener).await {
|
||||||
error!("Listener error: {:?}", e);
|
error!("Listener error: {:?}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user