diff --git a/src/trx-server/src/listener.rs b/src/trx-server/src/listener.rs index a9509cc..cdc241e 100644 --- a/src/trx-server/src/listener.rs +++ b/src/trx-server/src/listener.rs @@ -1240,4 +1240,169 @@ mod tests { handle.abort(); let _ = handle.await; } + + // ------------------------------------------------------------------- + // read_limited_line + // ------------------------------------------------------------------- + + #[tokio::test] + async fn read_limited_line_returns_none_on_empty_eof() { + let data: &[u8] = b""; + let mut r = BufReader::new(data); + let result = read_limited_line(&mut r, 1024).await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn read_limited_line_returns_line_with_trailing_newline() { + let data: &[u8] = b"hello\n"; + let mut r = BufReader::new(data); + let line = read_limited_line(&mut r, 1024).await.unwrap(); + assert_eq!(line.as_deref(), Some("hello\n")); + } + + #[tokio::test] + async fn read_limited_line_returns_unterminated_line_at_eof() { + let data: &[u8] = b"partial"; + let mut r = BufReader::new(data); + let line = read_limited_line(&mut r, 1024).await.unwrap(); + assert_eq!(line.as_deref(), Some("partial")); + } + + #[tokio::test] + async fn read_limited_line_yields_lines_in_sequence() { + let data: &[u8] = b"a\nb\nc\n"; + let mut r = BufReader::new(data); + assert_eq!( + read_limited_line(&mut r, 1024).await.unwrap().as_deref(), + Some("a\n") + ); + assert_eq!( + read_limited_line(&mut r, 1024).await.unwrap().as_deref(), + Some("b\n") + ); + assert_eq!( + read_limited_line(&mut r, 1024).await.unwrap().as_deref(), + Some("c\n") + ); + assert!(read_limited_line(&mut r, 1024).await.unwrap().is_none()); + } + + #[tokio::test] + async fn read_limited_line_empty_line_is_just_newline() { + let data: &[u8] = b"\n"; + let mut r = BufReader::new(data); + let line = read_limited_line(&mut r, 1024).await.unwrap(); + assert_eq!(line.as_deref(), Some("\n")); + } + + #[tokio::test] + async fn read_limited_line_at_max_size_succeeds() { + // 99 bytes payload + newline = 100 bytes total, equal to max. + let mut data = vec![b'x'; 99]; + data.push(b'\n'); + let mut r = BufReader::new(&data[..]); + let line = read_limited_line(&mut r, 100).await.unwrap(); + assert_eq!(line.map(|s| s.len()), Some(100)); + } + + #[tokio::test] + async fn read_limited_line_rejects_oversize_line_within_chunk() { + // Newline appears inside the buffer but the line including it exceeds + // the cap. + let mut data = vec![b'x'; 5_000]; + data.push(b'\n'); + let mut r = BufReader::new(&data[..]); + let err = read_limited_line(&mut r, 100).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + } + + #[tokio::test] + async fn read_limited_line_rejects_oversize_line_across_reads() { + // Buffer contains no newline at all and overflows the cap before EOF. + let data = vec![b'x'; 5_000]; + let mut r = BufReader::new(&data[..]); + let err = read_limited_line(&mut r, 100).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + } + + #[tokio::test] + async fn read_limited_line_rejects_invalid_utf8() { + let data: &[u8] = b"\xff\xfe\n"; + let mut r = BufReader::new(data); + let err = read_limited_line(&mut r, 1024).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + } + + // ------------------------------------------------------------------- + // ConnectionTracker + // ------------------------------------------------------------------- + + fn ip(s: &str) -> std::net::IpAddr { + s.parse().expect("parse ip") + } + + #[test] + fn connection_tracker_acquires_up_to_limit() { + let mut t = ConnectionTracker::new(); + for _ in 0..MAX_CONNECTIONS_PER_IP { + assert!(t.try_acquire(ip("127.0.0.1"))); + } + } + + #[test] + fn connection_tracker_rejects_at_limit() { + let mut t = ConnectionTracker::new(); + for _ in 0..MAX_CONNECTIONS_PER_IP { + assert!(t.try_acquire(ip("127.0.0.1"))); + } + assert!(!t.try_acquire(ip("127.0.0.1"))); + } + + #[test] + fn connection_tracker_release_frees_a_slot() { + let mut t = ConnectionTracker::new(); + for _ in 0..MAX_CONNECTIONS_PER_IP { + assert!(t.try_acquire(ip("127.0.0.1"))); + } + assert!(!t.try_acquire(ip("127.0.0.1"))); + t.release(ip("127.0.0.1")); + assert!(t.try_acquire(ip("127.0.0.1"))); + } + + #[test] + fn connection_tracker_distinct_ips_are_independent() { + let mut t = ConnectionTracker::new(); + for _ in 0..MAX_CONNECTIONS_PER_IP { + assert!(t.try_acquire(ip("127.0.0.1"))); + } + assert!(!t.try_acquire(ip("127.0.0.1"))); + // Different IP starts fresh. + assert!(t.try_acquire(ip("127.0.0.2"))); + } + + #[test] + fn connection_tracker_release_unknown_ip_is_no_op() { + let mut t = ConnectionTracker::new(); + // Must not panic; must not insert anything. + t.release(ip("192.0.2.1")); + assert!(t.try_acquire(ip("192.0.2.1"))); + } + + #[test] + fn connection_tracker_excessive_release_does_not_underflow() { + let mut t = ConnectionTracker::new(); + assert!(t.try_acquire(ip("127.0.0.1"))); + t.release(ip("127.0.0.1")); + // Already at zero — saturating_sub keeps it there. + t.release(ip("127.0.0.1")); + assert!(t.try_acquire(ip("127.0.0.1"))); + } + + #[test] + fn listener_timeouts_default_uses_module_constants() { + let t = ListenerTimeouts::default(); + assert_eq!(t.io_timeout, DEFAULT_IO_TIMEOUT); + assert_eq!(t.request_timeout, DEFAULT_REQUEST_TIMEOUT); + } }