[feat](trx-frontend-http): add scheduler control handoff
Co-authored-by: OpenAI Codex <codex@openai.com> Signed-off-by: Stan Grams <sjg@haxx.space>
This commit is contained in:
@@ -261,7 +261,13 @@
|
|||||||
</div>
|
</div>
|
||||||
<div class="full-row label-below-row" id="vchan-row" style="display:none;">
|
<div class="full-row label-below-row" id="vchan-row" style="display:none;">
|
||||||
<div class="label"><span>Channels</span></div>
|
<div class="label"><span>Channels</span></div>
|
||||||
<div class="vchan-picker" id="vchan-picker"></div>
|
<div class="vchan-row-controls">
|
||||||
|
<div class="vchan-picker" id="vchan-picker"></div>
|
||||||
|
<div class="scheduler-release-wrap">
|
||||||
|
<button id="scheduler-release-btn" type="button">Release to Scheduler</button>
|
||||||
|
<div id="scheduler-release-status" class="scheduler-release-status">Scheduler is controlling the rig.</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<div class="full-row label-below-row">
|
<div class="full-row label-below-row">
|
||||||
<div class="label"><span>Signal</span></div>
|
<div class="label"><span>Signal</span></div>
|
||||||
|
|||||||
@@ -308,6 +308,7 @@
|
|||||||
case "no_supported_decoders": return "Unsupported";
|
case "no_supported_decoders": return "Unsupported";
|
||||||
case "disabled": return "Disabled";
|
case "disabled": return "Disabled";
|
||||||
case "handled_by_scheduler": return "Scheduler";
|
case "handled_by_scheduler": return "Scheduler";
|
||||||
|
case "scheduler_has_control": return "Scheduler";
|
||||||
case "handled_by_virtual_channel": return "VChan";
|
case "handled_by_virtual_channel": return "VChan";
|
||||||
default: return "Inactive";
|
default: return "Inactive";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ let vchanSessionId = null;
|
|||||||
let vchanRigId = null;
|
let vchanRigId = null;
|
||||||
let vchanChannels = [];
|
let vchanChannels = [];
|
||||||
let vchanActiveId = null;
|
let vchanActiveId = null;
|
||||||
|
let schedulerReleaseState = null;
|
||||||
|
let schedulerReleasePollTimer = null;
|
||||||
|
|
||||||
function vchanFmtFreq(hz) {
|
function vchanFmtFreq(hz) {
|
||||||
if (!Number.isFinite(hz) || hz <= 0) return "--";
|
if (!Number.isFinite(hz) || hz <= 0) return "--";
|
||||||
@@ -20,11 +22,101 @@ function vchanFmtFreq(hz) {
|
|||||||
return hz + "\u202fHz";
|
return hz + "\u202fHz";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function schedulerReleaseSummaryText(state) {
|
||||||
|
if (!state) return "Scheduler is controlling the rig.";
|
||||||
|
const connected = Number(state.connected_sessions) || 0;
|
||||||
|
const released = Number(state.released_sessions) || 0;
|
||||||
|
if (connected === 0) return "Scheduler can control the rig.";
|
||||||
|
if (state.all_released) {
|
||||||
|
return connected === 1
|
||||||
|
? "Scheduler is controlling the rig."
|
||||||
|
: `Scheduler is controlling the rig for all ${connected} users.`;
|
||||||
|
}
|
||||||
|
if (!state.current_session_released) {
|
||||||
|
const othersReleased = Math.max(released, 0);
|
||||||
|
return othersReleased > 0
|
||||||
|
? `You are holding control. ${othersReleased} other user${othersReleased === 1 ? "" : "s"} already released it.`
|
||||||
|
: "You are holding control. Release it to return control to the scheduler.";
|
||||||
|
}
|
||||||
|
const blocking = Math.max(connected - released, 0);
|
||||||
|
return blocking > 0
|
||||||
|
? `Scheduler is waiting for ${blocking} user${blocking === 1 ? "" : "s"} to stop manual tuning.`
|
||||||
|
: "Scheduler can control the rig.";
|
||||||
|
}
|
||||||
|
|
||||||
|
function vchanRenderSchedulerRelease() {
|
||||||
|
const btn = document.getElementById("scheduler-release-btn");
|
||||||
|
const status = document.getElementById("scheduler-release-status");
|
||||||
|
if (!btn || !status) return;
|
||||||
|
const currentReleased = !!(schedulerReleaseState && schedulerReleaseState.current_session_released);
|
||||||
|
btn.disabled = !vchanSessionId || currentReleased;
|
||||||
|
btn.classList.toggle("active", !currentReleased);
|
||||||
|
btn.textContent = "Release to Scheduler";
|
||||||
|
status.textContent = schedulerReleaseSummaryText(schedulerReleaseState);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function vchanPollSchedulerRelease() {
|
||||||
|
if (!vchanSessionId) {
|
||||||
|
schedulerReleaseState = null;
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const resp = await fetch(`/scheduler-control?session_id=${encodeURIComponent(vchanSessionId)}`);
|
||||||
|
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
|
||||||
|
schedulerReleaseState = await resp.json();
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
|
} catch (e) {
|
||||||
|
console.error("scheduler release status failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function vchanStartSchedulerReleasePolling() {
|
||||||
|
if (schedulerReleasePollTimer) {
|
||||||
|
clearInterval(schedulerReleasePollTimer);
|
||||||
|
}
|
||||||
|
schedulerReleasePollTimer = setInterval(vchanPollSchedulerRelease, 10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function vchanToggleSchedulerRelease() {
|
||||||
|
if (!vchanSessionId) return;
|
||||||
|
try {
|
||||||
|
const resp = await fetch("/scheduler-control", {
|
||||||
|
method: "PUT",
|
||||||
|
headers: { "Content-Type": "application/json" },
|
||||||
|
body: JSON.stringify({ session_id: vchanSessionId, released: true }),
|
||||||
|
});
|
||||||
|
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
|
||||||
|
schedulerReleaseState = await resp.json();
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
|
} catch (e) {
|
||||||
|
console.error("scheduler release toggle failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function vchanTakeSchedulerControl() {
|
||||||
|
if (!vchanSessionId) return;
|
||||||
|
if (schedulerReleaseState && !schedulerReleaseState.current_session_released) return;
|
||||||
|
try {
|
||||||
|
const resp = await fetch("/scheduler-control", {
|
||||||
|
method: "PUT",
|
||||||
|
headers: { "Content-Type": "application/json" },
|
||||||
|
body: JSON.stringify({ session_id: vchanSessionId, released: false }),
|
||||||
|
});
|
||||||
|
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
|
||||||
|
schedulerReleaseState = await resp.json();
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
|
} catch (e) {
|
||||||
|
console.error("scheduler control takeover failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Called by app.js when the SSE `session` event arrives.
|
// Called by app.js when the SSE `session` event arrives.
|
||||||
function vchanHandleSession(data) {
|
function vchanHandleSession(data) {
|
||||||
try {
|
try {
|
||||||
const d = JSON.parse(data);
|
const d = JSON.parse(data);
|
||||||
vchanSessionId = d.session_id || null;
|
vchanSessionId = d.session_id || null;
|
||||||
|
vchanPollSchedulerRelease();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.warn("vchan: bad session event", e);
|
console.warn("vchan: bad session event", e);
|
||||||
}
|
}
|
||||||
@@ -43,6 +135,7 @@ function vchanHandleChannels(data) {
|
|||||||
vchanReconnectAudio();
|
vchanReconnectAudio();
|
||||||
}
|
}
|
||||||
vchanRender();
|
vchanRender();
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
if (typeof renderRdsOverlays === "function") renderRdsOverlays();
|
if (typeof renderRdsOverlays === "function") renderRdsOverlays();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.warn("vchan: bad channels event", e);
|
console.warn("vchan: bad channels event", e);
|
||||||
@@ -94,6 +187,7 @@ function vchanRender() {
|
|||||||
picker.appendChild(addBtn);
|
picker.appendChild(addBtn);
|
||||||
|
|
||||||
vchanSyncAccentUI();
|
vchanSyncAccentUI();
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
}
|
}
|
||||||
|
|
||||||
async function vchanAllocate() {
|
async function vchanAllocate() {
|
||||||
@@ -196,6 +290,7 @@ function vchanApplyCapabilities(caps) {
|
|||||||
const row = document.getElementById("vchan-row");
|
const row = document.getElementById("vchan-row");
|
||||||
if (!row) return;
|
if (!row) return;
|
||||||
row.style.display = (caps && caps.filter_controls) ? "" : "none";
|
row.style.display = (caps && caps.filter_controls) ? "" : "none";
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@@ -391,10 +486,22 @@ window.vchanInterceptBandwidth = async function(bwHz) {
|
|||||||
await vchanSetChannelFreq(freqHz);
|
await vchanSetChannelFreq(freqHz);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
await vchanTakeSchedulerControl();
|
||||||
if (typeof _orig === "function") return _orig(freqHz);
|
if (typeof _orig === "function") return _orig(freqHz);
|
||||||
};
|
};
|
||||||
})();
|
})();
|
||||||
|
|
||||||
|
(function initSchedulerReleaseControl() {
|
||||||
|
const btn = document.getElementById("scheduler-release-btn");
|
||||||
|
if (btn) {
|
||||||
|
btn.addEventListener("click", () => {
|
||||||
|
vchanToggleSchedulerRelease();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
vchanStartSchedulerReleasePolling();
|
||||||
|
vchanRenderSchedulerRelease();
|
||||||
|
})();
|
||||||
|
|
||||||
// Wrap refreshFreqDisplay so the main freq field stays in sync with the
|
// Wrap refreshFreqDisplay so the main freq field stays in sync with the
|
||||||
// active virtual channel's frequency (SSE rig-state updates would otherwise
|
// active virtual channel's frequency (SSE rig-state updates would otherwise
|
||||||
// constantly overwrite it with channel 0's freq).
|
// constantly overwrite it with channel 0's freq).
|
||||||
|
|||||||
@@ -371,6 +371,28 @@ input.status-input, select.status-input { width: 100%; padding: 0.45rem 0.5rem;
|
|||||||
flex-wrap: wrap;
|
flex-wrap: wrap;
|
||||||
gap: 4px;
|
gap: 4px;
|
||||||
}
|
}
|
||||||
|
.vchan-row-controls {
|
||||||
|
display: flex;
|
||||||
|
align-items: flex-start;
|
||||||
|
justify-content: space-between;
|
||||||
|
gap: 0.8rem;
|
||||||
|
}
|
||||||
|
.scheduler-release-wrap {
|
||||||
|
display: flex;
|
||||||
|
flex-direction: column;
|
||||||
|
align-items: flex-end;
|
||||||
|
gap: 0.35rem;
|
||||||
|
min-width: 14rem;
|
||||||
|
}
|
||||||
|
.scheduler-release-wrap button.active {
|
||||||
|
border-color: var(--accent-yellow);
|
||||||
|
color: var(--accent-yellow);
|
||||||
|
}
|
||||||
|
.scheduler-release-status {
|
||||||
|
color: var(--text-muted);
|
||||||
|
font-size: 0.78rem;
|
||||||
|
text-align: right;
|
||||||
|
}
|
||||||
.vchan-picker button {
|
.vchan-picker button {
|
||||||
display: inline-flex;
|
display: inline-flex;
|
||||||
align-items: center;
|
align-items: center;
|
||||||
@@ -3582,6 +3604,7 @@ button:focus-visible, input:focus-visible, select:focus-visible {
|
|||||||
.bgd-status-state[data-state="out_of_span"],
|
.bgd-status-state[data-state="out_of_span"],
|
||||||
.bgd-status-state[data-state="waiting_for_spectrum"],
|
.bgd-status-state[data-state="waiting_for_spectrum"],
|
||||||
.bgd-status-state[data-state="waiting_for_user"],
|
.bgd-status-state[data-state="waiting_for_user"],
|
||||||
|
.bgd-status-state[data-state="scheduler_has_control"],
|
||||||
.bgd-status-state[data-state="inactive"],
|
.bgd-status-state[data-state="inactive"],
|
||||||
.bgd-status-state[data-state="handled_by_scheduler"],
|
.bgd-status-state[data-state="handled_by_scheduler"],
|
||||||
.bgd-status-state[data-state="handled_by_virtual_channel"] {
|
.bgd-status-state[data-state="handled_by_virtual_channel"] {
|
||||||
@@ -3592,6 +3615,17 @@ button:focus-visible, input:focus-visible, select:focus-visible {
|
|||||||
color: var(--accent-red);
|
color: var(--accent-red);
|
||||||
}
|
}
|
||||||
@media (max-width: 600px) {
|
@media (max-width: 600px) {
|
||||||
|
.vchan-row-controls {
|
||||||
|
flex-direction: column;
|
||||||
|
align-items: stretch;
|
||||||
|
}
|
||||||
|
.scheduler-release-wrap {
|
||||||
|
align-items: stretch;
|
||||||
|
min-width: 0;
|
||||||
|
}
|
||||||
|
.scheduler-release-status {
|
||||||
|
text-align: left;
|
||||||
|
}
|
||||||
.sch-row {
|
.sch-row {
|
||||||
flex-direction: column;
|
flex-direction: column;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -223,6 +223,7 @@ pub async fn events(
|
|||||||
clients: web::Data<Arc<AtomicUsize>>,
|
clients: web::Data<Arc<AtomicUsize>>,
|
||||||
context: web::Data<Arc<FrontendRuntimeContext>>,
|
context: web::Data<Arc<FrontendRuntimeContext>>,
|
||||||
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
|
vchan_mgr: web::Data<Arc<ClientChannelManager>>,
|
||||||
|
scheduler_control: web::Data<crate::server::scheduler::SharedSchedulerControlManager>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let rx = state.get_ref().clone();
|
let rx = state.get_ref().clone();
|
||||||
let initial = wait_for_view(rx.clone()).await?;
|
let initial = wait_for_view(rx.clone()).await?;
|
||||||
@@ -232,6 +233,7 @@ pub async fn events(
|
|||||||
|
|
||||||
// Assign a stable UUID to this SSE session for channel binding.
|
// Assign a stable UUID to this SSE session for channel binding.
|
||||||
let session_id = Uuid::new_v4();
|
let session_id = Uuid::new_v4();
|
||||||
|
scheduler_control.register_session(session_id);
|
||||||
|
|
||||||
// Seed the primary channel for the currently-selected rig (no-op if
|
// Seed the primary channel for the currently-selected rig (no-op if
|
||||||
// already initialised or if no rig is selected yet).
|
// already initialised or if no rig is selected yet).
|
||||||
@@ -336,11 +338,13 @@ pub async fn events(
|
|||||||
|
|
||||||
let vchan_drop = vchan_mgr.get_ref().clone();
|
let vchan_drop = vchan_mgr.get_ref().clone();
|
||||||
let counter_drop = counter.clone();
|
let counter_drop = counter.clone();
|
||||||
|
let scheduler_control_drop = scheduler_control.get_ref().clone();
|
||||||
let live = select(select(pings, updates), chan_updates);
|
let live = select(select(pings, updates), chan_updates);
|
||||||
let stream = prefix_stream.chain(live);
|
let stream = prefix_stream.chain(live);
|
||||||
let stream = DropStream::new(Box::pin(stream), move || {
|
let stream = DropStream::new(Box::pin(stream), move || {
|
||||||
counter_drop.fetch_sub(1, Ordering::Relaxed);
|
counter_drop.fetch_sub(1, Ordering::Relaxed);
|
||||||
vchan_drop.release_session(session_id);
|
vchan_drop.release_session(session_id);
|
||||||
|
scheduler_control_drop.unregister_session(session_id);
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(HttpResponse::Ok()
|
Ok(HttpResponse::Ok()
|
||||||
@@ -1318,6 +1322,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
.service(crate::server::scheduler::put_scheduler)
|
.service(crate::server::scheduler::put_scheduler)
|
||||||
.service(crate::server::scheduler::delete_scheduler)
|
.service(crate::server::scheduler::delete_scheduler)
|
||||||
.service(crate::server::scheduler::get_scheduler_status)
|
.service(crate::server::scheduler::get_scheduler_status)
|
||||||
|
.service(crate::server::scheduler::get_scheduler_control)
|
||||||
|
.service(crate::server::scheduler::put_scheduler_control)
|
||||||
.service(crate::server::background_decode::get_background_decode)
|
.service(crate::server::background_decode::get_background_decode)
|
||||||
.service(crate::server::background_decode::put_background_decode)
|
.service(crate::server::background_decode::put_background_decode)
|
||||||
.service(crate::server::background_decode::delete_background_decode)
|
.service(crate::server::background_decode::delete_background_decode)
|
||||||
|
|||||||
@@ -437,6 +437,7 @@ impl RouteAccess {
|
|||||||
|| path.starts_with("/bookmarks?")
|
|| path.starts_with("/bookmarks?")
|
||||||
|| path.starts_with("/bookmarks/")
|
|| path.starts_with("/bookmarks/")
|
||||||
|| path.starts_with("/scheduler/")
|
|| path.starts_with("/scheduler/")
|
||||||
|
|| path.starts_with("/scheduler-control")
|
||||||
|| path.starts_with("/channels/")
|
|| path.starts_with("/channels/")
|
||||||
{
|
{
|
||||||
return Self::Read;
|
return Self::Read;
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use trx_frontend::{FrontendRuntimeContext, SharedSpectrum, VChanAudioCmd};
|
|||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::server::bookmarks::{Bookmark, BookmarkStore};
|
use crate::server::bookmarks::{Bookmark, BookmarkStore};
|
||||||
use crate::server::scheduler::SchedulerStatusMap;
|
use crate::server::scheduler::{SchedulerStatusMap, SharedSchedulerControlManager};
|
||||||
use crate::server::vchan::{ClientChannel, ClientChannelManager};
|
use crate::server::vchan::{ClientChannel, ClientChannelManager};
|
||||||
|
|
||||||
const SUPPORTED_DECODER_KINDS: &[&str] = &["aprs", "ais", "ft8", "wspr", "hf-aprs"];
|
const SUPPORTED_DECODER_KINDS: &[&str] = &["aprs", "ais", "ft8", "wspr", "hf-aprs"];
|
||||||
@@ -124,6 +124,7 @@ pub struct BackgroundDecodeManager {
|
|||||||
bookmarks: Arc<BookmarkStore>,
|
bookmarks: Arc<BookmarkStore>,
|
||||||
context: Arc<FrontendRuntimeContext>,
|
context: Arc<FrontendRuntimeContext>,
|
||||||
scheduler_status: SchedulerStatusMap,
|
scheduler_status: SchedulerStatusMap,
|
||||||
|
scheduler_control: SharedSchedulerControlManager,
|
||||||
vchan_mgr: Arc<ClientChannelManager>,
|
vchan_mgr: Arc<ClientChannelManager>,
|
||||||
status: Arc<RwLock<HashMap<String, BackgroundDecodeStatus>>>,
|
status: Arc<RwLock<HashMap<String, BackgroundDecodeStatus>>>,
|
||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
@@ -135,6 +136,7 @@ impl BackgroundDecodeManager {
|
|||||||
bookmarks: Arc<BookmarkStore>,
|
bookmarks: Arc<BookmarkStore>,
|
||||||
context: Arc<FrontendRuntimeContext>,
|
context: Arc<FrontendRuntimeContext>,
|
||||||
scheduler_status: SchedulerStatusMap,
|
scheduler_status: SchedulerStatusMap,
|
||||||
|
scheduler_control: SharedSchedulerControlManager,
|
||||||
vchan_mgr: Arc<ClientChannelManager>,
|
vchan_mgr: Arc<ClientChannelManager>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let (notify_tx, _) = broadcast::channel(16);
|
let (notify_tx, _) = broadcast::channel(16);
|
||||||
@@ -143,6 +145,7 @@ impl BackgroundDecodeManager {
|
|||||||
bookmarks,
|
bookmarks,
|
||||||
context,
|
context,
|
||||||
scheduler_status,
|
scheduler_status,
|
||||||
|
scheduler_control,
|
||||||
vchan_mgr,
|
vchan_mgr,
|
||||||
status: Arc::new(RwLock::new(HashMap::new())),
|
status: Arc::new(RwLock::new(HashMap::new())),
|
||||||
notify_tx,
|
notify_tx,
|
||||||
@@ -314,10 +317,11 @@ impl BackgroundDecodeManager {
|
|||||||
let config = self.get_config(&rig_id);
|
let config = self.get_config(&rig_id);
|
||||||
let selected = dedup_ids(&config.bookmark_ids);
|
let selected = dedup_ids(&config.bookmark_ids);
|
||||||
let users_connected = self.context.sse_clients.load(Ordering::Relaxed) > 0;
|
let users_connected = self.context.sse_clients.load(Ordering::Relaxed) > 0;
|
||||||
let scheduled_bookmark_ids = if users_connected {
|
let scheduler_has_control = self.scheduler_control.scheduler_allowed() && users_connected;
|
||||||
Vec::new()
|
let scheduled_bookmark_ids = if scheduler_has_control || !users_connected {
|
||||||
} else {
|
|
||||||
self.scheduler_bookmark_ids(&rig_id)
|
self.scheduler_bookmark_ids(&rig_id)
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
};
|
};
|
||||||
let selected_bookmarks: HashMap<String, Bookmark> = self
|
let selected_bookmarks: HashMap<String, Bookmark> = self
|
||||||
.bookmarks
|
.bookmarks
|
||||||
@@ -373,6 +377,12 @@ impl BackgroundDecodeManager {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if scheduler_has_control {
|
||||||
|
status.state = "scheduler_has_control".to_string();
|
||||||
|
statuses.push(status);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if scheduled_bookmark_ids.iter().any(|id| id == &bookmark.id) {
|
if scheduled_bookmark_ids.iter().any(|id| id == &bookmark.id) {
|
||||||
status.state = "handled_by_scheduler".to_string();
|
status.state = "handled_by_scheduler".to_string();
|
||||||
statuses.push(status);
|
statuses.push(status);
|
||||||
|
|||||||
@@ -4,9 +4,10 @@
|
|||||||
|
|
||||||
//! Background Decoding Scheduler.
|
//! Background Decoding Scheduler.
|
||||||
//!
|
//!
|
||||||
//! When no SSE clients are connected the scheduler periodically inspects the
|
//! When no SSE clients are connected, or when every connected user explicitly
|
||||||
//! current UTC time, selects the matching bookmark from the per-rig config,
|
//! releases control, the scheduler periodically inspects the current UTC time,
|
||||||
//! and issues `SetFreq` + `SetMode` commands to retune the rig automatically.
|
//! selects the matching bookmark from the per-rig config, and issues rig
|
||||||
|
//! commands to retune and activate the scheduled decoder set automatically.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
@@ -19,6 +20,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use trx_core::radio::freq::Freq;
|
use trx_core::radio::freq::Freq;
|
||||||
use trx_core::rig::command::RigCommand;
|
use trx_core::rig::command::RigCommand;
|
||||||
@@ -370,27 +372,94 @@ pub struct SchedulerStatus {
|
|||||||
/// Shared mutable state for scheduler status (one entry per rig).
|
/// Shared mutable state for scheduler status (one entry per rig).
|
||||||
pub type SchedulerStatusMap = Arc<RwLock<HashMap<String, SchedulerStatus>>>;
|
pub type SchedulerStatusMap = Arc<RwLock<HashMap<String, SchedulerStatus>>>;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Default)]
|
||||||
|
pub struct SchedulerControlSummary {
|
||||||
|
pub connected_sessions: usize,
|
||||||
|
pub released_sessions: usize,
|
||||||
|
pub all_released: bool,
|
||||||
|
pub current_session_released: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct SchedulerControlManager {
|
||||||
|
sessions: RwLock<HashMap<Uuid, bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulerControlManager {
|
||||||
|
pub fn register_session(&self, session_id: Uuid) {
|
||||||
|
if let Ok(mut sessions) = self.sessions.write() {
|
||||||
|
sessions.insert(session_id, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unregister_session(&self, session_id: Uuid) {
|
||||||
|
if let Ok(mut sessions) = self.sessions.write() {
|
||||||
|
sessions.remove(&session_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_released(&self, session_id: Uuid, released: bool) -> SchedulerControlSummary {
|
||||||
|
if let Ok(mut sessions) = self.sessions.write() {
|
||||||
|
sessions.insert(session_id, released);
|
||||||
|
}
|
||||||
|
self.summary(Some(session_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn summary(&self, session_id: Option<Uuid>) -> SchedulerControlSummary {
|
||||||
|
let Ok(sessions) = self.sessions.read() else {
|
||||||
|
return SchedulerControlSummary::default();
|
||||||
|
};
|
||||||
|
let connected_sessions = sessions.len();
|
||||||
|
let released_sessions = sessions.values().filter(|released| **released).count();
|
||||||
|
let all_released = connected_sessions > 0 && released_sessions == connected_sessions;
|
||||||
|
let current_session_released = session_id
|
||||||
|
.and_then(|id| sessions.get(&id).copied())
|
||||||
|
.unwrap_or(false);
|
||||||
|
SchedulerControlSummary {
|
||||||
|
connected_sessions,
|
||||||
|
released_sessions,
|
||||||
|
all_released,
|
||||||
|
current_session_released,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn scheduler_allowed(&self) -> bool {
|
||||||
|
let summary = self.summary(None);
|
||||||
|
summary.connected_sessions == 0 || summary.all_released
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_active_user_control(&self) -> bool {
|
||||||
|
let summary = self.summary(None);
|
||||||
|
summary.connected_sessions > 0 && !summary.all_released
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type SharedSchedulerControlManager = Arc<SchedulerControlManager>;
|
||||||
|
|
||||||
pub fn spawn_scheduler_task(
|
pub fn spawn_scheduler_task(
|
||||||
context: Arc<FrontendRuntimeContext>,
|
_context: Arc<FrontendRuntimeContext>,
|
||||||
rig_tx: mpsc::Sender<RigRequest>,
|
rig_tx: mpsc::Sender<RigRequest>,
|
||||||
store: Arc<SchedulerStore>,
|
store: Arc<SchedulerStore>,
|
||||||
bookmarks: Arc<BookmarkStore>,
|
bookmarks: Arc<BookmarkStore>,
|
||||||
status_map: SchedulerStatusMap,
|
status_map: SchedulerStatusMap,
|
||||||
|
control: SharedSchedulerControlManager,
|
||||||
) {
|
) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut interval = time::interval(Duration::from_secs(30));
|
let mut interval = time::interval(Duration::from_secs(30));
|
||||||
// Track last applied bookmark per rig to avoid redundant retunes.
|
// Track last applied bookmark per rig to avoid redundant retunes.
|
||||||
let mut last_applied: HashMap<String, String> = HashMap::new();
|
let mut last_applied: HashMap<String, String> = HashMap::new();
|
||||||
|
let mut last_control_allowed = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
// Skip if any user is currently connected.
|
// Skip while at least one connected user is still holding control.
|
||||||
if context
|
let scheduler_allowed = control.scheduler_allowed();
|
||||||
.sse_clients
|
if scheduler_allowed && !last_control_allowed {
|
||||||
.load(std::sync::atomic::Ordering::Relaxed)
|
last_applied.clear();
|
||||||
> 0
|
}
|
||||||
{
|
last_control_allowed = scheduler_allowed;
|
||||||
|
if !scheduler_allowed {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -434,6 +503,10 @@ pub fn spawn_scheduler_task(
|
|||||||
let extra_bm_ids: Vec<String> = active_entry
|
let extra_bm_ids: Vec<String> = active_entry
|
||||||
.map(|e| e.bookmark_ids.clone())
|
.map(|e| e.bookmark_ids.clone())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
let extra_bookmarks: Vec<_> = extra_bm_ids
|
||||||
|
.iter()
|
||||||
|
.filter_map(|id| bookmarks.get(id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"scheduler: rig '{}' → bookmark '{}' ({} Hz {})",
|
"scheduler: rig '{}' → bookmark '{}' ({} Hz {})",
|
||||||
@@ -485,6 +558,14 @@ pub fn spawn_scheduler_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
apply_scheduler_decoders(
|
||||||
|
&rig_tx,
|
||||||
|
&config.rig_id,
|
||||||
|
&bm,
|
||||||
|
&extra_bookmarks,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
last_applied.insert(config.rig_id.clone(), bm_id.clone());
|
last_applied.insert(config.rig_id.clone(), bm_id.clone());
|
||||||
|
|
||||||
// Update status map (includes center_hz + extra bookmark_ids
|
// Update status map (includes center_hz + extra bookmark_ids
|
||||||
@@ -512,6 +593,48 @@ pub fn spawn_scheduler_task(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn apply_scheduler_decoders(
|
||||||
|
rig_tx: &mpsc::Sender<RigRequest>,
|
||||||
|
rig_id: &str,
|
||||||
|
bookmark: &crate::server::bookmarks::Bookmark,
|
||||||
|
extra_bookmarks: &[crate::server::bookmarks::Bookmark],
|
||||||
|
) {
|
||||||
|
let mut want_aprs = bookmark.mode.trim().eq_ignore_ascii_case("PKT");
|
||||||
|
let mut want_hf_aprs = false;
|
||||||
|
let mut want_ft8 = false;
|
||||||
|
let mut want_wspr = false;
|
||||||
|
|
||||||
|
let mut update_from = |bm: &crate::server::bookmarks::Bookmark| {
|
||||||
|
for decoder in bm.decoders.iter().map(|item| item.trim().to_ascii_lowercase()) {
|
||||||
|
match decoder.as_str() {
|
||||||
|
"aprs" => want_aprs = true,
|
||||||
|
"hf-aprs" => want_hf_aprs = true,
|
||||||
|
"ft8" => want_ft8 = true,
|
||||||
|
"wspr" => want_wspr = true,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
update_from(bookmark);
|
||||||
|
for bm in extra_bookmarks {
|
||||||
|
update_from(bm);
|
||||||
|
}
|
||||||
|
|
||||||
|
let desired = [
|
||||||
|
("APRS", RigCommand::SetAprsDecodeEnabled(want_aprs)),
|
||||||
|
("HF APRS", RigCommand::SetHfAprsDecodeEnabled(want_hf_aprs)),
|
||||||
|
("FT8", RigCommand::SetFt8DecodeEnabled(want_ft8)),
|
||||||
|
("WSPR", RigCommand::SetWsprDecodeEnabled(want_wspr)),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (label, cmd) in desired {
|
||||||
|
if let Err(e) = scheduler_send(rig_tx, cmd, rig_id.to_string()).await {
|
||||||
|
warn!("scheduler: Set{label}DecodeEnabled failed for '{}': {:?}", rig_id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Send a single RigCommand from the scheduler context (fire-and-forget style).
|
/// Send a single RigCommand from the scheduler context (fire-and-forget style).
|
||||||
async fn scheduler_send(
|
async fn scheduler_send(
|
||||||
rig_tx: &mpsc::Sender<RigRequest>,
|
rig_tx: &mpsc::Sender<RigRequest>,
|
||||||
@@ -592,3 +715,30 @@ pub async fn get_scheduler_status(
|
|||||||
let status = map.get(&rig_id).cloned().unwrap_or_default();
|
let status = map.get(&rig_id).cloned().unwrap_or_default();
|
||||||
HttpResponse::Ok().json(status)
|
HttpResponse::Ok().json(status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct SchedulerControlQuery {
|
||||||
|
pub session_id: Option<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct SchedulerControlUpdate {
|
||||||
|
pub session_id: Uuid,
|
||||||
|
pub released: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[get("/scheduler-control")]
|
||||||
|
pub async fn get_scheduler_control(
|
||||||
|
query: web::Query<SchedulerControlQuery>,
|
||||||
|
control: web::Data<SharedSchedulerControlManager>,
|
||||||
|
) -> impl Responder {
|
||||||
|
HttpResponse::Ok().json(control.summary(query.session_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[put("/scheduler-control")]
|
||||||
|
pub async fn put_scheduler_control(
|
||||||
|
body: web::Json<SchedulerControlUpdate>,
|
||||||
|
control: web::Data<SharedSchedulerControlManager>,
|
||||||
|
) -> impl Responder {
|
||||||
|
HttpResponse::Ok().json(control.set_released(body.session_id, body.released))
|
||||||
|
}
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ use trx_frontend::{FrontendRuntimeContext, FrontendSpawner};
|
|||||||
|
|
||||||
use auth::{AuthConfig, AuthState, SameSite};
|
use auth::{AuthConfig, AuthState, SameSite};
|
||||||
use background_decode::{BackgroundDecodeManager, BackgroundDecodeStore};
|
use background_decode::{BackgroundDecodeManager, BackgroundDecodeStore};
|
||||||
use scheduler::{SchedulerStatusMap, SchedulerStore};
|
use scheduler::{SchedulerControlManager, SchedulerStatusMap, SchedulerStore};
|
||||||
use vchan::ClientChannelManager;
|
use vchan::ClientChannelManager;
|
||||||
|
|
||||||
/// HTTP frontend implementation.
|
/// HTTP frontend implementation.
|
||||||
@@ -76,6 +76,7 @@ async fn serve(
|
|||||||
let bookmark_path = bookmarks::BookmarkStore::default_path();
|
let bookmark_path = bookmarks::BookmarkStore::default_path();
|
||||||
let bookmark_store = Arc::new(bookmarks::BookmarkStore::open(&bookmark_path));
|
let bookmark_store = Arc::new(bookmarks::BookmarkStore::open(&bookmark_path));
|
||||||
let scheduler_status: SchedulerStatusMap = Arc::new(RwLock::new(HashMap::new()));
|
let scheduler_status: SchedulerStatusMap = Arc::new(RwLock::new(HashMap::new()));
|
||||||
|
let scheduler_control = Arc::new(SchedulerControlManager::default());
|
||||||
|
|
||||||
scheduler::spawn_scheduler_task(
|
scheduler::spawn_scheduler_task(
|
||||||
context.clone(),
|
context.clone(),
|
||||||
@@ -83,6 +84,7 @@ async fn serve(
|
|||||||
scheduler_store.clone(),
|
scheduler_store.clone(),
|
||||||
bookmark_store.clone(),
|
bookmark_store.clone(),
|
||||||
scheduler_status.clone(),
|
scheduler_status.clone(),
|
||||||
|
scheduler_control.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let background_decode_path = BackgroundDecodeStore::default_path();
|
let background_decode_path = BackgroundDecodeStore::default_path();
|
||||||
@@ -94,6 +96,7 @@ async fn serve(
|
|||||||
bookmark_store.clone(),
|
bookmark_store.clone(),
|
||||||
context.clone(),
|
context.clone(),
|
||||||
scheduler_status.clone(),
|
scheduler_status.clone(),
|
||||||
|
scheduler_control.clone(),
|
||||||
vchan_mgr.clone(),
|
vchan_mgr.clone(),
|
||||||
);
|
);
|
||||||
background_decode_mgr.spawn();
|
background_decode_mgr.spawn();
|
||||||
@@ -133,6 +136,7 @@ async fn serve(
|
|||||||
bookmark_store,
|
bookmark_store,
|
||||||
scheduler_store,
|
scheduler_store,
|
||||||
scheduler_status,
|
scheduler_status,
|
||||||
|
scheduler_control,
|
||||||
vchan_mgr,
|
vchan_mgr,
|
||||||
background_decode_mgr,
|
background_decode_mgr,
|
||||||
)?;
|
)?;
|
||||||
@@ -157,6 +161,7 @@ fn build_server(
|
|||||||
bookmark_store: Arc<bookmarks::BookmarkStore>,
|
bookmark_store: Arc<bookmarks::BookmarkStore>,
|
||||||
scheduler_store: Arc<SchedulerStore>,
|
scheduler_store: Arc<SchedulerStore>,
|
||||||
scheduler_status: SchedulerStatusMap,
|
scheduler_status: SchedulerStatusMap,
|
||||||
|
scheduler_control: Arc<SchedulerControlManager>,
|
||||||
vchan_mgr: Arc<ClientChannelManager>,
|
vchan_mgr: Arc<ClientChannelManager>,
|
||||||
background_decode_mgr: Arc<BackgroundDecodeManager>,
|
background_decode_mgr: Arc<BackgroundDecodeManager>,
|
||||||
) -> Result<Server, actix_web::Error> {
|
) -> Result<Server, actix_web::Error> {
|
||||||
@@ -170,6 +175,7 @@ fn build_server(
|
|||||||
|
|
||||||
let scheduler_store = web::Data::new(scheduler_store);
|
let scheduler_store = web::Data::new(scheduler_store);
|
||||||
let scheduler_status = web::Data::new(scheduler_status);
|
let scheduler_status = web::Data::new(scheduler_status);
|
||||||
|
let scheduler_control = web::Data::new(scheduler_control);
|
||||||
let vchan_mgr = web::Data::new(vchan_mgr);
|
let vchan_mgr = web::Data::new(vchan_mgr);
|
||||||
let background_decode_mgr = web::Data::new(background_decode_mgr);
|
let background_decode_mgr = web::Data::new(background_decode_mgr);
|
||||||
|
|
||||||
@@ -214,6 +220,7 @@ fn build_server(
|
|||||||
.app_data(bookmark_store.clone())
|
.app_data(bookmark_store.clone())
|
||||||
.app_data(scheduler_store.clone())
|
.app_data(scheduler_store.clone())
|
||||||
.app_data(scheduler_status.clone())
|
.app_data(scheduler_status.clone())
|
||||||
|
.app_data(scheduler_control.clone())
|
||||||
.app_data(vchan_mgr.clone())
|
.app_data(vchan_mgr.clone())
|
||||||
.app_data(background_decode_mgr.clone())
|
.app_data(background_decode_mgr.clone())
|
||||||
.wrap(Compress::default())
|
.wrap(Compress::default())
|
||||||
|
|||||||
Reference in New Issue
Block a user