server.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # web/server.py
  2. import asyncio
  3. from typing import List, Any, Dict
  4. from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
  5. from fastapi.responses import JSONResponse
  6. from vs_types import (
  7. success,
  8. fail,
  9. BizException,
  10. ApiResponse,
  11. GroupControl,
  12. UpgradePluginRequest,
  13. UpdateConfigRequest,
  14. PluginStatusOut,
  15. GroupConfig,
  16. PluginRestarted
  17. )
  18. from core.app_manager import AppManager
  19. from vs_log_macros import VSC_INFO, VSC_ERROR
  # Module-level FastAPI application; the handlers and routes in this file
  # are registered on it through its decorators.
  app = FastAPI(
      title="Visa Plugin Manager",
  )
  # -----------------------
  # Exception Handlers
  # -----------------------
  @app.exception_handler(BizException)
  async def biz_exception_handler(request: Request, exc: BizException):
      """Render a ``BizException`` as a JSON error response.

      The HTTP status is taken from ``exc.http_status``; the body exposes
      ``exc.code``, ``exc.message`` and ``exc.extra`` under the keys
      ``code``, ``message`` and ``data``. ``request`` is not used.
      """
      http_status = exc.http_status
      error_body = {
          "code": exc.code,
          "message": exc.message,
          "data": exc.extra,
      }
      return JSONResponse(status_code=http_status, content=error_body)
  @app.exception_handler(Exception)
  async def unhandled_exception_handler(request: Request, exc: Exception):
      """Answer with a fixed, generic HTTP 500 JSON payload.

      Registered for the base ``Exception`` type. Neither ``request`` nor
      ``exc`` is inspected, so no detail of the error reaches the body.
      """
      generic_error = {
          "code": 50000,
          "message": "Internal Server Error",
          "data": None,
      }
      return JSONResponse(content=generic_error, status_code=500)
  @app.get("/status", response_model=ApiResponse[List[PluginStatusOut]])
  def get_status():
      # GET /status: read the current status from the AppManager singleton
      # and return it as the ``data`` of a success envelope.
      manager = AppManager.Instance()
      return success(data=manager.get_status())
  @app.post("/start", response_model=ApiResponse)
  def start_group(payload: GroupControl):
      # POST /start: ask the AppManager singleton to start the group named
      # by ``payload.group_id``, then confirm with a success message.
      manager = AppManager.Instance()
      manager.start_group(payload.group_id)
      confirmation = f"Group {payload.group_id} started"
      return success(message=confirmation)
  @app.post("/stop", response_model=ApiResponse)
  def stop_group(payload: GroupControl):
      # POST /stop: ask the AppManager singleton to stop the group named
      # by ``payload.group_id``, then confirm with a success message.
      manager = AppManager.Instance()
      manager.stop_group(payload.group_id)
      confirmation = f"Group {payload.group_id} stopped"
      return success(message=confirmation)
  @app.post("/restart", response_model=ApiResponse)
  def restart_group(payload: GroupControl):
      # POST /restart: ask the AppManager singleton to restart the group
      # named by ``payload.group_id``, then confirm with a success message.
      manager = AppManager.Instance()
      manager.restart_group(payload.group_id)
      confirmation = f"Group {payload.group_id} restarted"
      return success(message=confirmation)
  @app.post("/group_config", response_model=ApiResponse[GroupConfig])
  def get_group_config(payload: GroupControl):
      # POST /group_config: look up the configuration of the group named by
      # ``payload.group_id`` and return it as the ``data`` of a success envelope.
      manager = AppManager.Instance()
      return success(data=manager.get_group_config(payload.group_id))
  @app.post("/ota/upgrade_plugin", response_model=ApiResponse[PluginRestarted])
  def ota_upgrade(payload: UpgradePluginRequest):
      # POST /ota/upgrade_plugin: have the AppManager singleton upgrade the
      # plugin named by ``payload.plugin_name`` and return whatever it reports
      # back as the ``data`` of a success envelope.
      manager = AppManager.Instance()
      upgrade_result = manager.ota_upgrade_plugin(payload.plugin_name)
      confirmation = f"Plugin {payload.plugin_name} reloaded"
      return success(message=confirmation, data=upgrade_result)
  @app.post("/ota/update_config", response_model=ApiResponse)
  def ota_update(payload: UpdateConfigRequest):
      # POST /ota/update_config: hand the new configuration string to the
      # AppManager singleton for the group named by ``payload.group_id``,
      # then confirm with a success message.
      #
      # ``response_model=ApiResponse`` is declared so this route matches the
      # other message-only routes in this file (/start, /stop, /restart)
      # instead of being the only one without a declared response model.
      manager = AppManager.Instance()
      manager.ota_update_plugin_config(payload.group_id, payload.new_config_str)
      return success(message=f"Plugin {payload.group_id} config updated")
  @app.websocket("/ws/logs/{group_id}")
  async def websocket_logs_batch(ws: WebSocket, group_id: str):
      """Stream executor logs for ``group_id`` to a WebSocket client in batches.

      A callback is registered with ``AppManager.subscribe_executor_logs``.
      Each log line it receives is scheduled onto the event loop with
      ``call_soon_threadsafe`` and placed on a bounded queue (5000 entries;
      lines arriving while the queue is full are dropped). The send loop waits
      for one line, drains up to 50 more that are already queued, and sends the
      batch as a single newline-joined text frame.

      The inbound side of the socket is watched concurrently, so a client that
      disconnects while no logs are flowing is noticed right away and the log
      subscription is released, instead of lingering until the next log line
      makes a send fail.

      Args:
          ws: The WebSocket connection; it is accepted here.
          group_id: Identifier passed to the subscribe/unsubscribe calls.
      """
      await ws.accept()
      queue: asyncio.Queue = asyncio.Queue(maxsize=5000)
      loop = asyncio.get_running_loop()

      def _put_msg_in_loop(msg: str) -> None:
          # Runs on the event loop. Dropping on overflow is deliberate: a slow
          # client must not make the queue grow without bound.
          try:
              queue.put_nowait(msg)
          except asyncio.QueueFull:
              pass

      def log_callback(msg: str) -> None:
          # Hand the line over to the event loop instead of touching the
          # asyncio queue directly from the caller's context.
          loop.call_soon_threadsafe(_put_msg_in_loop, msg)

      async def _watch_for_disconnect() -> None:
          # Consume (and discard) inbound frames; the only event of interest
          # is the disconnect message, which is turned into the exception the
          # handler below already expects.
          while True:
              incoming = await ws.receive()
              if incoming["type"] == "websocket.disconnect":
                  raise WebSocketDisconnect(incoming.get("code", 1000))

      AppManager.Instance().subscribe_executor_logs(group_id, log_callback)
      disconnect_task = asyncio.create_task(_watch_for_disconnect())
      get_task = None
      try:
          buffer = []
          while True:
              # Wait for the first log line OR for the client to go away,
              # whichever happens first.
              get_task = asyncio.create_task(queue.get())
              await asyncio.wait(
                  {get_task, disconnect_task},
                  return_when=asyncio.FIRST_COMPLETED,
              )
              if disconnect_task.done():
                  # Re-raises WebSocketDisconnect (or whatever broke the watcher).
                  disconnect_task.result()
              buffer.append(get_task.result())
              # Grab whatever else is already queued without blocking (at most
              # 50 more lines, to keep a frame from getting too large) so a
              # burst of log lines goes out as one frame.
              for _ in range(50):
                  try:
                      buffer.append(queue.get_nowait())
                  except asyncio.QueueEmpty:
                      break
              # Newline-joined text; a JSON array would be an alternative if
              # the front end prefers it.
              await ws.send_text("\n".join(buffer))
              buffer.clear()
      except WebSocketDisconnect:
          # Raised by the disconnect watcher, or by the send when the client
          # has already gone away.
          print(f"Client {group_id} disconnected")
      except Exception as e:
          # Any other unexpected error ends the stream.
          print(f"Error: {e}")
      finally:
          for task in (disconnect_task, get_task):
              if task is None:
                  continue
              if not task.done():
                  task.cancel()
              elif not task.cancelled():
                  # Mark a stored exception as retrieved so asyncio does not
                  # warn about it when the task is garbage-collected.
                  task.exception()
          AppManager.Instance().unsubscribe_executor_logs(group_id, log_callback)
  def run_web_server(host="0.0.0.0", port=8000, log_level="info"):
      """Serve the module-level ``app`` with uvicorn.

      Args:
          host: Address to bind to. Defaults to "0.0.0.0".
          port: TCP port to listen on. Defaults to 8000.
          log_level: Log level passed to uvicorn. Defaults to "info", the
              value that was previously hard-coded; pass "warning" to reduce
              console output.
      """
      # uvicorn is imported here rather than at module import time.
      import uvicorn

      uvicorn.run(app, host=host, port=port, log_level=log_level)