diff --git a/Cargo.toml b/Cargo.toml index cc3143a..c37cae1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ resolver = "2" [workspace.dependencies] +flate2 = "1" tokio = "1" tokio-serial = "5" serde = "1" diff --git a/src/trx-client/Cargo.toml b/src/trx-client/Cargo.toml index 052d9b2..10c740c 100644 --- a/src/trx-client/Cargo.toml +++ b/src/trx-client/Cargo.toml @@ -8,6 +8,7 @@ version = "0.1.0" edition = "2021" [dependencies] +flate2 = { workspace = true } tokio = { workspace = true, features = ["full"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } @@ -27,4 +28,4 @@ trx-frontend-http-json = { path = "trx-frontend/trx-frontend-http-json" } trx-frontend-rigctl = { path = "trx-frontend/trx-frontend-rigctl" } [features] -default = [] +default = [] \ No newline at end of file diff --git a/src/trx-client/src/audio_client.rs b/src/trx-client/src/audio_client.rs index b14f273..d6d60bf 100644 --- a/src/trx-client/src/audio_client.rs +++ b/src/trx-client/src/audio_client.rs @@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use bytes::Bytes; +use flate2::read::GzDecoder; +use std::io::Read as _; use tokio::io::BufReader; use tokio::net::TcpStream; use tokio::sync::{broadcast, mpsc, watch}; @@ -19,8 +21,9 @@ use trx_frontend::RemoteRigEntry; use trx_core::audio::{ read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, - AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_RX_FRAME, - AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, + AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, + AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::DecodedMessage; @@ -147,6 +150,34 @@ async fn handle_audio_connection( Ok((AUDIO_MSG_RX_FRAME, payload)) => { let _ = rx_tx.send(Bytes::from(payload)); } + Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => { + // Decompress gzip blob, then iterate the embedded framed messages. + let mut decompressed = Vec::new(); + if GzDecoder::new(payload.as_slice()) + .read_to_end(&mut decompressed) + .is_ok() + { + let mut pos = 0; + while pos + 5 <= decompressed.len() { + let _msg_type = decompressed[pos]; + let len = u32::from_be_bytes([ + decompressed[pos + 1], + decompressed[pos + 2], + decompressed[pos + 3], + decompressed[pos + 4], + ]) as usize; + pos += 5; + if pos + len > decompressed.len() { + break; + } + let json = &decompressed[pos..pos + len]; + if let Ok(msg) = serde_json::from_slice::(json) { + let _ = decode_tx.send(msg); + } + pos += len; + } + } + } Ok(( AUDIO_MSG_VDES_DECODE | AUDIO_MSG_AIS_DECODE diff --git a/src/trx-core/Cargo.toml b/src/trx-core/Cargo.toml index eb419a7..8d5d5c2 100644 --- a/src/trx-core/Cargo.toml +++ b/src/trx-core/Cargo.toml @@ -12,3 +12,4 @@ tokio = { workspace = true, features = ["full"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tracing = { workspace = true } +flate2 = { workspace = true } diff --git a/src/trx-core/src/audio.rs b/src/trx-core/src/audio.rs index 20dde47..c591874 100644 --- a/src/trx-core/src/audio.rs +++ b/src/trx-core/src/audio.rs @@ -18,9 +18,15 @@ pub const AUDIO_MSG_WSPR_DECODE: u8 = 0x06; pub const AUDIO_MSG_AIS_DECODE: u8 = 0x07; pub const AUDIO_MSG_VDES_DECODE: u8 = 0x08; pub const AUDIO_MSG_HF_APRS_DECODE: u8 = 0x09; +/// Compressed history blob: payload is a gzip-compressed sequence of normal +/// framed messages (each: `[1 byte type][4 bytes BE length][payload]`). +pub const AUDIO_MSG_HISTORY_COMPRESSED: u8 = 0x0a; -/// Maximum payload size (1 MB) to reject bogus frames early. +/// Maximum payload size for normal messages (1 MB). const MAX_PAYLOAD_SIZE: u32 = 1_048_576; +/// Maximum payload size for the compressed history blob (16 MB). +/// A compressed 24-hour history on a busy channel can reach several MB. +const MAX_HISTORY_PAYLOAD_SIZE: u32 = 16_777_216; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AudioStreamInfo { @@ -59,10 +65,15 @@ pub async fn read_audio_msg( ) -> std::io::Result<(u8, Vec)> { let msg_type = reader.read_u8().await?; let len = reader.read_u32().await?; - if len > MAX_PAYLOAD_SIZE { + let limit = if msg_type == AUDIO_MSG_HISTORY_COMPRESSED { + MAX_HISTORY_PAYLOAD_SIZE + } else { + MAX_PAYLOAD_SIZE + }; + if len > limit { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, - format!("audio frame too large: {} bytes", len), + format!("audio frame too large: {} bytes (type={:#04x})", len, msg_type), )); } let mut payload = vec![0u8; len as usize]; diff --git a/src/trx-server/Cargo.toml b/src/trx-server/Cargo.toml index 863af41..b9d44d0 100644 --- a/src/trx-server/Cargo.toml +++ b/src/trx-server/Cargo.toml @@ -13,6 +13,7 @@ default = ["soapysdr"] soapysdr = ["trx-backend/soapysdr"] [dependencies] +flate2 = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-serial = { workspace = true } serde = { workspace = true, features = ["derive"] } @@ -37,4 +38,4 @@ trx-cw = { path = "../decoders/trx-cw" } trx-decode-log = { path = "../decoders/trx-decode-log" } trx-ft8 = { path = "../decoders/trx-ft8" } trx-wspr = { path = "../decoders/trx-wspr" } -trx-protocol = { path = "../trx-protocol" } +trx-protocol = { path = "../trx-protocol" } \ No newline at end of file diff --git a/src/trx-server/src/audio.rs b/src/trx-server/src/audio.rs index 8ff9cd3..d7f43b8 100644 --- a/src/trx-server/src/audio.rs +++ b/src/trx-server/src/audio.rs @@ -11,7 +11,10 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use bytes::Bytes; +use flate2::write::GzEncoder; +use flate2::Compression; use num_complex::Complex; +use std::io::Write as _; use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, watch}; @@ -22,8 +25,8 @@ use trx_aprs::AprsDecoder; use trx_core::audio::{ read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, - AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, - AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, + AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, + AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE, }; use trx_core::decode::{ AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage, @@ -1856,15 +1859,27 @@ async fn handle_audio_client( }; let (blob, replayed_history_count) = history_blob; if !blob.is_empty() { - writer.write_all(&blob).await?; - writer.flush().await?; - } - if replayed_history_count > 0 { + // Gzip-compress the blob before sending. JSON history compresses very + // well (~10-20x) so this dramatically reduces both transfer size and + // the time the client spends waiting for data. + let compressed = { + let mut enc = GzEncoder::new( + Vec::with_capacity(blob.len() / 8), + Compression::fast(), + ); + enc.write_all(&blob) + .and_then(|_| enc.finish()) + .unwrap_or(blob.clone()) + }; + write_audio_msg(&mut writer, AUDIO_MSG_HISTORY_COMPRESSED, &compressed).await?; info!( - "Audio client {} replayed {} history messages in {:?}", + "Audio client {} replayed {} history messages in {:?} ({} → {} bytes, {:.1}x)", peer, replayed_history_count, - history_replay_started_at.elapsed() + history_replay_started_at.elapsed(), + blob.len(), + compressed.len(), + blob.len() as f64 / compressed.len().max(1) as f64, ); }