[fix](trx-client): harden json tcp parsing and io limits
Add typed remote endpoint parsing (including bracketed IPv6), bounded JSON line reads, and read/write/request timeouts across client/server JSON-TCP paths. Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stanislaw Grams <stanislawgrams@gmail.com>
This commit is contained in:
+133
-37
@@ -10,10 +10,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{mpsc, oneshot, watch};
|
||||
use tokio::time;
|
||||
use tracing::{error, info};
|
||||
|
||||
use trx_core::rig::command::RigCommand;
|
||||
@@ -24,6 +26,10 @@ use trx_protocol::codec::parse_envelope;
|
||||
use trx_protocol::mapping;
|
||||
use trx_protocol::ClientResponse;
|
||||
|
||||
const IO_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
|
||||
const MAX_JSON_LINE_BYTES: usize = 16 * 1024;
|
||||
|
||||
/// Run the JSON TCP listener, accepting client connections.
|
||||
pub async fn run_listener(
|
||||
addr: SocketAddr,
|
||||
@@ -68,6 +74,76 @@ pub async fn run_listener(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_limited_line<R: AsyncBufRead + Unpin>(
|
||||
reader: &mut R,
|
||||
max_bytes: usize,
|
||||
) -> std::io::Result<Option<String>> {
|
||||
let mut line = Vec::with_capacity(256);
|
||||
loop {
|
||||
let available = reader.fill_buf().await?;
|
||||
if available.is_empty() {
|
||||
if line.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let text = String::from_utf8(line).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("line is not valid UTF-8: {e}"),
|
||||
)
|
||||
})?;
|
||||
return Ok(Some(text));
|
||||
}
|
||||
|
||||
if let Some(pos) = available.iter().position(|b| *b == b'\n') {
|
||||
let chunk = &available[..=pos];
|
||||
if line.len() + chunk.len() > max_bytes {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("line exceeds maximum size of {max_bytes} bytes"),
|
||||
));
|
||||
}
|
||||
line.extend_from_slice(chunk);
|
||||
reader.consume(pos + 1);
|
||||
let text = String::from_utf8(line).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("line is not valid UTF-8: {e}"),
|
||||
)
|
||||
})?;
|
||||
return Ok(Some(text));
|
||||
}
|
||||
|
||||
if line.len() + available.len() > max_bytes {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("line exceeds maximum size of {max_bytes} bytes"),
|
||||
));
|
||||
}
|
||||
|
||||
line.extend_from_slice(available);
|
||||
let consumed = available.len();
|
||||
reader.consume(consumed);
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_response(
|
||||
writer: &mut tokio::net::tcp::OwnedWriteHalf,
|
||||
response: &ClientResponse,
|
||||
) -> std::io::Result<()> {
|
||||
let resp_line = serde_json::to_string(response).map_err(std::io::Error::other)? + "\n";
|
||||
time::timeout(IO_TIMEOUT, writer.write_all(resp_line.as_bytes()))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::TimedOut, "response write timeout")
|
||||
})??;
|
||||
time::timeout(IO_TIMEOUT, writer.flush())
|
||||
.await
|
||||
.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::TimedOut, "response flush timeout")
|
||||
})??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_client(
|
||||
socket: TcpStream,
|
||||
addr: SocketAddr,
|
||||
@@ -78,12 +154,21 @@ async fn handle_client(
|
||||
) -> std::io::Result<()> {
|
||||
let (reader, mut writer) = socket.into_split();
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut line = String::new();
|
||||
|
||||
loop {
|
||||
line.clear();
|
||||
let bytes_read = tokio::select! {
|
||||
read = reader.read_line(&mut line) => read?,
|
||||
let line = tokio::select! {
|
||||
read = time::timeout(IO_TIMEOUT, read_limited_line(&mut reader, MAX_JSON_LINE_BYTES)) => {
|
||||
match read {
|
||||
Ok(Ok(line)) => line,
|
||||
Ok(Err(e)) => return Err(e),
|
||||
Err(_) => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"read timeout waiting for client request",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
@@ -95,10 +180,10 @@ async fn handle_client(
|
||||
}
|
||||
}
|
||||
};
|
||||
if bytes_read == 0 {
|
||||
let Some(line) = line else {
|
||||
info!("Client {} disconnected", addr);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
@@ -114,9 +199,7 @@ async fn handle_client(
|
||||
state: None,
|
||||
error: Some(format!("Invalid JSON: {}", e)),
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -127,9 +210,7 @@ async fn handle_client(
|
||||
state: None,
|
||||
error: Some(err),
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -145,9 +226,7 @@ async fn handle_client(
|
||||
state: Some(snapshot),
|
||||
error: None,
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -158,21 +237,44 @@ async fn handle_client(
|
||||
respond_to: resp_tx,
|
||||
};
|
||||
|
||||
if let Err(e) = tx.send(req).await {
|
||||
error!("Failed to send request to rig_task: {:?}", e);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
state: None,
|
||||
error: Some("Internal error: rig task not available".into()),
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
continue;
|
||||
match time::timeout(IO_TIMEOUT, tx.send(req)).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed to send request to rig_task: {:?}", e);
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
state: None,
|
||||
error: Some("Internal error: rig task not available".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
state: None,
|
||||
error: Some("Internal error: request queue timeout".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match tokio::select! {
|
||||
result = resp_rx => result,
|
||||
result = time::timeout(REQUEST_TIMEOUT, resp_rx) => {
|
||||
match result {
|
||||
Ok(inner) => inner,
|
||||
Err(_) => {
|
||||
let resp = ClientResponse {
|
||||
success: false,
|
||||
state: None,
|
||||
error: Some("Request timed out waiting for rig response".into()),
|
||||
};
|
||||
send_response(&mut writer, &resp).await?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
changed = shutdown_rx.changed() => {
|
||||
match changed {
|
||||
Ok(()) if *shutdown_rx.borrow() => {
|
||||
@@ -190,9 +292,7 @@ async fn handle_client(
|
||||
state: Some(snapshot),
|
||||
error: None,
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
send_response(&mut writer, &resp).await?;
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
let resp = ClientResponse {
|
||||
@@ -200,9 +300,7 @@ async fn handle_client(
|
||||
state: None,
|
||||
error: Some(err.message),
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
send_response(&mut writer, &resp).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Rig response oneshot recv error: {:?}", e);
|
||||
@@ -211,9 +309,7 @@ async fn handle_client(
|
||||
state: None,
|
||||
error: Some("Internal error waiting for rig response".into()),
|
||||
};
|
||||
let resp_line = serde_json::to_string(&resp)? + "\n";
|
||||
writer.write_all(resp_line.as_bytes()).await?;
|
||||
writer.flush().await?;
|
||||
send_response(&mut writer, &resp).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user