[feat](trx-server): gzip-compress history replay blob
Add flate2 dependency and a new AUDIO_MSG_HISTORY_COMPRESSED (0x0a) wire type. The server gzip-compresses the full history blob before sending; JSON history compresses ~10-20x so both transfer size and client wait time drop significantly. The client decompresses and dispatches sub-messages from the embedded framed stream. MAX_PAYLOAD_SIZE is kept at 1 MB for normal messages; a separate 16 MB limit is applied only to the compressed history type. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -29,6 +29,7 @@ members = [
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
|
flate2 = "1"
|
||||||
tokio = "1"
|
tokio = "1"
|
||||||
tokio-serial = "5"
|
tokio-serial = "5"
|
||||||
serde = "1"
|
serde = "1"
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
flate2 = { workspace = true }
|
||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
serde = { workspace = true, features = ["derive"] }
|
serde = { workspace = true, features = ["derive"] }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
@@ -27,4 +28,4 @@ trx-frontend-http-json = { path = "trx-frontend/trx-frontend-http-json" }
|
|||||||
trx-frontend-rigctl = { path = "trx-frontend/trx-frontend-rigctl" }
|
trx-frontend-rigctl = { path = "trx-frontend/trx-frontend-rigctl" }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
@@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex};
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use flate2::read::GzDecoder;
|
||||||
|
use std::io::Read as _;
|
||||||
use tokio::io::BufReader;
|
use tokio::io::BufReader;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::{broadcast, mpsc, watch};
|
use tokio::sync::{broadcast, mpsc, watch};
|
||||||
@@ -19,8 +21,9 @@ use trx_frontend::RemoteRigEntry;
|
|||||||
|
|
||||||
use trx_core::audio::{
|
use trx_core::audio::{
|
||||||
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
|
read_audio_msg, write_audio_msg, AudioStreamInfo, AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE,
|
||||||
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_RX_FRAME,
|
AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE, AUDIO_MSG_HF_APRS_DECODE,
|
||||||
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
|
||||||
|
AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
||||||
};
|
};
|
||||||
use trx_core::decode::DecodedMessage;
|
use trx_core::decode::DecodedMessage;
|
||||||
|
|
||||||
@@ -147,6 +150,34 @@ async fn handle_audio_connection(
|
|||||||
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
|
Ok((AUDIO_MSG_RX_FRAME, payload)) => {
|
||||||
let _ = rx_tx.send(Bytes::from(payload));
|
let _ = rx_tx.send(Bytes::from(payload));
|
||||||
}
|
}
|
||||||
|
Ok((AUDIO_MSG_HISTORY_COMPRESSED, payload)) => {
|
||||||
|
// Decompress gzip blob, then iterate the embedded framed messages.
|
||||||
|
let mut decompressed = Vec::new();
|
||||||
|
if GzDecoder::new(payload.as_slice())
|
||||||
|
.read_to_end(&mut decompressed)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
let mut pos = 0;
|
||||||
|
while pos + 5 <= decompressed.len() {
|
||||||
|
let _msg_type = decompressed[pos];
|
||||||
|
let len = u32::from_be_bytes([
|
||||||
|
decompressed[pos + 1],
|
||||||
|
decompressed[pos + 2],
|
||||||
|
decompressed[pos + 3],
|
||||||
|
decompressed[pos + 4],
|
||||||
|
]) as usize;
|
||||||
|
pos += 5;
|
||||||
|
if pos + len > decompressed.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let json = &decompressed[pos..pos + len];
|
||||||
|
if let Ok(msg) = serde_json::from_slice::<DecodedMessage>(json) {
|
||||||
|
let _ = decode_tx.send(msg);
|
||||||
|
}
|
||||||
|
pos += len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok((
|
Ok((
|
||||||
AUDIO_MSG_VDES_DECODE
|
AUDIO_MSG_VDES_DECODE
|
||||||
| AUDIO_MSG_AIS_DECODE
|
| AUDIO_MSG_AIS_DECODE
|
||||||
|
|||||||
@@ -12,3 +12,4 @@ tokio = { workspace = true, features = ["full"] }
|
|||||||
serde = { workspace = true, features = ["derive"] }
|
serde = { workspace = true, features = ["derive"] }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
|
flate2 = { workspace = true }
|
||||||
|
|||||||
@@ -18,9 +18,15 @@ pub const AUDIO_MSG_WSPR_DECODE: u8 = 0x06;
|
|||||||
pub const AUDIO_MSG_AIS_DECODE: u8 = 0x07;
|
pub const AUDIO_MSG_AIS_DECODE: u8 = 0x07;
|
||||||
pub const AUDIO_MSG_VDES_DECODE: u8 = 0x08;
|
pub const AUDIO_MSG_VDES_DECODE: u8 = 0x08;
|
||||||
pub const AUDIO_MSG_HF_APRS_DECODE: u8 = 0x09;
|
pub const AUDIO_MSG_HF_APRS_DECODE: u8 = 0x09;
|
||||||
|
/// Compressed history blob: payload is a gzip-compressed sequence of normal
|
||||||
|
/// framed messages (each: `[1 byte type][4 bytes BE length][payload]`).
|
||||||
|
pub const AUDIO_MSG_HISTORY_COMPRESSED: u8 = 0x0a;
|
||||||
|
|
||||||
/// Maximum payload size (1 MB) to reject bogus frames early.
|
/// Maximum payload size for normal messages (1 MB).
|
||||||
const MAX_PAYLOAD_SIZE: u32 = 1_048_576;
|
const MAX_PAYLOAD_SIZE: u32 = 1_048_576;
|
||||||
|
/// Maximum payload size for the compressed history blob (16 MB).
|
||||||
|
/// A compressed 24-hour history on a busy channel can reach several MB.
|
||||||
|
const MAX_HISTORY_PAYLOAD_SIZE: u32 = 16_777_216;
|
||||||
|
|
||||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
pub struct AudioStreamInfo {
|
pub struct AudioStreamInfo {
|
||||||
@@ -59,10 +65,15 @@ pub async fn read_audio_msg<R: AsyncRead + Unpin>(
|
|||||||
) -> std::io::Result<(u8, Vec<u8>)> {
|
) -> std::io::Result<(u8, Vec<u8>)> {
|
||||||
let msg_type = reader.read_u8().await?;
|
let msg_type = reader.read_u8().await?;
|
||||||
let len = reader.read_u32().await?;
|
let len = reader.read_u32().await?;
|
||||||
if len > MAX_PAYLOAD_SIZE {
|
let limit = if msg_type == AUDIO_MSG_HISTORY_COMPRESSED {
|
||||||
|
MAX_HISTORY_PAYLOAD_SIZE
|
||||||
|
} else {
|
||||||
|
MAX_PAYLOAD_SIZE
|
||||||
|
};
|
||||||
|
if len > limit {
|
||||||
return Err(std::io::Error::new(
|
return Err(std::io::Error::new(
|
||||||
std::io::ErrorKind::InvalidData,
|
std::io::ErrorKind::InvalidData,
|
||||||
format!("audio frame too large: {} bytes", len),
|
format!("audio frame too large: {} bytes (type={:#04x})", len, msg_type),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
let mut payload = vec![0u8; len as usize];
|
let mut payload = vec![0u8; len as usize];
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ default = ["soapysdr"]
|
|||||||
soapysdr = ["trx-backend/soapysdr"]
|
soapysdr = ["trx-backend/soapysdr"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
flate2 = { workspace = true }
|
||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
tokio-serial = { workspace = true }
|
tokio-serial = { workspace = true }
|
||||||
serde = { workspace = true, features = ["derive"] }
|
serde = { workspace = true, features = ["derive"] }
|
||||||
@@ -37,4 +38,4 @@ trx-cw = { path = "../decoders/trx-cw" }
|
|||||||
trx-decode-log = { path = "../decoders/trx-decode-log" }
|
trx-decode-log = { path = "../decoders/trx-decode-log" }
|
||||||
trx-ft8 = { path = "../decoders/trx-ft8" }
|
trx-ft8 = { path = "../decoders/trx-ft8" }
|
||||||
trx-wspr = { path = "../decoders/trx-wspr" }
|
trx-wspr = { path = "../decoders/trx-wspr" }
|
||||||
trx-protocol = { path = "../trx-protocol" }
|
trx-protocol = { path = "../trx-protocol" }
|
||||||
@@ -11,7 +11,10 @@ use std::sync::{Arc, Mutex};
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use flate2::write::GzEncoder;
|
||||||
|
use flate2::Compression;
|
||||||
use num_complex::Complex;
|
use num_complex::Complex;
|
||||||
|
use std::io::Write as _;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::sync::{broadcast, mpsc, watch};
|
use tokio::sync::{broadcast, mpsc, watch};
|
||||||
@@ -22,8 +25,8 @@ use trx_aprs::AprsDecoder;
|
|||||||
use trx_core::audio::{
|
use trx_core::audio::{
|
||||||
read_audio_msg, write_audio_msg, AudioStreamInfo,
|
read_audio_msg, write_audio_msg, AudioStreamInfo,
|
||||||
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE,
|
AUDIO_MSG_AIS_DECODE, AUDIO_MSG_APRS_DECODE, AUDIO_MSG_CW_DECODE, AUDIO_MSG_FT8_DECODE,
|
||||||
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_RX_FRAME, AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME,
|
AUDIO_MSG_HF_APRS_DECODE, AUDIO_MSG_HISTORY_COMPRESSED, AUDIO_MSG_RX_FRAME,
|
||||||
AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
AUDIO_MSG_STREAM_INFO, AUDIO_MSG_TX_FRAME, AUDIO_MSG_VDES_DECODE, AUDIO_MSG_WSPR_DECODE,
|
||||||
};
|
};
|
||||||
use trx_core::decode::{
|
use trx_core::decode::{
|
||||||
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
|
AisMessage, AprsPacket, CwEvent, DecodedMessage, Ft8Message, VdesMessage, WsprMessage,
|
||||||
@@ -1856,15 +1859,27 @@ async fn handle_audio_client(
|
|||||||
};
|
};
|
||||||
let (blob, replayed_history_count) = history_blob;
|
let (blob, replayed_history_count) = history_blob;
|
||||||
if !blob.is_empty() {
|
if !blob.is_empty() {
|
||||||
writer.write_all(&blob).await?;
|
// Gzip-compress the blob before sending. JSON history compresses very
|
||||||
writer.flush().await?;
|
// well (~10-20x) so this dramatically reduces both transfer size and
|
||||||
}
|
// the time the client spends waiting for data.
|
||||||
if replayed_history_count > 0 {
|
let compressed = {
|
||||||
|
let mut enc = GzEncoder::new(
|
||||||
|
Vec::with_capacity(blob.len() / 8),
|
||||||
|
Compression::fast(),
|
||||||
|
);
|
||||||
|
enc.write_all(&blob)
|
||||||
|
.and_then(|_| enc.finish())
|
||||||
|
.unwrap_or(blob.clone())
|
||||||
|
};
|
||||||
|
write_audio_msg(&mut writer, AUDIO_MSG_HISTORY_COMPRESSED, &compressed).await?;
|
||||||
info!(
|
info!(
|
||||||
"Audio client {} replayed {} history messages in {:?}",
|
"Audio client {} replayed {} history messages in {:?} ({} → {} bytes, {:.1}x)",
|
||||||
peer,
|
peer,
|
||||||
replayed_history_count,
|
replayed_history_count,
|
||||||
history_replay_started_at.elapsed()
|
history_replay_started_at.elapsed(),
|
||||||
|
blob.len(),
|
||||||
|
compressed.len(),
|
||||||
|
blob.len() as f64 / compressed.len().max(1) as f64,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user