You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
306 lines
9.8 KiB
306 lines
9.8 KiB
|
2 days ago
|
//! Axum HTTP/WS server — admin UI is built-in, modules are Lua-side.
|
||
|
|
|
||
|
|
use axum::{
|
||
|
|
body::Body,
|
||
|
|
extract::{Path, ws::{Message, WebSocket, WebSocketUpgrade}},
|
||
|
|
http::{header, StatusCode},
|
||
|
|
response::{Html, IntoResponse, Response},
|
||
|
|
routing::{get, post},
|
||
|
|
Router,
|
||
|
|
};
|
||
|
|
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||
|
|
use std::sync::{Mutex, OnceLock};
|
||
|
|
use std::collections::HashMap;
|
||
|
|
|
||
|
|
use crate::{bridge, logging};
|
||
|
|
|
||
|
|
/// Build identifier shown by the UI and the module-list API.
///
/// Taken from the `ARZ_BUILD_TS` environment variable at compile time; falls
/// back to `"dev"` when the variable was not set during the build.
const BUILD_TS: &str = if let Some(stamp) = option_env!("ARZ_BUILD_TS") {
    stamp
} else {
    "dev"
};
|
||
|
|
|
||
|
|
/// Shutdown flag for the HTTP server: `stop()` sets it, `start()` clears it
/// once the listener is up, and the graceful-shutdown future polls it.
static SHUTDOWN: AtomicBool = AtomicBool::new(false);
|
||
|
|
/// Module name → registered static directory; created lazily by `module_dirs()`.
/// The special value `"__render__"` makes `static_file_handler` serve the
/// built-in UI page for that module instead of files from disk.
static MODULE_DIRS: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
|
||
|
|
/// Handle to the server's Tokio runtime, stored by `start()` right after the
/// runtime has been built and exposed through `runtime_handle()`.
static RT_HANDLE: OnceLock<tokio::runtime::Handle> = OnceLock::new();
|
||
|
|
|
||
|
|
pub fn runtime_handle() -> Option<&'static tokio::runtime::Handle> {
|
||
|
|
RT_HANDLE.get()
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Registered commands: name → owner module
|
||
|
|
/// Entries are `(name, owner)` pairs appended by `register_command`; the
/// list itself is created lazily by `commands()`.
static COMMANDS: OnceLock<Mutex<Vec<(String, String)>>> = OnceLock::new();
|
||
|
|
|
||
|
|
fn commands() -> &'static Mutex<Vec<(String, String)>> {
|
||
|
|
COMMANDS.get_or_init(|| Mutex::new(Vec::new()))
|
||
|
|
}
|
||
|
|
|
||
|
|
fn module_dirs() -> &'static Mutex<HashMap<String, String>> {
|
||
|
|
MODULE_DIRS.get_or_init(|| Mutex::new(HashMap::new()))
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn register_module(name: &str, static_dir: &str) {
|
||
|
|
if static_dir.is_empty() {
|
||
|
|
module_dirs().lock().unwrap().remove(name);
|
||
|
|
} else {
|
||
|
|
module_dirs().lock().unwrap().insert(name.to_string(), static_dir.to_string());
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn unregister_module(name: &str) {
|
||
|
|
module_dirs().lock().unwrap().remove(name);
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn list_modules() -> Vec<String> {
|
||
|
|
module_dirs().lock().unwrap().keys().cloned().collect()
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Check if a module has a static/index.html registered
|
||
|
|
pub fn module_has_static(name: &str) -> bool {
|
||
|
|
let dirs = module_dirs().lock().unwrap();
|
||
|
|
dirs.get(name).map(|d| !d.is_empty()).unwrap_or(false)
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn register_command(name: &str, owner: &str) {
|
||
|
|
let mut cmds = commands().lock().unwrap();
|
||
|
|
// Avoid duplicates
|
||
|
|
if !cmds.iter().any(|(n, _)| n == name) {
|
||
|
|
cmds.push((name.to_string(), owner.to_string()));
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn get_commands_json() -> String {
|
||
|
|
let cmds = commands().lock().unwrap();
|
||
|
|
let items: Vec<String> = cmds.iter()
|
||
|
|
.map(|(n, o)| format!(r#"{{"name":"{}","owner":"{}"}}"#, n, o))
|
||
|
|
.collect();
|
||
|
|
format!("[{}]", items.join(","))
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn start(port: u16) -> Result<(), String> {
|
||
|
|
let _initial_rx = bridge::init_event_channel();
|
||
|
|
|
||
|
|
let (rt_tx, rt_rx) = std::sync::mpsc::sync_channel(1);
|
||
|
|
|
||
|
|
std::thread::Builder::new()
|
||
|
|
.name("rgl-server".into())
|
||
|
|
.spawn(move || {
|
||
|
|
let rt = match tokio::runtime::Builder::new_multi_thread()
|
||
|
|
.worker_threads(2)
|
||
|
|
.enable_all()
|
||
|
|
.build()
|
||
|
|
{
|
||
|
|
Ok(rt) => rt,
|
||
|
|
Err(_) => { rt_tx.send(false).ok(); return; },
|
||
|
|
};
|
||
|
|
|
||
|
|
RT_HANDLE.set(rt.handle().clone()).ok();
|
||
|
|
rt_tx.send(true).ok(); // Signal that runtime is ready
|
||
|
|
|
||
|
|
rt.block_on(async move {
|
||
|
|
let app = Router::new()
|
||
|
|
.route("/", get(index_handler))
|
||
|
|
.route("/admin", get(admin_handler))
|
||
|
|
.route("/ws", get(ws_handler))
|
||
|
|
.route("/api/modules", get(modules_list_handler))
|
||
|
|
.route("/api/commands", get(commands_list_handler))
|
||
|
|
.route("/api/{module}/{action}", post(api_handler))
|
||
|
|
.fallback(static_file_handler);
|
||
|
|
|
||
|
|
let addr = format!("0.0.0.0:{port}");
|
||
|
|
let socket = match tokio::net::TcpSocket::new_v4() {
|
||
|
|
Ok(s) => s,
|
||
|
|
Err(_) => return,
|
||
|
|
};
|
||
|
|
socket.set_reuseaddr(true).ok();
|
||
|
|
let bind_addr: std::net::SocketAddr = addr.parse().unwrap();
|
||
|
|
if socket.bind(bind_addr).is_err() { return; }
|
||
|
|
if let Ok(listener) = socket.listen(128) {
|
||
|
|
SHUTDOWN.store(false, AtomicOrdering::Relaxed);
|
||
|
|
let graceful = axum::serve(listener, app)
|
||
|
|
.with_graceful_shutdown(async {
|
||
|
|
while !SHUTDOWN.load(AtomicOrdering::Relaxed) {
|
||
|
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||
|
|
}
|
||
|
|
});
|
||
|
|
graceful.await.ok();
|
||
|
|
}
|
||
|
|
});
|
||
|
|
})
|
||
|
|
.map_err(|e| format!("thread spawn error: {e}"))?;
|
||
|
|
|
||
|
|
// Wait for tokio runtime to be ready before returning
|
||
|
|
match rt_rx.recv() {
|
||
|
|
Ok(true) => Ok(()),
|
||
|
|
_ => Err("runtime init failed".to_string()),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn stop() {
|
||
|
|
SHUTDOWN.store(true, AtomicOrdering::Relaxed);
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Built-in pages ---
|
||
|
|
|
||
|
|
async fn index_handler() -> Html<String> {
|
||
|
|
let html = include_str!("../static/index.html")
|
||
|
|
.replace("{{BUILD_TS}}", BUILD_TS);
|
||
|
|
Html(html)
|
||
|
|
}
|
||
|
|
|
||
|
|
async fn admin_handler() -> Html<String> {
|
||
|
|
let html = include_str!("../static/ui_page.html")
|
||
|
|
.replace("{{MODULE}}", "admin")
|
||
|
|
.replace("{{TITLE}}", "Admin");
|
||
|
|
Html(html)
|
||
|
|
}
|
||
|
|
|
||
|
|
async fn commands_list_handler() -> impl IntoResponse {
|
||
|
|
let json = get_commands_json();
|
||
|
|
Response::builder()
|
||
|
|
.header(header::CONTENT_TYPE, "application/json")
|
||
|
|
.body(Body::from(json))
|
||
|
|
.unwrap()
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- API ---
|
||
|
|
|
||
|
|
async fn modules_list_handler() -> impl IntoResponse {
|
||
|
|
axum::Json(serde_json::json!({
|
||
|
|
"modules": list_modules(),
|
||
|
|
"build": BUILD_TS,
|
||
|
|
}))
|
||
|
|
}
|
||
|
|
|
||
|
|
async fn api_handler(
|
||
|
|
Path((module, action)): Path<(String, String)>,
|
||
|
|
body: String,
|
||
|
|
) -> impl IntoResponse {
|
||
|
|
let code = format!(
|
||
|
|
"return __arz_handle_api([=[{module}]=], [=[{action}]=], [=[{body}]=])"
|
||
|
|
);
|
||
|
|
let id = bridge::request_lua_exec(code);
|
||
|
|
|
||
|
|
// Poll for result with async timeout — never blocks tokio workers
|
||
|
|
let result = tokio::time::timeout(
|
||
|
|
std::time::Duration::from_secs(2),
|
||
|
|
async {
|
||
|
|
loop {
|
||
|
|
if let Some(r) = bridge::try_get_result(id) {
|
||
|
|
return r;
|
||
|
|
}
|
||
|
|
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
).await;
|
||
|
|
|
||
|
|
match result {
|
||
|
|
Ok(r) => Response::builder()
|
||
|
|
.header(header::CONTENT_TYPE, "application/json")
|
||
|
|
.body(Body::from(r))
|
||
|
|
.unwrap(),
|
||
|
|
Err(_) => Response::builder()
|
||
|
|
.status(StatusCode::GATEWAY_TIMEOUT)
|
||
|
|
.body(Body::from(r#"{"error":"lua timeout"}"#))
|
||
|
|
.unwrap(),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Static files ---
|
||
|
|
|
||
|
|
async fn static_file_handler(uri: axum::http::Uri) -> Response {
|
||
|
|
let path = uri.path();
|
||
|
|
let Some(rest) = path.strip_prefix("/m/") else {
|
||
|
|
return (StatusCode::NOT_FOUND, "not found").into_response();
|
||
|
|
};
|
||
|
|
|
||
|
|
let (module, file_path) = match rest.find('/') {
|
||
|
|
Some(i) => (&rest[..i], &rest[i+1..]),
|
||
|
|
None => (rest, ""),
|
||
|
|
};
|
||
|
|
|
||
|
|
let base_dir = {
|
||
|
|
let dirs = module_dirs().lock().unwrap();
|
||
|
|
match dirs.get(module) {
|
||
|
|
Some(d) if d == "__render__" => {
|
||
|
|
// Module uses render() API — serve auto-UI page
|
||
|
|
let html = include_str!("../static/ui_page.html")
|
||
|
|
.replace("{{MODULE}}", module)
|
||
|
|
.replace("{{TITLE}}", module);
|
||
|
|
return ([(header::CONTENT_TYPE, "text/html; charset=utf-8")], html).into_response();
|
||
|
|
}
|
||
|
|
Some(d) if !d.is_empty() => d.clone(),
|
||
|
|
_ => {
|
||
|
|
return (StatusCode::NOT_FOUND, "module not found").into_response();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
let full_path = if file_path.is_empty() || file_path == "/" {
|
||
|
|
format!("{}/index.html", base_dir)
|
||
|
|
} else {
|
||
|
|
let clean = file_path.trim_start_matches('/');
|
||
|
|
if clean.contains("..") {
|
||
|
|
return (StatusCode::FORBIDDEN, "forbidden").into_response();
|
||
|
|
}
|
||
|
|
format!("{}/{}", base_dir, clean)
|
||
|
|
};
|
||
|
|
|
||
|
|
match tokio::fs::read(&full_path).await {
|
||
|
|
Ok(contents) => {
|
||
|
|
let ct = match full_path.rsplit('.').next() {
|
||
|
|
Some("html") => "text/html; charset=utf-8",
|
||
|
|
Some("css") => "text/css",
|
||
|
|
Some("js") => "application/javascript",
|
||
|
|
Some("json") => "application/json",
|
||
|
|
Some("png") => "image/png",
|
||
|
|
Some("svg") => "image/svg+xml",
|
||
|
|
_ => "application/octet-stream",
|
||
|
|
};
|
||
|
|
([(header::CONTENT_TYPE, ct)], contents).into_response()
|
||
|
|
}
|
||
|
|
Err(_) => (StatusCode::NOT_FOUND, "not found").into_response(),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- WebSocket ---
|
||
|
|
|
||
|
|
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
|
||
|
|
ws.on_upgrade(handle_ws)
|
||
|
|
}
|
||
|
|
|
||
|
|
async fn handle_ws(mut socket: WebSocket) {
|
||
|
|
let mut event_rx = match bridge::subscribe_events() {
|
||
|
|
Some(rx) => rx,
|
||
|
|
None => return,
|
||
|
|
};
|
||
|
|
|
||
|
|
loop {
|
||
|
|
tokio::select! {
|
||
|
|
Ok(event) = event_rx.recv() => {
|
||
|
|
let json = serde_json::json!({
|
||
|
|
"type": "event",
|
||
|
|
"event": event.event,
|
||
|
|
"args": serde_json::from_str::<serde_json::Value>(&event.args)
|
||
|
|
.unwrap_or(serde_json::Value::Null),
|
||
|
|
});
|
||
|
|
if socket.send(Message::Text(json.to_string().into())).await.is_err() {
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
msg = socket.recv() => {
|
||
|
|
match msg {
|
||
|
|
Some(Ok(Message::Text(text))) => {
|
||
|
|
if text.as_str() == "ping" {
|
||
|
|
if socket.send(Message::Text("pong".into())).await.is_err() {
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
Some(Ok(Message::Close(_))) | None => break,
|
||
|
|
_ => {}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|