| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- # web/server.py
- import asyncio
- from typing import List, Any, Dict
- from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
- from fastapi.responses import JSONResponse
- from vs_types import (
- success,
- fail,
- BizException,
- ApiResponse,
- GroupControl,
- UpgradePluginRequest,
- UpdateConfigRequest,
- PluginStatusOut,
- GroupConfig,
- PluginRestarted
- )
- from core.app_manager import AppManager
- from vs_log_macros import VSC_INFO, VSC_ERROR
# Module-level application object; the handlers, routes and websocket
# endpoint in this file register themselves on it through its decorators.
app = FastAPI(
    title="Visa Plugin Manager",
)
- # -----------------------
- # Exception Handlers
- # -----------------------
@app.exception_handler(BizException)
async def biz_exception_handler(request: Request, exc: BizException):
    """Render a ``BizException`` as the JSON error envelope.

    The HTTP status comes from ``exc.http_status``; the body carries the
    exception's ``code``, ``message`` and ``extra`` under the keys
    ``code``, ``message`` and ``data``.
    """
    body = {
        "code": exc.code,
        "message": exc.message,
        "data": exc.extra,
    }
    return JSONResponse(status_code=exc.http_status, content=body)
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: answer with a generic HTTP 500 envelope.

    Nothing from ``exc`` is copied into the response; the body is always
    code 50000, the message "Internal Server Error" and ``data`` of None.
    """
    body = {
        "code": 50000,
        "message": "Internal Server Error",
        "data": None,
    }
    return JSONResponse(status_code=500, content=body)
# GET /status: return whatever AppManager.get_status() reports, wrapped in
# the success envelope (declared as a list of PluginStatusOut).
@app.get("/status", response_model=ApiResponse[List[PluginStatusOut]])
def get_status():
    manager = AppManager.Instance()
    return success(data=manager.get_status())
# POST /start: ask the AppManager to start the group named in the payload,
# then acknowledge with a success envelope.
@app.post("/start", response_model=ApiResponse)
def start_group(payload: GroupControl):
    manager = AppManager.Instance()
    manager.start_group(payload.group_id)
    note = f"Group {payload.group_id} started"
    return success(message=note)
# POST /stop: ask the AppManager to stop the group named in the payload,
# then acknowledge with a success envelope.
@app.post("/stop", response_model=ApiResponse)
def stop_group(payload: GroupControl):
    manager = AppManager.Instance()
    manager.stop_group(payload.group_id)
    note = f"Group {payload.group_id} stopped"
    return success(message=note)
# POST /restart: ask the AppManager to restart the group named in the
# payload, then acknowledge with a success envelope.
@app.post("/restart", response_model=ApiResponse)
def restart_group(payload: GroupControl):
    manager = AppManager.Instance()
    manager.restart_group(payload.group_id)
    note = f"Group {payload.group_id} restarted"
    return success(message=note)
# POST /group_config: look up the configuration of the group named in the
# payload and return it as the envelope's data.
@app.post("/group_config", response_model=ApiResponse[GroupConfig])
def get_group_config(payload: GroupControl):
    manager = AppManager.Instance()
    return success(data=manager.get_group_config(payload.group_id))
# POST /ota/upgrade_plugin: hand the plugin name to
# AppManager.ota_upgrade_plugin() and return its result as the envelope's
# data (declared as PluginRestarted).
@app.post("/ota/upgrade_plugin", response_model=ApiResponse[PluginRestarted])
def ota_upgrade(payload: UpgradePluginRequest):
    manager = AppManager.Instance()
    outcome = manager.ota_upgrade_plugin(payload.plugin_name)
    note = f"Plugin {payload.plugin_name} reloaded"
    return success(message=note, data=outcome)
-
# POST /ota/update_config: pass the new configuration string for the given
# group to AppManager.ota_update_plugin_config() and acknowledge.
# response_model is declared so this route documents and validates the same
# envelope as the other message-only routes (/start, /stop, /restart).
@app.post("/ota/update_config", response_model=ApiResponse)
def ota_update(payload: UpdateConfigRequest):
    AppManager.Instance().ota_update_plugin_config(
        payload.group_id, payload.new_config_str
    )
    return success(message=f"Plugin {payload.group_id} config updated")
-
@app.websocket("/ws/logs/{group_id}")
async def websocket_logs_batch(ws: WebSocket, group_id: str):
    """Stream executor logs for ``group_id`` to a WebSocket client in batches.

    A callback is subscribed via ``AppManager.subscribe_executor_logs`` and
    feeds messages into a bounded queue (5000 entries; messages arriving
    while it is full are dropped). The handler waits for one message, drains
    up to 50 more that are already queued, and sends them as a single text
    frame joined with newlines.

    While waiting for logs the handler also watches the socket for a
    disconnect, so the callback is unsubscribed promptly even when the
    group produces no output after the client has gone away.

    Args:
        ws: The WebSocket connection; accepted on entry.
        group_id: Path parameter naming the group whose logs are streamed.
    """
    await ws.accept()
    queue = asyncio.Queue(maxsize=5000)
    loop = asyncio.get_running_loop()

    def _put_msg_in_loop(msg: str):
        # Runs on the event loop. Best effort: if the consumer is too slow
        # and the queue is full, the message is dropped on purpose.
        try:
            queue.put_nowait(msg)
        except asyncio.QueueFull:
            pass

    def log_callback(msg: str):
        # Hand the message over to the event loop instead of touching the
        # asyncio queue directly from the caller's context.
        # NOTE(review): presumably invoked from executor threads - confirm.
        loop.call_soon_threadsafe(_put_msg_in_loop, msg)

    async def _wait_for_disconnect() -> int:
        # Discard inbound frames until the disconnect message arrives, then
        # return its close code.
        while True:
            message = await ws.receive()
            if message.get("type") == "websocket.disconnect":
                return message.get("code") or 1000

    AppManager.Instance().subscribe_executor_logs(group_id, log_callback)
    get_task = None
    disconnect_task = None
    try:
        disconnect_task = asyncio.create_task(_wait_for_disconnect())
        while True:
            # Wait for the first message, or for the client to go away.
            get_task = asyncio.create_task(queue.get())
            await asyncio.wait(
                {get_task, disconnect_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            if disconnect_task.done():
                # result() re-raises anything the watcher itself hit;
                # otherwise funnel into the normal disconnect path below.
                raise WebSocketDisconnect(disconnect_task.result())

            batch = [get_task.result()]
            # Without blocking, pick up messages that are already queued
            # (at most 50 more, to keep the frame small) so that a burst
            # of log lines is merged into one frame.
            for _ in range(50):
                try:
                    batch.append(queue.get_nowait())
                except asyncio.QueueEmpty:
                    break

            # Merged send: newline-joined text (a JSON array would be an
            # alternative if the front end prefers it).
            await ws.send_text("\n".join(batch))
    except WebSocketDisconnect:
        # Standard FastAPI/Starlette signal that the client disconnected.
        print(f"Client {group_id} disconnected")
    except Exception as e:
        # Any other unexpected error.
        print(f"Error: {e}")
    finally:
        # Do not leave helper tasks pending; cancel() on a finished task is
        # a no-op.
        for task in (get_task, disconnect_task):
            if task is not None:
                task.cancel()
        AppManager.Instance().unsubscribe_executor_logs(group_id, log_callback)
def run_web_server(host="0.0.0.0", port=8000, log_level="info"):
    """Serve ``app`` with uvicorn.

    Args:
        host: Address to bind to.
        port: TCP port to listen on.
        log_level: uvicorn log level name. Defaults to "info", the value
            this function has always used; pass "warning" to reduce
            console output.
    """
    # uvicorn is imported at call time, so it is only needed when the
    # server is actually started.
    import uvicorn

    uvicorn.run(app, host=host, port=port, log_level=log_level)
|