From bdf63fe81c8ef96b41d563727c88875421a898b2 Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Sun, 3 May 2026 21:05:31 +0200 Subject: [PATCH] [refactor](trx-server): de-ignore listener tests via duplex transport Split handle_client into a generic handle_client_io core plus a thin TcpStream wrapper, and make send_response generic over AsyncWrite. The 8 ignored integration tests now drive handle_client_io directly over a tokio::io::duplex pair, so the per-connection protocol state machine is exercised on every cargo test run instead of only when TCP loopback bind privileges are available. All 24 listener tests run unconditionally; trx-server suite reports 142 passed, 0 ignored (was 134 passed, 8 ignored). Co-authored-by: Claude Opus 4.7 Signed-off-by: Stan Grams --- src/trx-server/src/listener.rs | 398 +++++++++++---------------------- 1 file changed, 130 insertions(+), 268 deletions(-) diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index cdc241e..72ae039 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -234,8 +234,8 @@ async fn read_limited_line( } } -async fn send_response( - writer: &mut tokio::net::tcp::OwnedWriteHalf, +async fn send_response( + writer: &mut W, response: &ClientResponse, io_timeout: Duration, ) -> std::io::Result<()> { @@ -257,8 +257,31 @@ async fn handle_client( socket: TcpStream, addr: SocketAddr, ctx: ClientContext, - mut shutdown_rx: watch::Receiver, + shutdown_rx: watch::Receiver, ) -> std::io::Result<()> { + // Disable Nagle so small frames (command responses, meter samples) ship + // immediately instead of sitting in the kernel's send buffer for up to + // ~40 ms waiting for more payload. + let _ = socket.set_nodelay(true); + let (reader, writer) = socket.into_split(); + handle_client_io(BufReader::new(reader), writer, addr, ctx, shutdown_rx).await +} + +/// Generic per-client request loop. Splitting this from `handle_client` +/// lets tests exercise the full protocol over a `tokio::io::duplex` pair +/// instead of needing a real TCP socket — the production wrapper just feeds +/// in the split halves of an accepted `TcpStream`. +async fn handle_client_io( + mut reader: BufReader, + mut writer: W, + addr: SocketAddr, + ctx: ClientContext, + mut shutdown_rx: watch::Receiver, +) -> std::io::Result<()> +where + R: tokio::io::AsyncRead + Unpin, + W: tokio::io::AsyncWrite + Unpin, +{ let ClientContext { rigs, default_rig_id, @@ -267,12 +290,6 @@ async fn handle_client( sat_pass_cache, timeouts, } = ctx; - // Disable Nagle so small frames (command responses, meter samples) ship - // immediately instead of sitting in the kernel's send buffer for up to - // ~40 ms waiting for more payload. - let _ = socket.set_nodelay(true); - let (reader, mut writer) = socket.into_split(); - let mut reader = BufReader::new(reader); loop { let line = tokio::select! { @@ -664,10 +681,9 @@ async fn handle_client( mod tests { use super::*; use std::collections::HashSet; - use std::net::{Ipv4Addr, SocketAddr}; + use std::net::SocketAddr; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; - use tokio::net::TcpStream; use tokio::sync::{mpsc, watch}; use trx_core::radio::freq::Band; @@ -675,13 +691,6 @@ mod tests { use trx_core::rig::state::RigState; use trx_core::rig::{RigAccessMethod, RigCapabilities, RigInfo}; - fn loopback_addr() -> SocketAddr { - let listener = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind"); - let addr = listener.local_addr().expect("local_addr"); - drop(listener); - addr - } - fn sample_state() -> RigState { let mut state = RigState::new_uninitialized(); state.initialized = true; @@ -736,121 +745,115 @@ mod tests { (Arc::new(map), "default".to_string()) } - #[tokio::test] - #[ignore = "requires TCP bind permissions"] - async fn listener_rejects_missing_token() { - let addr = loopback_addr(); - let (rigs, default_id) = make_rigs(sample_state()); - let (shutdown_tx, shutdown_rx) = watch::channel(false); - - let mut auth = HashSet::new(); - auth.insert("secret".to_string()); - let handle = tokio::spawn(run_listener( - addr, + /// Build a ClientContext directly so tests can drive `handle_client_io` + /// without binding a real TCP listener. + fn make_ctx( + rigs: Arc>, + default_rig_id: String, + auth_tokens: HashSet, + ) -> ClientContext { + ClientContext { rigs, - default_id, - auth, - None, - ListenerTimeouts::default(), + default_rig_id, + validator: Arc::new(SimpleTokenValidator::new(auth_tokens)), + station_coords: None, + sat_pass_cache: Arc::new(Mutex::new(None)), + timeouts: ListenerTimeouts::default(), + } + } + + type ClientReader = BufReader>; + type ClientWriter = tokio::io::WriteHalf; + type SpawnedClient = ( + ClientReader, + ClientWriter, + tokio::task::JoinHandle>, + watch::Sender, + ); + + /// Spawn `handle_client_io` over a `tokio::io::duplex` pair. Returns the + /// client-side read+write halves, the spawned task handle, and a + /// shutdown sender that ends the loop when fired. + fn spawn_client_io(ctx: ClientContext) -> SpawnedClient { + let (server_io, client_io) = tokio::io::duplex(64 * 1024); + let (server_read, server_write) = tokio::io::split(server_io); + let (client_read, client_write) = tokio::io::split(client_io); + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let addr: SocketAddr = "127.0.0.1:1234".parse().unwrap(); + let handle = tokio::spawn(handle_client_io( + BufReader::new(server_read), + server_write, + addr, + ctx, shutdown_rx, )); + ( + BufReader::new(client_read), + client_write, + handle, + shutdown_tx, + ) + } - let stream = TcpStream::connect(addr).await.expect("connect"); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - writer - .write_all(br#"{"cmd":"get_state"}"#) - .await - .expect("write"); + /// Send one JSON line over the duplex and read one response line back. + async fn duplex_round_trip( + writer: &mut W, + reader: &mut ClientReader, + json: &[u8], + ) -> ClientResponse { + writer.write_all(json).await.expect("write"); writer.write_all(b"\n").await.expect("newline"); writer.flush().await.expect("flush"); - let mut line = String::new(); reader.read_line(&mut line).await.expect("read"); - let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json"); + serde_json::from_str(line.trim_end()).expect("response json") + } + + #[tokio::test] + async fn listener_rejects_missing_token() { + let (rigs, default_id) = make_rigs(sample_state()); + let mut auth = HashSet::new(); + auth.insert("secret".to_string()); + let ctx = make_ctx(rigs, default_id, auth); + let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); + + let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await; assert!(!resp.success); assert_eq!(resp.error.as_deref(), Some("missing authorization token")); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn listener_serves_get_state_snapshot() { - let addr = loopback_addr(); let (rigs, default_id) = make_rigs(sample_state()); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - writer - .write_all(br#"{"cmd":"get_state"}"#) - .await - .expect("write"); - writer.write_all(b"\n").await.expect("newline"); - writer.flush().await.expect("flush"); - - let mut line = String::new(); - reader.read_line(&mut line).await.expect("read"); - let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json"); + let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await; assert!(resp.success); let snapshot = resp.state.expect("snapshot"); assert_eq!(snapshot.info.model, "Dummy"); assert_eq!(snapshot.status.freq.hz, 144_300_000); - // rig_id should be set in the response assert_eq!(resp.rig_id.as_deref(), Some("default")); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn listener_routes_unknown_rig_id() { - let addr = loopback_addr(); let (rigs, default_id) = make_rigs(sample_state()); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); - - writer - .write_all(br#"{"rig_id":"nonexistent","cmd":"get_state"}"#) - .await - .expect("write"); - writer.write_all(b"\n").await.expect("newline"); - writer.flush().await.expect("flush"); - - let mut line = String::new(); - reader.read_line(&mut line).await.expect("read"); - let resp: ClientResponse = serde_json::from_str(line.trim_end()).expect("response json"); + let resp = duplex_round_trip( + &mut writer, + &mut reader, + br#"{"rig_id":"nonexistent","cmd":"get_state"}"#, + ) + .await; assert!(!resp.success); assert!(resp .error @@ -859,7 +862,6 @@ mod tests { .contains("Unknown rig_id")); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } @@ -951,161 +953,76 @@ mod tests { (Arc::new(map), "rig_hf".to_string(), rx_a, rx_b) } - /// Helper: send a JSON line and read one response line from the stream. - async fn send_and_recv( - writer: &mut tokio::net::tcp::OwnedWriteHalf, - reader: &mut BufReader, - json: &[u8], - ) -> ClientResponse { - writer.write_all(json).await.expect("write"); - writer.write_all(b"\n").await.expect("newline"); - writer.flush().await.expect("flush"); - let mut line = String::new(); - reader.read_line(&mut line).await.expect("read"); - serde_json::from_str(line.trim_end()).expect("response json") - } - #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn multi_rig_state_isolation() { - // Two rigs with different frequencies and modes. let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB); let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM); - let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf); - let addr = loopback_addr(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - // Allow listener to bind. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (read_half, mut writer) = stream.into_split(); - let mut reader = BufReader::new(read_half); - - // Query rig_hf — should return HF state. - let resp = send_and_recv( + let resp = duplex_round_trip( &mut writer, &mut reader, br#"{"rig_id":"rig_hf","cmd":"get_state"}"#, ) .await; - assert!(resp.success, "rig_hf get_state should succeed"); + assert!(resp.success); assert_eq!(resp.rig_id.as_deref(), Some("rig_hf")); let snap_hf = resp.state.expect("rig_hf snapshot"); assert_eq!(snap_hf.info.model, "HF-Dummy"); assert_eq!(snap_hf.status.freq.hz, 14_200_000); - // Query rig_vhf — should return VHF state. - let resp = send_and_recv( + let resp = duplex_round_trip( &mut writer, &mut reader, br#"{"rig_id":"rig_vhf","cmd":"get_state"}"#, ) .await; - assert!(resp.success, "rig_vhf get_state should succeed"); + assert!(resp.success); assert_eq!(resp.rig_id.as_deref(), Some("rig_vhf")); let snap_vhf = resp.state.expect("rig_vhf snapshot"); assert_eq!(snap_vhf.info.model, "VHF-Dummy"); assert_eq!(snap_vhf.status.freq.hz, 145_500_000); - - // Verify the two snapshots have different modes. - assert_ne!( - snap_hf.status.mode, snap_vhf.status.mode, - "Rig states should be independent" - ); + assert_ne!(snap_hf.status.mode, snap_vhf.status.mode); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn multi_rig_default_fallback() { - // When rig_id is omitted, the default rig (rig_hf) should be used. let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB); let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM); - let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf); - let addr = loopback_addr(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (read_half, mut writer) = stream.into_split(); - let mut reader = BufReader::new(read_half); - - // No rig_id — should resolve to default (rig_hf). - let resp = send_and_recv(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await; - assert!(resp.success, "default get_state should succeed"); + let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_state"}"#).await; + assert!(resp.success); assert_eq!(resp.rig_id.as_deref(), Some("rig_hf")); let snap = resp.state.expect("default snapshot"); assert_eq!(snap.info.model, "HF-Dummy"); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn multi_rig_get_rigs_returns_all() { let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB); let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM); - let (rigs, default_id, _rx_a, _rx_b) = make_two_rigs(state_hf, state_vhf); - let addr = loopback_addr(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (mut reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (read_half, mut writer) = stream.into_split(); - let mut reader = BufReader::new(read_half); - - let resp = send_and_recv(&mut writer, &mut reader, br#"{"cmd":"get_rigs"}"#).await; - assert!(resp.success, "get_rigs should succeed"); + let resp = duplex_round_trip(&mut writer, &mut reader, br#"{"cmd":"get_rigs"}"#).await; + assert!(resp.success); let entries = resp.rigs.expect("rigs list"); - assert_eq!(entries.len(), 2, "should return both rigs"); - - // Collect rig_ids from the entries. + assert_eq!(entries.len(), 2); let ids: HashSet = entries.iter().map(|e| e.rig_id.clone()).collect(); - assert!(ids.contains("rig_hf"), "should contain rig_hf"); - assert!(ids.contains("rig_vhf"), "should contain rig_vhf"); - - // Verify each entry has the correct frequency. + assert!(ids.contains("rig_hf")); + assert!(ids.contains("rig_vhf")); for entry in &entries { match entry.rig_id.as_str() { "rig_hf" => { @@ -1118,44 +1035,22 @@ mod tests { assert_eq!(entry.state.info.model, "VHF-Dummy"); assert_eq!(entry.audio_port, Some(4532)); } - other => panic!("Unexpected rig_id: {}", other), + other => panic!("unexpected rig_id: {}", other), } } let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn multi_rig_command_routing() { - // Verify that a set_freq command targeting rig_vhf is delivered to the - // VHF rig's mpsc channel and not to the HF rig's channel. let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB); let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM); - let (rigs, default_id, mut rx_hf, mut rx_vhf) = make_two_rigs(state_hf, state_vhf); - let addr = loopback_addr(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (_reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (_read_half, mut writer) = stream.into_split(); - - // Send set_freq targeting rig_vhf. The listener will forward the - // command to the VHF rig's mpsc channel. writer .write_all(br#"{"rig_id":"rig_vhf","cmd":"set_freq","freq_hz":146000000}"#) .await @@ -1163,8 +1058,7 @@ mod tests { writer.write_all(b"\n").await.expect("newline"); writer.flush().await.expect("flush"); - // The VHF channel should receive the command. - let req = tokio::time::timeout(std::time::Duration::from_secs(2), rx_vhf.recv()) + let req = tokio::time::timeout(Duration::from_secs(2), rx_vhf.recv()) .await .expect("timeout waiting for VHF command") .expect("VHF channel closed"); @@ -1173,45 +1067,20 @@ mod tests { "VHF rig should receive SetFreq(146 MHz), got {:?}", req.cmd ); - - // The HF channel should NOT have received anything. - assert!( - rx_hf.try_recv().is_err(), - "HF rig should not receive commands targeting VHF" - ); + assert!(rx_hf.try_recv().is_err()); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; } #[tokio::test] - #[ignore = "requires TCP bind permissions"] async fn multi_rig_command_routing_to_default() { - // When rig_id is omitted, commands should go to the default rig (HF). let state_hf = sample_state_custom("HF-Dummy", 14_200_000, trx_core::RigMode::USB); let state_vhf = sample_state_custom("VHF-Dummy", 145_500_000, trx_core::RigMode::FM); - let (rigs, default_id, mut rx_hf, mut rx_vhf) = make_two_rigs(state_hf, state_vhf); - let addr = loopback_addr(); - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let ctx = make_ctx(rigs, default_id, HashSet::new()); + let (_reader, mut writer, handle, shutdown_tx) = spawn_client_io(ctx); - let handle = tokio::spawn(run_listener( - addr, - rigs, - default_id, - HashSet::new(), - None, - ListenerTimeouts::default(), - shutdown_rx, - )); - - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - let stream = TcpStream::connect(addr).await.expect("connect"); - let (_read_half, mut writer) = stream.into_split(); - - // No rig_id — should route to default (rig_hf). writer .write_all(br#"{"cmd":"set_freq","freq_hz":7100000}"#) .await @@ -1219,8 +1088,7 @@ mod tests { writer.write_all(b"\n").await.expect("newline"); writer.flush().await.expect("flush"); - // The HF channel should receive the command. - let req = tokio::time::timeout(std::time::Duration::from_secs(2), rx_hf.recv()) + let req = tokio::time::timeout(Duration::from_secs(2), rx_hf.recv()) .await .expect("timeout waiting for HF command") .expect("HF channel closed"); @@ -1229,15 +1097,9 @@ mod tests { "HF rig should receive SetFreq(7.1 MHz), got {:?}", req.cmd ); - - // VHF should not receive anything. - assert!( - rx_vhf.try_recv().is_err(), - "VHF rig should not receive commands with no rig_id" - ); + assert!(rx_vhf.try_recv().is_err()); let _ = shutdown_tx.send(true); - handle.abort(); let _ = handle.await; }