[feat](trx-server): add APRS-IS IGate uplink (aprs.fi integration)

Forwards CRC-valid RF APRS packets to APRS-IS via plain TCP using the
TNC2 line format, making them visible on aprs.fi and other APRS-IS
consumers. Mirrors the pskreporter module in structure.

- New aprsfi.rs: IGate task with reconnect loop (exponential backoff
  1s→60s), login/logresp, 60s keepalive, 60s stats, passcode
  auto-computation from callsign (standard APRS hash algorithm)
- config.rs: AprsFiConfig struct with enabled/host/port/passcode fields
  and validation; default host rotate.aprs.net:14580
- main.rs: mod aprsfi; spawn task inside audio block when aprsfi.enabled
- trx-server.toml.example, CONFIGURATION.md: document [aprsfi] section
- Remove APRSFI_IMPLEMENTATION.rs planning artifact

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-23 18:23:35 +01:00
parent e12a3dfa4f
commit 0d6a35a933
6 changed files with 389 additions and 129 deletions
-129
View File
@@ -1,129 +0,0 @@
//! APRS.fi integration implementation draft (server-side)
//!
//! Goal:
//! - Add optional APRS.fi upload/logging support for decoded APRS packets.
//! - Keep feature disabled by default.
//! - Reuse existing decode pipeline in `trx-server`.
//!
//! This is a planning artifact, not active runtime logic.
/// Delivery phases.
#[allow(dead_code)]
pub enum Phase {
Config,
PacketSelection,
UplinkWorker,
RetryAndRateLimit,
PrivacyControls,
Tests,
Docs,
}
/// Proposed config block for `trx-server.toml`.
#[allow(dead_code)]
pub const CONFIG_PROPOSAL: &str = r#"
[aprsfi]
enabled = false
# APRS.fi API token / key (required when enabled)
api_key = ""
# Optional station identity metadata
receiver_callsign = "N0CALL"
receiver_locator = "JO93"
# Upload endpoint override for testing
endpoint = "https://api.aprs.fi/api"
# Upload policy
include_third_party = false
min_interval_ms = 1000
max_queue = 1000
"#;
/// Validation rules.
#[allow(dead_code)]
pub const VALIDATION: &[&str] = &[
"If aprsfi.enabled=false: ignore all aprsfi fields",
"If aprsfi.enabled=true: api_key must be non-empty",
"min_interval_ms must be > 0",
"max_queue must be > 0",
];
/// Runtime architecture.
#[allow(dead_code)]
pub const ARCHITECTURE: &[&str] = &[
"Spawn dedicated APRS.fi worker task in src/trx-server/src/main.rs",
"Subscribe to decode broadcast channel (existing decode_tx.subscribe())",
"Filter DecodedMessage::Aprs only",
"Transform AprsPacket into APRS.fi payload DTO",
"Queue and POST asynchronously with bounded backpressure",
"Never block decoder tasks on network I/O",
];
/// Integration points in current code.
#[allow(dead_code)]
pub const INTEGRATION_POINTS: &[&str] = &[
"src/trx-server/src/config.rs: add AprsFiConfig",
"src/trx-server/src/main.rs: start worker when enabled",
"src/trx-server/src/audio.rs: no direct changes required (consume from decode stream)",
"src/trx-server/src/<new>/aprsfi.rs: worker + payload mapping + HTTP client",
"trx-server.toml.example + CONFIGURATION.md: docs",
];
/// Packet handling policy.
#[allow(dead_code)]
pub const PACKET_POLICY: &[&str] = &[
"Upload only packets with valid callsign and parseable position by default",
"Optionally allow non-position packets if APRS.fi endpoint supports them",
"Deduplicate burst repeats (same src/info within short window)",
"Drop malformed frames silently with debug log",
];
/// Retry/rate limiting policy.
#[allow(dead_code)]
pub const RELIABILITY_POLICY: &[&str] = &[
"Bounded mpsc queue (max_queue)",
"If queue full: drop oldest or newest by configurable policy (MVP: drop newest)",
"Exponential backoff on HTTP/network errors",
"Respect min_interval_ms between outbound requests",
"Throttle warning logs to avoid spam",
];
/// Privacy/safety controls.
#[allow(dead_code)]
pub const PRIVACY_CONTROLS: &[&str] = &[
"Feature disabled by default",
"API key never logged",
"Optional include_third_party flag for re-published packets",
"Document that enabling uploads sends decoded RF data to external service",
];
/// Test plan.
#[allow(dead_code)]
pub const TEST_PLAN: &[&str] = &[
"Unit: config parse + validation",
"Unit: APRS packet -> APRS.fi payload mapping",
"Unit: dedupe and queue/backpressure behavior",
"Unit: retry/backoff timing logic",
"Integration: mock HTTP endpoint receives expected payloads",
"Integration: disabled mode performs no outbound requests",
];
/// Suggested first implementation milestone (M1).
#[allow(dead_code)]
pub const M1: &[&str] = &[
"Add config + validation + docs",
"Create aprsfi worker skeleton (no uploads yet, just consume + structured logs)",
"Add payload mapping function with tests",
"Add feature flag + startup logs",
];
/// Suggested second milestone (M2).
#[allow(dead_code)]
pub const M2: &[&str] = &[
"Implement real HTTP POST uploads",
"Add retry/backoff + queue policy",
"Add integration test with mock server",
"Add operational metrics counters",
];
+13
View File
@@ -97,6 +97,19 @@ Notes:
- If `receiver_locator` is omitted, server tries deriving it from `[general].latitude`/`longitude`. - If `receiver_locator` is omitted, server tries deriving it from `[general].latitude`/`longitude`.
- PSK Reporter software ID is hardcoded to: `trx-server v<version> by SP2SJG`. - PSK Reporter software ID is hardcoded to: `trx-server v<version> by SP2SJG`.
### `[aprsfi]`
- `enabled` (`bool`, default: `false`)
- `host` (`string`, default: `"rotate.aprs.net"`, must not be empty when enabled)
- `port` (`u16`, default: `14580`, must be `> 0` when enabled)
- `passcode` (`i32`, default: `-1`)
Notes:
- When `passcode = -1` (the default), the passcode is auto-computed from `[general].callsign` using the standard APRS-IS hash algorithm.
- `[general].callsign` must be non-empty when `[aprsfi].enabled = true`; otherwise the IGate is silently disabled at startup.
- Only APRS packets with valid CRC are forwarded; packets from other decoders (FT8, WSPR, CW) are ignored.
- The IGate reconnects automatically with exponential backoff (1 s → 2 s → … → 60 s) on TCP errors.
- Requires `[audio].enabled = true` (APRS packets are decoded from audio).
### `[decode_logs]` ### `[decode_logs]`
- `enabled` (`bool`, default: `false`) - `enabled` (`bool`, default: `false`)
- `dir` (`string`, default: `"$XDG_DATA_HOME/trx-rs/decoders"`; fallback: `"logs/decoders"`, must not be empty when enabled) - `dir` (`string`, default: `"$XDG_DATA_HOME/trx-rs/decoders"`; fallback: `"logs/decoders"`, must not be empty when enabled)
+306
View File
@@ -0,0 +1,306 @@
// SPDX-FileCopyrightText: 2026 Stanislaw Grams <stanislawgrams@gmail.com>
//
// SPDX-License-Identifier: BSD-2-Clause
//! APRS-IS IGate uplink — forwards RF-decoded APRS packets to APRS-IS (aprs.fi etc.).
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio::time::{self, Duration};
use tracing::{debug, info, warn};
use trx_core::decode::{AprsPacket, DecodedMessage};
use crate::config::AprsFiConfig;
/// Compute the APRS-IS passcode for a callsign.
///
/// Algorithm matches the canonical JS/Python reference implementations:
/// - Strip SSID (everything from `-` onwards)
/// - Take the first 10 characters, uppercased
/// - XOR hash initialised at 0x73E2, processed in 2-byte pairs
/// - Mask result with 0x7FFF
pub fn compute_passcode(callsign: &str) -> u16 {
// Strip SSID
let base = callsign.split('-').next().unwrap_or(callsign);
// First 10 chars, uppercase
let upper: String = base.chars().take(10).map(|c| c.to_ascii_uppercase()).collect();
let bytes = upper.as_bytes();
let mut hash: u16 = 0x73e2;
let mut i = 0;
while i < bytes.len() {
hash ^= (bytes[i] as u16) << 8;
if i + 1 < bytes.len() {
hash ^= bytes[i + 1] as u16;
}
i += 2;
}
hash & 0x7fff
}
/// Format an [`AprsPacket`] as a TNC2 line (CRLF-terminated) for APRS-IS.
fn format_tnc2(pkt: &AprsPacket) -> String {
if pkt.path.is_empty() {
format!("{}>{}:{}\r\n", pkt.src_call, pkt.dest_call, pkt.info)
} else {
format!(
"{}>{},{}:{}\r\n",
pkt.src_call, pkt.dest_call, pkt.path, pkt.info
)
}
}
/// Run the APRS-IS IGate uplink task.
///
/// Subscribes to the decoded-message broadcast channel and forwards every
/// CRC-valid APRS packet to the configured APRS-IS server as a TNC2 line.
/// Reconnects automatically with exponential backoff (1 s → 2 s → … → 60 s).
pub async fn run_aprsfi_uplink(
cfg: AprsFiConfig,
callsign: String,
mut decode_rx: broadcast::Receiver<DecodedMessage>,
) {
let passcode: u16 = if cfg.passcode == -1 {
compute_passcode(&callsign)
} else {
(cfg.passcode as u16) & 0x7fff
};
let mut stats_received: u64 = 0;
let mut stats_forwarded: u64 = 0;
let mut stats_skipped: u64 = 0;
let mut stats_write_errors: u64 = 0;
let mut stats_reconnects: u64 = 0;
let mut backoff_secs: u64 = 1;
'reconnect: loop {
// ----------------------------------------------------------------
// TCP connect
// ----------------------------------------------------------------
let stream = match TcpStream::connect((cfg.host.as_str(), cfg.port)).await {
Ok(s) => s,
Err(e) => {
warn!(
"APRS-IS IGate: connection to {}:{} failed: {}, retrying in {}s",
cfg.host, cfg.port, e, backoff_secs
);
time::sleep(Duration::from_secs(backoff_secs)).await;
backoff_secs = (backoff_secs * 2).min(60);
stats_reconnects += 1;
continue 'reconnect;
}
};
let (read_half, mut write_half) = stream.into_split();
let mut reader = BufReader::new(read_half);
// ----------------------------------------------------------------
// Login
// ----------------------------------------------------------------
let login = format!(
"user {} pass {} vers trx-server {}\r\n",
callsign,
passcode,
env!("CARGO_PKG_VERSION")
);
if let Err(e) = write_half.write_all(login.as_bytes()).await {
warn!(
"APRS-IS IGate: login write to {}:{} failed: {}, retrying in {}s",
cfg.host, cfg.port, e, backoff_secs
);
time::sleep(Duration::from_secs(backoff_secs)).await;
backoff_secs = (backoff_secs * 2).min(60);
stats_reconnects += 1;
continue 'reconnect;
}
// ----------------------------------------------------------------
// Read logresp (up to 10 lines)
// ----------------------------------------------------------------
let mut verified = false;
let mut got_logresp = false;
let mut line = String::new();
for _ in 0..10 {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
warn!("APRS-IS IGate: connection closed before logresp from {}:{}", cfg.host, cfg.port);
break;
}
Ok(_) => {
if line.starts_with("# logresp") {
verified = !line.contains("unverified");
got_logresp = true;
break;
}
}
Err(e) => {
warn!("APRS-IS IGate: error reading logresp from {}:{}: {}", cfg.host, cfg.port, e);
break;
}
}
}
if !got_logresp {
warn!(
"APRS-IS IGate: no logresp from {}:{}, retrying in {}s",
cfg.host, cfg.port, backoff_secs
);
time::sleep(Duration::from_secs(backoff_secs)).await;
backoff_secs = (backoff_secs * 2).min(60);
stats_reconnects += 1;
continue 'reconnect;
}
info!(
"APRS-IS IGate connected ({}:{} as {}, {})",
cfg.host,
cfg.port,
callsign,
if verified { "verified" } else { "unverified" }
);
// Successful connection — reset backoff
backoff_secs = 1;
// ----------------------------------------------------------------
// Forward loop
// ----------------------------------------------------------------
let period = Duration::from_secs(60);
let first_at = time::Instant::now() + period;
let mut keepalive_tick = time::interval_at(first_at, period);
let mut stats_tick = time::interval_at(first_at, period);
'forward: loop {
tokio::select! {
_ = keepalive_tick.tick() => {
if let Err(e) = write_half.write_all(b"# trx-server keepalive\r\n").await {
warn!("APRS-IS IGate: keepalive write failed: {}", e);
stats_write_errors += 1;
break 'forward;
}
}
_ = stats_tick.tick() => {
info!(
"APRS-IS stats: received={}, forwarded={}, skipped={}, write_errors={}, reconnects={}",
stats_received, stats_forwarded, stats_skipped,
stats_write_errors, stats_reconnects
);
}
recv = decode_rx.recv() => {
match recv {
Ok(DecodedMessage::Aprs(pkt)) => {
stats_received += 1;
if !pkt.crc_ok {
stats_skipped += 1;
continue 'forward;
}
let tnc2 = format_tnc2(&pkt);
debug!("APRS-IS: forwarded {}>{},...", pkt.src_call, pkt.dest_call);
if let Err(e) = write_half.write_all(tnc2.as_bytes()).await {
warn!("APRS-IS IGate: packet write failed: {}", e);
stats_write_errors += 1;
break 'forward;
}
stats_forwarded += 1;
}
Ok(_) => {
// Non-APRS messages (FT8, WSPR, CW) are silently skipped
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("APRS-IS IGate: dropped {} decode events (channel lagged)", n);
}
Err(broadcast::error::RecvError::Closed) => {
return;
}
}
}
}
}
// Forward loop exited due to a write error — reconnect with backoff
stats_reconnects += 1;
warn!(
"APRS-IS IGate: disconnected from {}:{}, reconnecting in {}s",
cfg.host, cfg.port, backoff_secs
);
time::sleep(Duration::from_secs(backoff_secs)).await;
backoff_secs = (backoff_secs * 2).min(60);
}
}
#[cfg(test)]
mod tests {
use super::*;
use trx_core::decode::AprsPacket;
fn make_pkt(src: &str, dest: &str, path: &str, info: &str, crc_ok: bool) -> AprsPacket {
AprsPacket {
src_call: src.to_string(),
dest_call: dest.to_string(),
path: path.to_string(),
info: info.to_string(),
info_bytes: vec![],
packet_type: "Unknown".to_string(),
crc_ok,
lat: None,
lon: None,
symbol_table: None,
symbol_code: None,
}
}
#[test]
fn passcode_result_in_valid_range() {
assert!(compute_passcode("N0CALL") <= 0x7fff);
assert!(compute_passcode("W1AW") <= 0x7fff);
assert!(compute_passcode("SP2SJG") <= 0x7fff);
}
#[test]
fn passcode_strips_ssid() {
assert_eq!(compute_passcode("N0CALL"), compute_passcode("N0CALL-9"));
assert_eq!(compute_passcode("W1AW"), compute_passcode("W1AW-5"));
assert_eq!(compute_passcode("SP2SJG"), compute_passcode("SP2SJG-15"));
}
#[test]
fn passcode_case_insensitive() {
assert_eq!(compute_passcode("n0call"), compute_passcode("N0CALL"));
assert_eq!(compute_passcode("sp2sjg"), compute_passcode("SP2SJG"));
}
#[test]
fn passcode_truncates_to_ten_chars() {
// Callsigns are at most 10 chars after stripping SSID; extra chars must be ignored
assert_eq!(
compute_passcode("ABCDEFGHIJ"),
compute_passcode("ABCDEFGHIJKL")
);
}
#[test]
fn tnc2_with_path() {
let pkt = make_pkt(
"N0CALL-9",
"APRS",
"WIDE1-1,WIDE2-1",
"!1234.56N/01234.56E-Test",
true,
);
assert_eq!(
format_tnc2(&pkt),
"N0CALL-9>APRS,WIDE1-1,WIDE2-1:!1234.56N/01234.56E-Test\r\n"
);
}
#[test]
fn tnc2_without_path() {
let pkt = make_pkt("W1AW", "BEACON", "", ">Test status", true);
assert_eq!(format_tnc2(&pkt), "W1AW>BEACON:>Test status\r\n");
}
}
+41
View File
@@ -35,6 +35,8 @@ pub struct ServerConfig {
pub audio: AudioConfig, pub audio: AudioConfig,
/// PSK Reporter uplink configuration /// PSK Reporter uplink configuration
pub pskreporter: PskReporterConfig, pub pskreporter: PskReporterConfig,
/// APRS-IS IGate uplink configuration
pub aprsfi: AprsFiConfig,
/// Decoder file logging configuration /// Decoder file logging configuration
pub decode_logs: DecodeLogsConfig, pub decode_logs: DecodeLogsConfig,
} }
@@ -233,6 +235,31 @@ impl Default for PskReporterConfig {
} }
} }
/// APRS-IS IGate uplink configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AprsFiConfig {
/// Whether APRS-IS IGate uplink is enabled
pub enabled: bool,
/// APRS-IS server hostname
pub host: String,
/// APRS-IS server port
pub port: u16,
/// APRS-IS passcode. -1 = auto-compute from [general].callsign.
pub passcode: i32,
}
impl Default for AprsFiConfig {
fn default() -> Self {
Self {
enabled: false,
host: "rotate.aprs.net".to_string(),
port: 14580,
passcode: -1,
}
}
}
fn default_decode_logs_dir() -> String { fn default_decode_logs_dir() -> String {
if let Some(data_dir) = dirs::data_dir() { if let Some(data_dir) = dirs::data_dir() {
return data_dir return data_dir
@@ -351,6 +378,15 @@ impl ServerConfig {
} }
} }
if self.aprsfi.enabled {
if self.aprsfi.host.trim().is_empty() {
return Err("[aprsfi].host must not be empty".to_string());
}
if self.aprsfi.port == 0 {
return Err("[aprsfi].port must be > 0".to_string());
}
}
if self.decode_logs.enabled { if self.decode_logs.enabled {
if self.decode_logs.dir.trim().is_empty() { if self.decode_logs.dir.trim().is_empty() {
return Err("[decode_logs].dir must not be empty when enabled".to_string()); return Err("[decode_logs].dir must not be empty when enabled".to_string());
@@ -405,6 +441,7 @@ impl ServerConfig {
listen: ListenConfig::default(), listen: ListenConfig::default(),
audio: AudioConfig::default(), audio: AudioConfig::default(),
pskreporter: PskReporterConfig::default(), pskreporter: PskReporterConfig::default(),
aprsfi: AprsFiConfig::default(),
decode_logs: DecodeLogsConfig::default(), decode_logs: DecodeLogsConfig::default(),
}; };
@@ -538,6 +575,10 @@ mod tests {
assert_eq!(config.audio.sample_rate, 48000); assert_eq!(config.audio.sample_rate, 48000);
assert!(!config.pskreporter.enabled); assert!(!config.pskreporter.enabled);
assert_eq!(config.pskreporter.port, 4739); assert_eq!(config.pskreporter.port, 4739);
assert!(!config.aprsfi.enabled);
assert_eq!(config.aprsfi.host, "rotate.aprs.net");
assert_eq!(config.aprsfi.port, 14580);
assert_eq!(config.aprsfi.passcode, -1);
assert!(!config.decode_logs.enabled); assert!(!config.decode_logs.enabled);
assert!( assert!(
std::path::Path::new(&config.decode_logs.dir) std::path::Path::new(&config.decode_logs.dir)
+18
View File
@@ -2,6 +2,7 @@
// //
// SPDX-License-Identifier: BSD-2-Clause // SPDX-License-Identifier: BSD-2-Clause
mod aprsfi;
mod audio; mod audio;
mod config; mod config;
mod decode; mod decode;
@@ -406,6 +407,23 @@ async fn main() -> DynResult<()> {
} }
} }
if cfg.aprsfi.enabled {
let callsign = resolved.callsign.clone().unwrap_or_default();
if callsign.trim().is_empty() {
warn!("APRS-IS IGate enabled but [general].callsign is empty; uplink disabled");
} else {
let ai_cfg = cfg.aprsfi.clone();
let ai_decode_rx = decode_tx.subscribe();
let ai_shutdown_rx = shutdown_rx.clone();
task_handles.push(tokio::spawn(async move {
tokio::select! {
_ = aprsfi::run_aprsfi_uplink(ai_cfg, callsign, ai_decode_rx) => {}
_ = wait_for_shutdown(ai_shutdown_rx) => {}
}
}));
}
}
let decoder_logs = match DecoderLoggers::from_config(&cfg.decode_logs) { let decoder_logs = match DecoderLoggers::from_config(&cfg.decode_logs) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
+11
View File
@@ -76,6 +76,17 @@ port = 4739
# If omitted, it is derived from [general].latitude/[general].longitude. # If omitted, it is derived from [general].latitude/[general].longitude.
# receiver_locator = "JO93" # receiver_locator = "JO93"
[aprsfi]
# Enable APRS-IS IGate uplink (forwards received RF APRS packets to APRS-IS / aprs.fi)
enabled = false
# APRS-IS server (rotate.aprs.net does DNS round-robin across all tier-2 servers)
host = "rotate.aprs.net"
port = 14580
# APRS-IS passcode. -1 = auto-computed from [general].callsign.
# passcode = -1
[decode_logs] [decode_logs]
# Optional decoder message logs to files (APRS/CW/FT8/WSPR) # Optional decoder message logs to files (APRS/CW/FT8/WSPR)
enabled = false enabled = false