From 0d6a35a933a3e8ebb2ff6afd8da04b3cba88fbde Mon Sep 17 00:00:00 2001 From: Stan Grams Date: Mon, 23 Feb 2026 18:23:35 +0100 Subject: [PATCH] [feat](trx-server): add APRS-IS IGate uplink (aprs.fi integration) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Forwards CRC-valid RF APRS packets to APRS-IS via plain TCP using the TNC2 line format, making them visible on aprs.fi and other APRS-IS consumers. Mirrors the pskreporter module in structure. - New aprsfi.rs: IGate task with reconnect loop (exponential backoff 1s→60s), login/logresp, 60s keepalive, 60s stats, passcode auto-computation from callsign (standard APRS hash algorithm) - config.rs: AprsFiConfig struct with enabled/host/port/passcode fields and validation; default host rotate.aprs.net:14580 - main.rs: mod aprsfi; spawn task inside audio block when aprsfi.enabled - trx-server.toml.example, CONFIGURATION.md: document [aprsfi] section - Remove APRSFI_IMPLEMENTATION.rs planning artifact Co-Authored-By: Claude Sonnet 4.6 --- APRSFI_IMPLEMENTATION.rs | 129 --------------- CONFIGURATION.md | 13 ++ src/trx-server/src/aprsfi.rs | 306 +++++++++++++++++++++++++++++++++++ src/trx-server/src/config.rs | 41 +++++ src/trx-server/src/main.rs | 18 +++ trx-server.toml.example | 11 ++ 6 files changed, 389 insertions(+), 129 deletions(-) delete mode 100644 APRSFI_IMPLEMENTATION.rs create mode 100644 src/trx-server/src/aprsfi.rs diff --git a/APRSFI_IMPLEMENTATION.rs b/APRSFI_IMPLEMENTATION.rs deleted file mode 100644 index 4e48cdc..0000000 --- a/APRSFI_IMPLEMENTATION.rs +++ /dev/null @@ -1,129 +0,0 @@ -//! APRS.fi integration implementation draft (server-side) -//! -//! Goal: -//! - Add optional APRS.fi upload/logging support for decoded APRS packets. -//! - Keep feature disabled by default. -//! - Reuse existing decode pipeline in `trx-server`. -//! -//! This is a planning artifact, not active runtime logic. - -/// Delivery phases. -#[allow(dead_code)] -pub enum Phase { - Config, - PacketSelection, - UplinkWorker, - RetryAndRateLimit, - PrivacyControls, - Tests, - Docs, -} - -/// Proposed config block for `trx-server.toml`. -#[allow(dead_code)] -pub const CONFIG_PROPOSAL: &str = r#" -[aprsfi] -enabled = false - -# APRS.fi API token / key (required when enabled) -api_key = "" - -# Optional station identity metadata -receiver_callsign = "N0CALL" -receiver_locator = "JO93" - -# Upload endpoint override for testing -endpoint = "https://api.aprs.fi/api" - -# Upload policy -include_third_party = false -min_interval_ms = 1000 -max_queue = 1000 -"#; - -/// Validation rules. -#[allow(dead_code)] -pub const VALIDATION: &[&str] = &[ - "If aprsfi.enabled=false: ignore all aprsfi fields", - "If aprsfi.enabled=true: api_key must be non-empty", - "min_interval_ms must be > 0", - "max_queue must be > 0", -]; - -/// Runtime architecture. -#[allow(dead_code)] -pub const ARCHITECTURE: &[&str] = &[ - "Spawn dedicated APRS.fi worker task in src/trx-server/src/main.rs", - "Subscribe to decode broadcast channel (existing decode_tx.subscribe())", - "Filter DecodedMessage::Aprs only", - "Transform AprsPacket into APRS.fi payload DTO", - "Queue and POST asynchronously with bounded backpressure", - "Never block decoder tasks on network I/O", -]; - -/// Integration points in current code. -#[allow(dead_code)] -pub const INTEGRATION_POINTS: &[&str] = &[ - "src/trx-server/src/config.rs: add AprsFiConfig", - "src/trx-server/src/main.rs: start worker when enabled", - "src/trx-server/src/audio.rs: no direct changes required (consume from decode stream)", - "src/trx-server/src//aprsfi.rs: worker + payload mapping + HTTP client", - "trx-server.toml.example + CONFIGURATION.md: docs", -]; - -/// Packet handling policy. -#[allow(dead_code)] -pub const PACKET_POLICY: &[&str] = &[ - "Upload only packets with valid callsign and parseable position by default", - "Optionally allow non-position packets if APRS.fi endpoint supports them", - "Deduplicate burst repeats (same src/info within short window)", - "Drop malformed frames silently with debug log", -]; - -/// Retry/rate limiting policy. -#[allow(dead_code)] -pub const RELIABILITY_POLICY: &[&str] = &[ - "Bounded mpsc queue (max_queue)", - "If queue full: drop oldest or newest by configurable policy (MVP: drop newest)", - "Exponential backoff on HTTP/network errors", - "Respect min_interval_ms between outbound requests", - "Throttle warning logs to avoid spam", -]; - -/// Privacy/safety controls. -#[allow(dead_code)] -pub const PRIVACY_CONTROLS: &[&str] = &[ - "Feature disabled by default", - "API key never logged", - "Optional include_third_party flag for re-published packets", - "Document that enabling uploads sends decoded RF data to external service", -]; - -/// Test plan. -#[allow(dead_code)] -pub const TEST_PLAN: &[&str] = &[ - "Unit: config parse + validation", - "Unit: APRS packet -> APRS.fi payload mapping", - "Unit: dedupe and queue/backpressure behavior", - "Unit: retry/backoff timing logic", - "Integration: mock HTTP endpoint receives expected payloads", - "Integration: disabled mode performs no outbound requests", -]; - -/// Suggested first implementation milestone (M1). -#[allow(dead_code)] -pub const M1: &[&str] = &[ - "Add config + validation + docs", - "Create aprsfi worker skeleton (no uploads yet, just consume + structured logs)", - "Add payload mapping function with tests", - "Add feature flag + startup logs", -]; - -/// Suggested second milestone (M2). -#[allow(dead_code)] -pub const M2: &[&str] = &[ - "Implement real HTTP POST uploads", - "Add retry/backoff + queue policy", - "Add integration test with mock server", - "Add operational metrics counters", -]; diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 4b4951d..2cf434b 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -97,6 +97,19 @@ Notes: - If `receiver_locator` is omitted, server tries deriving it from `[general].latitude`/`longitude`. - PSK Reporter software ID is hardcoded to: `trx-server v by SP2SJG`. +### `[aprsfi]` +- `enabled` (`bool`, default: `false`) +- `host` (`string`, default: `"rotate.aprs.net"`, must not be empty when enabled) +- `port` (`u16`, default: `14580`, must be `> 0` when enabled) +- `passcode` (`i32`, default: `-1`) + +Notes: +- When `passcode = -1` (the default), the passcode is auto-computed from `[general].callsign` using the standard APRS-IS hash algorithm. +- `[general].callsign` must be non-empty when `[aprsfi].enabled = true`; otherwise the IGate is silently disabled at startup. +- Only APRS packets with valid CRC are forwarded; packets from other decoders (FT8, WSPR, CW) are ignored. +- The IGate reconnects automatically with exponential backoff (1 s → 2 s → … → 60 s) on TCP errors. +- Requires `[audio].enabled = true` (APRS packets are decoded from audio). + ### `[decode_logs]` - `enabled` (`bool`, default: `false`) - `dir` (`string`, default: `"$XDG_DATA_HOME/trx-rs/decoders"`; fallback: `"logs/decoders"`, must not be empty when enabled) diff --git a/src/trx-server/src/aprsfi.rs b/src/trx-server/src/aprsfi.rs new file mode 100644 index 0000000..cdf20d7 --- /dev/null +++ b/src/trx-server/src/aprsfi.rs @@ -0,0 +1,306 @@ +// SPDX-FileCopyrightText: 2026 Stanislaw Grams +// +// SPDX-License-Identifier: BSD-2-Clause + +//! APRS-IS IGate uplink — forwards RF-decoded APRS packets to APRS-IS (aprs.fi etc.). + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::broadcast; +use tokio::time::{self, Duration}; +use tracing::{debug, info, warn}; + +use trx_core::decode::{AprsPacket, DecodedMessage}; + +use crate::config::AprsFiConfig; + +/// Compute the APRS-IS passcode for a callsign. +/// +/// Algorithm matches the canonical JS/Python reference implementations: +/// - Strip SSID (everything from `-` onwards) +/// - Take the first 10 characters, uppercased +/// - XOR hash initialised at 0x73E2, processed in 2-byte pairs +/// - Mask result with 0x7FFF +pub fn compute_passcode(callsign: &str) -> u16 { + // Strip SSID + let base = callsign.split('-').next().unwrap_or(callsign); + // First 10 chars, uppercase + let upper: String = base.chars().take(10).map(|c| c.to_ascii_uppercase()).collect(); + let bytes = upper.as_bytes(); + + let mut hash: u16 = 0x73e2; + let mut i = 0; + while i < bytes.len() { + hash ^= (bytes[i] as u16) << 8; + if i + 1 < bytes.len() { + hash ^= bytes[i + 1] as u16; + } + i += 2; + } + hash & 0x7fff +} + +/// Format an [`AprsPacket`] as a TNC2 line (CRLF-terminated) for APRS-IS. +fn format_tnc2(pkt: &AprsPacket) -> String { + if pkt.path.is_empty() { + format!("{}>{}:{}\r\n", pkt.src_call, pkt.dest_call, pkt.info) + } else { + format!( + "{}>{},{}:{}\r\n", + pkt.src_call, pkt.dest_call, pkt.path, pkt.info + ) + } +} + +/// Run the APRS-IS IGate uplink task. +/// +/// Subscribes to the decoded-message broadcast channel and forwards every +/// CRC-valid APRS packet to the configured APRS-IS server as a TNC2 line. +/// Reconnects automatically with exponential backoff (1 s → 2 s → … → 60 s). +pub async fn run_aprsfi_uplink( + cfg: AprsFiConfig, + callsign: String, + mut decode_rx: broadcast::Receiver, +) { + let passcode: u16 = if cfg.passcode == -1 { + compute_passcode(&callsign) + } else { + (cfg.passcode as u16) & 0x7fff + }; + + let mut stats_received: u64 = 0; + let mut stats_forwarded: u64 = 0; + let mut stats_skipped: u64 = 0; + let mut stats_write_errors: u64 = 0; + let mut stats_reconnects: u64 = 0; + let mut backoff_secs: u64 = 1; + + 'reconnect: loop { + // ---------------------------------------------------------------- + // TCP connect + // ---------------------------------------------------------------- + let stream = match TcpStream::connect((cfg.host.as_str(), cfg.port)).await { + Ok(s) => s, + Err(e) => { + warn!( + "APRS-IS IGate: connection to {}:{} failed: {}, retrying in {}s", + cfg.host, cfg.port, e, backoff_secs + ); + time::sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + stats_reconnects += 1; + continue 'reconnect; + } + }; + + let (read_half, mut write_half) = stream.into_split(); + let mut reader = BufReader::new(read_half); + + // ---------------------------------------------------------------- + // Login + // ---------------------------------------------------------------- + let login = format!( + "user {} pass {} vers trx-server {}\r\n", + callsign, + passcode, + env!("CARGO_PKG_VERSION") + ); + if let Err(e) = write_half.write_all(login.as_bytes()).await { + warn!( + "APRS-IS IGate: login write to {}:{} failed: {}, retrying in {}s", + cfg.host, cfg.port, e, backoff_secs + ); + time::sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + stats_reconnects += 1; + continue 'reconnect; + } + + // ---------------------------------------------------------------- + // Read logresp (up to 10 lines) + // ---------------------------------------------------------------- + let mut verified = false; + let mut got_logresp = false; + let mut line = String::new(); + for _ in 0..10 { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => { + warn!("APRS-IS IGate: connection closed before logresp from {}:{}", cfg.host, cfg.port); + break; + } + Ok(_) => { + if line.starts_with("# logresp") { + verified = !line.contains("unverified"); + got_logresp = true; + break; + } + } + Err(e) => { + warn!("APRS-IS IGate: error reading logresp from {}:{}: {}", cfg.host, cfg.port, e); + break; + } + } + } + + if !got_logresp { + warn!( + "APRS-IS IGate: no logresp from {}:{}, retrying in {}s", + cfg.host, cfg.port, backoff_secs + ); + time::sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + stats_reconnects += 1; + continue 'reconnect; + } + + info!( + "APRS-IS IGate connected ({}:{} as {}, {})", + cfg.host, + cfg.port, + callsign, + if verified { "verified" } else { "unverified" } + ); + + // Successful connection — reset backoff + backoff_secs = 1; + + // ---------------------------------------------------------------- + // Forward loop + // ---------------------------------------------------------------- + let period = Duration::from_secs(60); + let first_at = time::Instant::now() + period; + let mut keepalive_tick = time::interval_at(first_at, period); + let mut stats_tick = time::interval_at(first_at, period); + + 'forward: loop { + tokio::select! { + _ = keepalive_tick.tick() => { + if let Err(e) = write_half.write_all(b"# trx-server keepalive\r\n").await { + warn!("APRS-IS IGate: keepalive write failed: {}", e); + stats_write_errors += 1; + break 'forward; + } + } + + _ = stats_tick.tick() => { + info!( + "APRS-IS stats: received={}, forwarded={}, skipped={}, write_errors={}, reconnects={}", + stats_received, stats_forwarded, stats_skipped, + stats_write_errors, stats_reconnects + ); + } + + recv = decode_rx.recv() => { + match recv { + Ok(DecodedMessage::Aprs(pkt)) => { + stats_received += 1; + if !pkt.crc_ok { + stats_skipped += 1; + continue 'forward; + } + let tnc2 = format_tnc2(&pkt); + debug!("APRS-IS: forwarded {}>{},...", pkt.src_call, pkt.dest_call); + if let Err(e) = write_half.write_all(tnc2.as_bytes()).await { + warn!("APRS-IS IGate: packet write failed: {}", e); + stats_write_errors += 1; + break 'forward; + } + stats_forwarded += 1; + } + Ok(_) => { + // Non-APRS messages (FT8, WSPR, CW) are silently skipped + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("APRS-IS IGate: dropped {} decode events (channel lagged)", n); + } + Err(broadcast::error::RecvError::Closed) => { + return; + } + } + } + } + } + + // Forward loop exited due to a write error — reconnect with backoff + stats_reconnects += 1; + warn!( + "APRS-IS IGate: disconnected from {}:{}, reconnecting in {}s", + cfg.host, cfg.port, backoff_secs + ); + time::sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use trx_core::decode::AprsPacket; + + fn make_pkt(src: &str, dest: &str, path: &str, info: &str, crc_ok: bool) -> AprsPacket { + AprsPacket { + src_call: src.to_string(), + dest_call: dest.to_string(), + path: path.to_string(), + info: info.to_string(), + info_bytes: vec![], + packet_type: "Unknown".to_string(), + crc_ok, + lat: None, + lon: None, + symbol_table: None, + symbol_code: None, + } + } + + #[test] + fn passcode_result_in_valid_range() { + assert!(compute_passcode("N0CALL") <= 0x7fff); + assert!(compute_passcode("W1AW") <= 0x7fff); + assert!(compute_passcode("SP2SJG") <= 0x7fff); + } + + #[test] + fn passcode_strips_ssid() { + assert_eq!(compute_passcode("N0CALL"), compute_passcode("N0CALL-9")); + assert_eq!(compute_passcode("W1AW"), compute_passcode("W1AW-5")); + assert_eq!(compute_passcode("SP2SJG"), compute_passcode("SP2SJG-15")); + } + + #[test] + fn passcode_case_insensitive() { + assert_eq!(compute_passcode("n0call"), compute_passcode("N0CALL")); + assert_eq!(compute_passcode("sp2sjg"), compute_passcode("SP2SJG")); + } + + #[test] + fn passcode_truncates_to_ten_chars() { + // Callsigns are at most 10 chars after stripping SSID; extra chars must be ignored + assert_eq!( + compute_passcode("ABCDEFGHIJ"), + compute_passcode("ABCDEFGHIJKL") + ); + } + + #[test] + fn tnc2_with_path() { + let pkt = make_pkt( + "N0CALL-9", + "APRS", + "WIDE1-1,WIDE2-1", + "!1234.56N/01234.56E-Test", + true, + ); + assert_eq!( + format_tnc2(&pkt), + "N0CALL-9>APRS,WIDE1-1,WIDE2-1:!1234.56N/01234.56E-Test\r\n" + ); + } + + #[test] + fn tnc2_without_path() { + let pkt = make_pkt("W1AW", "BEACON", "", ">Test status", true); + assert_eq!(format_tnc2(&pkt), "W1AW>BEACON:>Test status\r\n"); + } +} diff --git a/src/trx-server/src/config.rs b/src/trx-server/src/config.rs index 44a5ca4..a41373f 100644 --- a/src/trx-server/src/config.rs +++ b/src/trx-server/src/config.rs @@ -35,6 +35,8 @@ pub struct ServerConfig { pub audio: AudioConfig, /// PSK Reporter uplink configuration pub pskreporter: PskReporterConfig, + /// APRS-IS IGate uplink configuration + pub aprsfi: AprsFiConfig, /// Decoder file logging configuration pub decode_logs: DecodeLogsConfig, } @@ -233,6 +235,31 @@ impl Default for PskReporterConfig { } } +/// APRS-IS IGate uplink configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AprsFiConfig { + /// Whether APRS-IS IGate uplink is enabled + pub enabled: bool, + /// APRS-IS server hostname + pub host: String, + /// APRS-IS server port + pub port: u16, + /// APRS-IS passcode. -1 = auto-compute from [general].callsign. + pub passcode: i32, +} + +impl Default for AprsFiConfig { + fn default() -> Self { + Self { + enabled: false, + host: "rotate.aprs.net".to_string(), + port: 14580, + passcode: -1, + } + } +} + fn default_decode_logs_dir() -> String { if let Some(data_dir) = dirs::data_dir() { return data_dir @@ -351,6 +378,15 @@ impl ServerConfig { } } + if self.aprsfi.enabled { + if self.aprsfi.host.trim().is_empty() { + return Err("[aprsfi].host must not be empty".to_string()); + } + if self.aprsfi.port == 0 { + return Err("[aprsfi].port must be > 0".to_string()); + } + } + if self.decode_logs.enabled { if self.decode_logs.dir.trim().is_empty() { return Err("[decode_logs].dir must not be empty when enabled".to_string()); @@ -405,6 +441,7 @@ impl ServerConfig { listen: ListenConfig::default(), audio: AudioConfig::default(), pskreporter: PskReporterConfig::default(), + aprsfi: AprsFiConfig::default(), decode_logs: DecodeLogsConfig::default(), }; @@ -538,6 +575,10 @@ mod tests { assert_eq!(config.audio.sample_rate, 48000); assert!(!config.pskreporter.enabled); assert_eq!(config.pskreporter.port, 4739); + assert!(!config.aprsfi.enabled); + assert_eq!(config.aprsfi.host, "rotate.aprs.net"); + assert_eq!(config.aprsfi.port, 14580); + assert_eq!(config.aprsfi.passcode, -1); assert!(!config.decode_logs.enabled); assert!( std::path::Path::new(&config.decode_logs.dir) diff --git a/src/trx-server/src/main.rs b/src/trx-server/src/main.rs index bef86f2..42611a1 100644 --- a/src/trx-server/src/main.rs +++ b/src/trx-server/src/main.rs @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: BSD-2-Clause +mod aprsfi; mod audio; mod config; mod decode; @@ -406,6 +407,23 @@ async fn main() -> DynResult<()> { } } + if cfg.aprsfi.enabled { + let callsign = resolved.callsign.clone().unwrap_or_default(); + if callsign.trim().is_empty() { + warn!("APRS-IS IGate enabled but [general].callsign is empty; uplink disabled"); + } else { + let ai_cfg = cfg.aprsfi.clone(); + let ai_decode_rx = decode_tx.subscribe(); + let ai_shutdown_rx = shutdown_rx.clone(); + task_handles.push(tokio::spawn(async move { + tokio::select! { + _ = aprsfi::run_aprsfi_uplink(ai_cfg, callsign, ai_decode_rx) => {} + _ = wait_for_shutdown(ai_shutdown_rx) => {} + } + })); + } + } + let decoder_logs = match DecoderLoggers::from_config(&cfg.decode_logs) { Ok(v) => v, Err(e) => { diff --git a/trx-server.toml.example b/trx-server.toml.example index d6f1004..9a1f5fb 100644 --- a/trx-server.toml.example +++ b/trx-server.toml.example @@ -76,6 +76,17 @@ port = 4739 # If omitted, it is derived from [general].latitude/[general].longitude. # receiver_locator = "JO93" +[aprsfi] +# Enable APRS-IS IGate uplink (forwards received RF APRS packets to APRS-IS / aprs.fi) +enabled = false + +# APRS-IS server (rotate.aprs.net does DNS round-robin across all tier-2 servers) +host = "rotate.aprs.net" +port = 14580 + +# APRS-IS passcode. -1 = auto-computed from [general].callsign. +# passcode = -1 + [decode_logs] # Optional decoder message logs to files (APRS/CW/FT8/WSPR) enabled = false