[refactor](trx-server): de-ignore listener tests via duplex transport

Split handle_client into a generic handle_client_io<R, W> core plus a thin TcpStream wrapper, and make send_response generic over AsyncWrite. The 8 ignored integration tests now drive handle_client_io directly over a tokio::io::duplex pair, so the per-connection protocol state machine is exercised on every cargo test run instead of only when TCP loopback bind privileges are available.

All 24 listener tests run unconditionally; trx-server suite reports 142 passed, 0 ignored (was 134 passed, 8 ignored).

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
2026-05-03 21:05:31 +02:00
parent cc001287a2
commit bdf63fe81c
+130 -268
View File
@@ -234,8 +234,8 @@ async fn read_limited_line<R: AsyncBufRead + Unpin>(
}
}
async fn send_response(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
async fn send_response<W: tokio::io::AsyncWrite + Unpin>(
writer: &mut W,
response: &ClientResponse,
io_timeout: Duration,
) -> std::io::Result<()> {
@@ -257,8 +257,31 @@ async fn handle_client(
socket: TcpStream,
addr: SocketAddr,
ctx: ClientContext,
mut shutdown_rx: watch::Receiver<bool>,
shutdown_rx: watch::Receiver<bool>,
) -> std::io::Result<()> {
// Disable Nagle so small frames (command responses, meter samples) ship
// immediately instead of sitting in the kernel's send buffer for up to
// ~40 ms waiting for more payload.
let _ = socket.set_nodelay(true);
let (reader, writer) = socket.into_split();
handle_client_io(BufReader::new(reader), writer, addr, ctx, shutdown_rx).await
}
/// Generic per-client request loop. Splitting this from `handle_client`
/// lets tests exercise the full protocol over a `tokio::io::duplex` pair
/// instead of needing a real TCP socket — the production wrapper just feeds
/// in the split halves of an accepted `TcpStream`.
async fn handle_client_io<R, W>(
mut reader: BufReader<R>,
mut writer: W,
addr: SocketAddr,
ctx: ClientContext,
mut shutdown_rx: watch::Receiver<bool>,
) -> std::io::Result<()>
where
R: tokio::io::AsyncRead + Unpin,
W: tokio::io::AsyncWrite + Unpin,
{
let ClientContext {
rigs,
default_rig_id,
@@ -267,12 +290,6 @@ async fn handle_client(
sat_pass_cache,
timeouts,
} = ctx;
// Disable Nagle so small frames (command responses, meter samples) ship
// immediately instead of sitting in the kernel's send buffer for up to
// ~40 ms waiting for more payload.
let _ = socket.set_nodelay(true);
let (reader, mut writer) = socket.into_split();
let mut reader = BufReader::new(reader);
loop {
let line = tokio::select! {
@@ -664,10 +681,9 @@ async fn handle_client(
mod tests {
use super::*;
use std::collections::HashSet;
use std::net::{Ipv4Addr, SocketAddr};
use std::net::SocketAddr;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, watch};
use trx_core::radio::freq::Band;
@@ -675,13 +691,6 @@ mod tests {
use trx_core::rig::state::RigState;
use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo};
fn loopback_addr() -> SocketAddr {
let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind");
let addr = listener.local_addr().expect("local_addr");
drop(listener);
addr
}
fn sample_state() -> RigState {
let mut state = RigState::new_uninitialized();
state.initialized = true;
@@ -736,121 +745,115 @@ mod tests {
(Arc::new(map), "default".to_string())
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn listener_rejects_missing_token() {
let addr = loopback_addr();
let (rigs, default_id) = make_rigs(sample_state());
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let mut auth = HashSet::new();
auth.insert("secret".to_string());
let handle = tokio::spawn(run_listener(
addr,
/// Build a ClientContext directly so tests can drive `handle_client_io`
/// without binding a real TCP listener.
fn make_ctx(
rigs: Arc<HashMap<String, RigHandle>>,
default_rig_id: String,
auth_tokens: HashSet<String>,
) -> ClientContext {
ClientContext {
rigs,
default_id,
auth,
None,
ListenerTimeouts::default(),
default_rig_id,
validator: Arc::new(SimpleTokenValidator::new(auth_tokens)),
station_coords: None,
sat_pass_cache: Arc::new(Mutex::new(None)),
timeouts: ListenerTimeouts::default(),
}
}
type ClientReader = BufReader<tokio::io::ReadHalf<tokio::io::DuplexStream>>;
type ClientWriter = tokio::io::WriteHalf<tokio::io::DuplexStream>;
type SpawnedClient = (
ClientReader,
ClientWriter,
tokio::task::JoinHandle<std::io::Result<()>>,
watch::Sender<bool>,
);
/// Spawn `handle_client_io` over a `tokio::io::duplex` pair. Returns the
/// client-side read+write halves, the spawned task handle, and a
/// shutdown sender that ends the loop when fired.
fn spawn_client_io(ctx: ClientContext) -> SpawnedClient {
let (server_io, client_io) = tokio::io::duplex(64 * 1024);
let (server_read, server_write) = tokio::io::split(server_io);
let (client_read, client_write) = tokio::io::split(client_io);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let addr: SocketAddr = "127.0.0.1:1234".parse().unwrap();
let handle = tokio::spawn(handle_client_io(
BufReader::new(server_read),
server_write,
addr,
ctx,
shutdown_rx,
));
(
BufReader::new(client_read),
client_write,
handle,
shutdown_tx,
)
}
let stream = TcpStream::connect(addr).await.expect("connect");
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
writer
.write_all(br#"{"cmd":"get_state"}"#)
.await
.expect("write");
/// Send one JSON line over the duplex and read one response line back.
async fn duplex_round_trip<W: AsyncWriteExt + Unpin>(
writer: &mut W,
reader: &mut ClientReader,
json: &[u8],
) -> ClientResponse {
writer.write_all(json).await.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
serde_json::from_str(line.trim_end()).expect("response json")
}
#[tokio::test]
async fn listener_rejects_missing_token() {
let (rigs, default_id) = make_rigs(sample_state());
let mut auth = HashSet::new();
auth.insert("secret".to_string());
let ctx = make_ctx(rigs, default_id, auth);
let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await;
assert!(!resp.success);
assert_eq!(resp.error.as_deref(), Some("missing authorization token"));
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn listener_serves_get_state_snapshot() {
let addr = loopback_addr();
let (rigs, default_id) = make_rigs(sample_state());
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
let stream = TcpStream::connect(addr).await.expect("connect");
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
writer
.write_all(br#"{"cmd":"get_state"}"#)
.await
.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await;
assert!(resp.success);
let snapshot = resp.state.expect("snapshot");
assert_eq!(snapshot.info.model, "Dummy");
assert_eq!(snapshot.status.freq.hz, 144_300_000);
// rig_id should be set in the response
assert_eq!(resp.rig_id.as_deref(), Some("default"));
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn listener_routes_unknown_rig_id() {
let addr = loopback_addr();
let (rigs, default_id) = make_rigs(sample_state());
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
let stream = TcpStream::connect(addr).await.expect("connect");
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
writer
.write_all(br#"{"rig_id":"nonexistent","cmd":"get_state"}"#)
.await
.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json");
let resp = duplex_round_trip(
&mut writer,
&mut reader,
br#"{"rig_id":"nonexistent","cmd":"get_state"}"#,
)
.await;
assert!(!resp.success);
assert!(resp
.error
@@ -859,7 +862,6 @@ mod tests {
.contains("Unknown rig_id"));
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
@@ -951,161 +953,76 @@ mod tests {
(Arc::new(map), "rig_hf".to_string(), rx_a, rx_b)
}
/// Helper: send a JSON line and read one response line from the stream.
async fn send_and_recv(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
json: &[u8],
) -> ClientResponse {
writer.write_all(json).await.expect("write");
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
let mut line = String::new();
reader.read_line(&mut line).await.expect("read");
serde_json::from_str(line.trim_end()).expect("response json")
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_state_isolation() {
// Two rigs with different frequencies and modes.
let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
// Allow listener to bind.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (read_half, mut writer) = stream.into_split();
let mut reader = BufReader::new(read_half);
// Query rig_hf — should return HF state.
let resp = send_and_recv(
let resp = duplex_round_trip(
&mut writer,
&mut reader,
br#"{"rig_id":"rig_hf","cmd":"get_state"}"#,
)
.await;
assert!(resp.success, "rig_hf get_state should succeed");
assert!(resp.success);
assert_eq!(resp.rig_id.as_deref(), Some("rig_hf"));
let snap_hf = resp.state.expect("rig_hf snapshot");
assert_eq!(snap_hf.info.model, "HF-Dummy");
assert_eq!(snap_hf.status.freq.hz, 14_200_000);
// Query rig_vhf — should return VHF state.
let resp = send_and_recv(
let resp = duplex_round_trip(
&mut writer,
&mut reader,
br#"{"rig_id":"rig_vhf","cmd":"get_state"}"#,
)
.await;
assert!(resp.success, "rig_vhf get_state should succeed");
assert!(resp.success);
assert_eq!(resp.rig_id.as_deref(), Some("rig_vhf"));
let snap_vhf = resp.state.expect("rig_vhf snapshot");
assert_eq!(snap_vhf.info.model, "VHF-Dummy");
assert_eq!(snap_vhf.status.freq.hz, 145_500_000);
// Verify the two snapshots have different modes.
assert_ne!(
snap_hf.status.mode, snap_vhf.status.mode,
"Rig states should be independent"
);
assert_ne!(snap_hf.status.mode, snap_vhf.status.mode);
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_default_fallback() {
// When rig_id is omitted, the default rig (rig_hf) should be used.
let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (read_half, mut writer) = stream.into_split();
let mut reader = BufReader::new(read_half);
// No rig_id — should resolve to default (rig_hf).
let resp = send_and_recv(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await;
assert!(resp.success, "default get_state should succeed");
let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await;
assert!(resp.success);
assert_eq!(resp.rig_id.as_deref(), Some("rig_hf"));
let snap = resp.state.expect("default snapshot");
assert_eq!(snap.info.model, "HF-Dummy");
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_get_rigs_returns_all() {
let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (read_half, mut writer) = stream.into_split();
let mut reader = BufReader::new(read_half);
let resp = send_and_recv(&mut writer, &mut reader, br#"{"cmd":"get_rigs"}"#).await;
assert!(resp.success, "get_rigs should succeed");
let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_rigs"}"#).await;
assert!(resp.success);
let entries = resp.rigs.expect("rigs list");
assert_eq!(entries.len(), 2, "should return both rigs");
// Collect rig_ids from the entries.
assert_eq!(entries.len(), 2);
let ids: HashSet<String> = entries.iter().map(|e| e.rig_id.clone()).collect();
assert!(ids.contains("rig_hf"), "should contain rig_hf");
assert!(ids.contains("rig_vhf"), "should contain rig_vhf");
// Verify each entry has the correct frequency.
assert!(ids.contains("rig_hf"));
assert!(ids.contains("rig_vhf"));
for entry in &entries {
match entry.rig_id.as_str() {
"rig_hf" => {
@@ -1118,44 +1035,22 @@ mod tests {
assert_eq!(entry.state.info.model, "VHF-Dummy");
assert_eq!(entry.audio_port, Some(4532));
}
other => panic!("Unexpected rig_id: {}", other),
other => panic!("unexpected rig_id: {}", other),
}
}
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_command_routing() {
// Verify that a set_freq command targeting rig_vhf is delivered to the
// VHF rig's mpsc channel and not to the HF rig's channel.
let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, mut rx_hf, mut rx_vhf) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (_reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (_read_half, mut writer) = stream.into_split();
// Send set_freq targeting rig_vhf. The listener will forward the
// command to the VHF rig's mpsc channel.
writer
.write_all(br#"{"rig_id":"rig_vhf","cmd":"set_freq","freq_hz":146000000}"#)
.await
@@ -1163,8 +1058,7 @@ mod tests {
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
// The VHF channel should receive the command.
let req = tokio::time::timeout(std::time::Duration::from_secs(2), rx_vhf.recv())
let req = tokio::time::timeout(Duration::from_secs(2), rx_vhf.recv())
.await
.expect("timeout waiting for VHF command")
.expect("VHF channel closed");
@@ -1173,45 +1067,20 @@ mod tests {
"VHF rig should receive SetFreq(146 MHz), got {:?}",
req.cmd
);
// The HF channel should NOT have received anything.
assert!(
rx_hf.try_recv().is_err(),
"HF rig should not receive commands targeting VHF"
);
assert!(rx_hf.try_recv().is_err());
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
#[ignore = "requires TCP bind permissions"]
async fn multi_rig_command_routing_to_default() {
// When rig_id is omitted, commands should go to the default rig (HF).
let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB);
let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM);
let (rigs, default_id, mut rx_hf, mut rx_vhf) = make_two_rigs(state_hf, state_vhf);
let addr = loopback_addr();
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let ctx = make_ctx(rigs, default_id, HashSet::new());
let (_reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx);
let handle = tokio::spawn(run_listener(
addr,
rigs,
default_id,
HashSet::new(),
None,
ListenerTimeouts::default(),
shutdown_rx,
));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let stream = TcpStream::connect(addr).await.expect("connect");
let (_read_half, mut writer) = stream.into_split();
// No rig_id — should route to default (rig_hf).
writer
.write_all(br#"{"cmd":"set_freq","freq_hz":7100000}"#)
.await
@@ -1219,8 +1088,7 @@ mod tests {
writer.write_all(b"\n").await.expect("newline");
writer.flush().await.expect("flush");
// The HF channel should receive the command.
let req = tokio::time::timeout(std::time::Duration::from_secs(2), rx_hf.recv())
let req = tokio::time::timeout(Duration::from_secs(2), rx_hf.recv())
.await
.expect("timeout waiting for HF command")
.expect("HF channel closed");
@@ -1229,15 +1097,9 @@ mod tests {
"HF rig should receive SetFreq(7.1 MHz), got {:?}",
req.cmd
);
// VHF should not receive anything.
assert!(
rx_vhf.try_recv().is_err(),
"VHF rig should not receive commands with no rig_id"
);
assert!(rx_vhf.try_recv().is_err());
let _ = shutdown_tx.send(true);
handle.abort();
let _ = handle.await;
}