# web/server.py
import asyncio
from typing import List, Any, Dict

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse

from vs_types import (
    success, fail, BizException, ApiResponse,
    GroupControl, UpgradePluginRequest, UpdateConfigRequest, PluginStatusOut,
    GroupConfig, PluginRestarted
)
from core.app_manager import AppManager
from vs_log_macros import VSC_INFO, VSC_ERROR

app = FastAPI(title="Visa Plugin Manager")


# -----------------------
# Exception Handlers
# -----------------------
@app.exception_handler(BizException)
async def biz_exception_handler(request: Request, exc: BizException):
    """Render a ``BizException`` as a JSON envelope.

    The HTTP status, business code, message and extra data are all taken
    from the exception itself.
    """
    envelope = {
        "code": exc.code,
        "message": exc.message,
        "data": exc.extra,
    }
    return JSONResponse(status_code=exc.http_status, content=envelope)


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Answer any other exception with a fixed, generic HTTP 500 envelope.

    Nothing from ``exc`` is copied into the response body.
    """
    envelope = {
        "code": 50000,
        "message": "Internal Server Error",
        "data": None,
    }
    return JSONResponse(status_code=500, content=envelope)


# -----------------------
# HTTP routes
# -----------------------
# Routes are documented with comments instead of docstrings on purpose:
# FastAPI publishes a route function's docstring as its OpenAPI description.

# Report the plugin status list obtained from the AppManager singleton.
@app.get("/status", response_model=ApiResponse[List[PluginStatusOut]])
def get_status():
    manager = AppManager.Instance()
    return success(data=manager.get_status())


# Start the group named in the request body.
@app.post("/start", response_model=ApiResponse)
def start_group(payload: GroupControl):
    group_id = payload.group_id
    AppManager.Instance().start_group(group_id)
    return success(message=f"Group {group_id} started")


# Stop the group named in the request body.
@app.post("/stop", response_model=ApiResponse)
def stop_group(payload: GroupControl):
    group_id = payload.group_id
    AppManager.Instance().stop_group(group_id)
    return success(message=f"Group {group_id} stopped")


# Restart the group named in the request body.
@app.post("/restart", response_model=ApiResponse)
def restart_group(payload: GroupControl):
    group_id = payload.group_id
    AppManager.Instance().restart_group(group_id)
    return success(message=f"Group {group_id} restarted")


# Return the configuration of the group named in the request body.
@app.post("/group_config", response_model=ApiResponse[GroupConfig])
def get_group_config(payload: GroupControl):
    manager = AppManager.Instance()
    return success(data=manager.get_group_config(payload.group_id))
# Reload the named plugin and report what the AppManager says was restarted.
@app.post("/ota/upgrade_plugin", response_model=ApiResponse[PluginRestarted])
def ota_upgrade(payload: UpgradePluginRequest):
    restarted = AppManager.Instance().ota_upgrade_plugin(payload.plugin_name)
    return success(message=f"Plugin {payload.plugin_name} reloaded", data=restarted)


# Replace the configuration of a group with the supplied config string.
# response_model matches the other message-only routes (/start, /stop, /restart).
@app.post("/ota/update_config", response_model=ApiResponse)
def ota_update(payload: UpdateConfigRequest):
    AppManager.Instance().ota_update_plugin_config(payload.group_id, payload.new_config_str)
    return success(message=f"Plugin {payload.group_id} config updated")


@app.websocket("/ws/logs/{group_id}")
async def websocket_logs_batch(ws: WebSocket, group_id: str):
    """Stream executor logs for ``group_id`` to a WebSocket client in batches.

    Log lines are delivered through a callback registered with the
    AppManager, buffered in a bounded queue, and sent as newline-joined text
    frames of at most 51 lines each (one awaited line plus up to 50 more that
    are already queued). When the queue is full, new lines are dropped.

    A second task reads from the socket so that a client disconnect is
    noticed even while no logs are flowing; otherwise the handler would stay
    blocked on the queue and the log subscription would never be removed.

    Args:
        ws: The WebSocket connection; it is accepted here.
        group_id: Identifier of the group whose executor logs are streamed.
    """
    await ws.accept()

    queue: asyncio.Queue = asyncio.Queue(maxsize=5000)
    loop = asyncio.get_running_loop()

    def _put_msg_in_loop(msg: str) -> None:
        # Runs on the event loop. A slow client must not grow memory without
        # bound, so lines that do not fit in the queue are dropped.
        try:
            queue.put_nowait(msg)
        except asyncio.QueueFull:
            pass

    def log_callback(msg: str) -> None:
        # Hand the line over to the event loop; call_soon_threadsafe makes
        # this safe to invoke from a thread other than the loop's own.
        loop.call_soon_threadsafe(_put_msg_in_loop, msg)

    async def _send_batches() -> None:
        while True:
            # Block until at least one line is available.
            batch = [await queue.get()]
            # Then take whatever is already queued, without waiting, so a
            # burst of lines goes out as one frame (capped to keep frames small).
            for _ in range(50):
                try:
                    batch.append(queue.get_nowait())
                except asyncio.QueueEmpty:
                    break
            # Lines are joined with newlines; switch to a JSON array here if
            # the front end needs structured frames.
            await ws.send_text("\n".join(batch))

    async def _watch_disconnect() -> None:
        # Inbound frames are ignored; this task exists only to observe the
        # disconnect event, which is delivered on the receive side.
        while True:
            message = await ws.receive()
            if message["type"] == "websocket.disconnect":
                return

    AppManager.Instance().subscribe_executor_logs(group_id, log_callback)
    tasks = []
    try:
        tasks.append(asyncio.create_task(_send_batches()))
        tasks.append(asyncio.create_task(_watch_disconnect()))
        finished, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            # Re-raise whatever ended the task (e.g. a failed send).
            task.result()
        # Only the watcher can finish without raising: the client went away.
        print(f"Client {group_id} disconnected")
    except WebSocketDisconnect:
        # Raised by Starlette when the client has closed the connection.
        print(f"Client {group_id} disconnected")
    except Exception as e:
        # Any other failure while streaming.
        print(f"Error: {e}")
    finally:
        for task in tasks:
            task.cancel()
        # Unsubscribe before awaiting anything so the callback is removed
        # even if this handler is cancelled again during cleanup.
        AppManager.Instance().unsubscribe_executor_logs(group_id, log_callback)
        await asyncio.gather(*tasks, return_exceptions=True)


def run_web_server(host="0.0.0.0", port=8000):
    """Run the FastAPI app under uvicorn (blocking call).

    Args:
        host: Interface to bind to; the default listens on all interfaces.
        port: TCP port to listen on.
    """
    import uvicorn

    # NOTE(review): the previous comment here said log_level "warning" keeps
    # the console quiet, but the configured level is "info" - confirm which
    # one is intended.
    uvicorn.run(app, host=host, port=port, log_level="info")