jerry пре 3 месеци
родитељ
комит
80a3016a67

+ 2 - 11
app/api/router.py

@@ -80,7 +80,6 @@ from app.services.telegram_service import TelegramService
 from app.services.wechat_service import WechatService
 from app.services.slot_snapshot_service import SlotSnapshotService
 from app.services.statistics_service import StatisticsService
-from app.services.queue_service import QueueService
 from app.services.llm_service import LlmService
 
 
@@ -618,16 +617,8 @@ async def task_pop_task(
     queue_name: str,
     db: AsyncSession = Depends(get_db),
 ):
-    task = await QueueService.pop_task(db, queue_name)
+    task = await TaskService.pop_task(db, queue_name)
     return success(data=task)
-    
-@admin_required_router.get("/queues", summary="调试队列数据", tags=["测试接口"], response_model=ApiResponse)
-async def dump_all_queues():
-    """
-    调试:查看所有内存队列
-    """
-    data = await QueueService.dump_all()
-    return success(data=data)
 
 @admin_required_router.post("/tg/send_message", summary="推送电报消息", tags=["消息推送接口"], response_model=ApiResponse)
 async def tg_send_message(
@@ -1219,7 +1210,7 @@ async def vas_task_pop_task(
     queue_name: str,
     db: AsyncSession = Depends(get_db),
 ):
-    task = await QueueService.pop_vas_task(db, queue_name)
+    task = await VasTaskService.pop_vas_task(db, queue_name, 180)
     return success(data=task)
 
 @protected_router.post("/vas/ticket/create", summary="创建工单", tags=["Visafly签证系统"], response_model=ApiResponse[VasTicketOut])

+ 0 - 5
app/core/queue_manager.py

@@ -1,5 +0,0 @@
-# app/core/queue_manager.py
-
-from app.utils.priority_queue_utils import NamedQueueManager
-
-queue_manager = NamedQueueManager()

+ 1 - 1
app/services/docker_remote_service.py

@@ -1,7 +1,7 @@
 # app/services/docker_remote_service.py
 import json
 from typing import Optional, Dict, Any, List
-from app.services.docker_remote_control import DockerRemoteController
+from app.utils.docker_remote_control import DockerRemoteController
 from app.schemas.docker_remote import (
     RemoteServerConfig,
     DockerContainerStatus,

+ 0 - 110
app/services/queue_service.py

@@ -1,110 +0,0 @@
-from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy import select
-from app.core.logger import logger
-from app.core.biz_exception import NotFoundError, BizLogicError
-from app.core.queue_manager import queue_manager
-from app.models.task import Task
-from app.models.vas_task import VasTask
-
-class QueueService:
-    
-    async def rebuild_task_queue(db: AsyncSession, queue_name: str):
-        if queue_manager.is_initialized(queue_name):
-            return
-        
-        queue = queue_manager.get_queue(queue_name)
-        queue.clear()
-        
-        stmt = (
-            select(Task)
-            .where(
-                Task.command == queue_name,
-                Task.status == 0
-            )
-        )
-        tasks = (await db.execute(stmt)).scalars().all()
-
-        for task in tasks:
-            queue_manager.put(
-                queue_name=queue_name,
-                task_id=task.id,
-                priority=0,
-            )
-
-        queue_manager.mark_initialized(queue_name)
-        logger.info(f"[Queue] rebuilt: {queue_name}")
-            
-    async def rebuild_vas_task_queue(db: AsyncSession, queue_name: str):
-        if queue_manager.is_initialized(queue_name):
-            return
-        
-        queue = queue_manager.get_queue(queue_name)
-        queue.clear()
-        
-        stmt = (
-            select(VasTask)
-            .where(
-                VasTask.routing_key == queue_name,
-                VasTask.status == 'pending'
-            )
-        )
-        tasks = (await db.execute(stmt)).scalars().all()
-
-        for task in tasks:
-            queue_manager.put(
-                queue_name=task.routing_key,
-                task_id=task.id,
-                priority=task.priority,
-            )
-
-        queue_manager.mark_initialized(queue_name)
-        logger.info(f"[Queue] rebuilt: {queue_name}")
-        
-    async def pop_task(db: AsyncSession, queue_name: str):
-        """
-        从指定队列出队一个任务,并标记为 RUNNING
-        """
-        await QueueService.rebuild_task_queue(db, queue_name)
-        task_id = queue_manager.pop(queue_name)
-        if not task_id:
-            raise NotFoundError(f'{queue_name} is empty')
-
-        stmt = select(Task).where(Task.id == task_id)
-        task = (await db.execute(stmt)).scalar_one_or_none()
-
-        if task.status != 0:
-            raise BizLogicError(f'task not READY, skipped')
-        
-        task.status = 1
-
-        await db.commit()
-        await db.refresh(task)
-
-        return task
-        
-        
-    async def pop_vas_task(db: AsyncSession, queue_name: str):
-        """
-        从指定队列出队一个任务,并标记为 RUNNING
-        """
-        await QueueService.rebuild_vas_task_queue(db, queue_name)
-        task_id = queue_manager.pop(queue_name)
-        if not task_id:
-            raise NotFoundError(f'{queue_name} is empty')
-        
-        stmt = select(VasTask).where(VasTask.id == task_id)
-        task = (await db.execute(stmt)).scalar_one_or_none()
-        
-        if task.status != "pending":
-            raise BizLogicError(f'task not READY, skipped')
-        
-        task.status = "running"
-        
-        await db.commit()
-        await db.refresh(task)
-
-        return task
-    
-    
-    async def dump_all():
-        return queue_manager.dump_all()

+ 38 - 17
app/services/task_service.py

@@ -5,7 +5,6 @@ from typing import List, Optional
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select, update
 
-from app.core.queue_manager import queue_manager
 from app.core.biz_exception import NotFoundError
 from app.models.task import Task
 from app.schemas.task import TaskCreate, TaskUpdate
@@ -26,15 +25,45 @@ class TaskService:
         db.add(db_obj)
         await db.commit()
         await db.refresh(db_obj)
-        
-        # 👇 只在 READY 状态才入队
-        if db_obj.status == 0:
-            queue_manager.put(
-                queue_name=db_obj.command,
-                task_id=db_obj.id,
-                priority=0,   # 先用默认优先级
-            )
         return db_obj
+    
+    async def pop_task(db: AsyncSession, queue_name: str) -> Task:
+        """
+        异步 POP 方法 (MySQL 8.0+ 专用)
+        原子性获取并锁定一个任务
+        """
+        try:
+            # --- 第一步:查询并抢占锁 (SELECT ... FOR UPDATE SKIP LOCKED) ---
+            stmt = (
+                select(Task)
+                .where(Task.command == queue_name)  # 指定队列/命令类型
+                .where(Task.status == 0)            # 只找待执行的
+                .order_by(Task.id.asc()) # 优先级 > 时间
+                .limit(1)
+                # MySQL 8.0+ 必须加 skip_locked=True,否则并发高时会造成大量等待
+                .with_for_update(skip_locked=True)  
+            )
+            
+            result = await db.execute(stmt)
+            task = result.scalar_one_or_none()
+            
+            # --- 第二步:更新状态 (UPDATE) ---
+            if task:
+                task.status = 1  # 标记为执行中
+                task.update_at = datetime.now()
+                
+                # session.begin() 退出时会自动执行 commit()
+                # 此时数据库中的状态已变更
+                await db.commit()
+                return task
+            
+            # 如果没抢到任务
+            raise NotFoundError(message="Task not found")
+
+        except Exception as e:
+            # 记录日志
+            # session.begin() 会自动 rollback,无需手动调用
+            raise e
 
     # ======================
     # 根据 ID 获取
@@ -70,14 +99,6 @@ class TaskService:
 
         await db.commit()
         await db.refresh(db_obj)
-        
-        # 👇 核心:从非 READY → READY,才重新入队
-        if old_status != 0 and db_obj.status == 0:
-            queue_manager.put(
-                queue_name=db_obj.command,
-                task_id=db_obj.id,
-                priority=0,
-            )
         return db_obj
 
     # ======================

+ 60 - 18
app/services/vas_task_service.py

@@ -1,15 +1,14 @@
 # app/services/task_service.py
 
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import List, Optional
 
 from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy import select
+from sqlalchemy import select, or_, and_
 
 
 from app.utils.search import apply_keyword_search_stmt
 from app.utils.pagination import paginate
-from app.core.queue_manager import queue_manager
 from app.core.biz_exception import NotFoundError,BizLogicError
 from app.models.vas_task import VasTask
 from app.models.order import VasOrder
@@ -29,13 +28,65 @@ class VasTaskService:
         db.add(rec)
         await db.commit()
         await db.refresh(rec)
-        
-        queue_manager.put(
-            queue_name=rec.routing_key,
-            task_id=task.id,
-            priority=task.priority
-        )
         return rec
+    
+    @staticmethod
+    async def pop_vas_task(session: AsyncSession, routing_key: str, cooldown_seconds: int = 60):
+        """
+        异步获取任务,支持冷却期机制。
+        
+        :param session: 数据库异步会话
+        :param routing_key: 队列键值
+        :param cooldown_seconds: 失败后的冷却时间(秒),默认60秒
+        :return: VasTask 对象 or None
+        """
+        # 计算冷却截止时间:当前时间 - 冷却秒数
+        # 只有 updated_at 早于这个时间的重试任务,才会被提取
+        cutoff_time = datetime.utcnow() - timedelta(seconds=cooldown_seconds)
+
+        try:
+            # --- 构造查询语句 ---
+            stmt = (
+                select(VasTask)
+                .where(VasTask.routing_key == routing_key)
+                .where(VasTask.status == 'pending')
+                # === 核心逻辑:冷却期筛选 ===
+                .where(
+                    or_(
+                        # 情况1:这是一个全新任务 (从未尝试过)
+                        VasTask.attempt_count == 0,
+                        
+                        # 情况2:这是一个重试任务,且距离上次更新(失败)已经过了冷却期
+                        and_(
+                            VasTask.attempt_count > 0,
+                            VasTask.updated_at < cutoff_time
+                        )
+                    )
+                )
+                # 排序:优先级优先(0假设是最高优先级?),其次是先创建的
+                # 注意:根据你的业务,priority 可能 desc 才是高优先级,这里按 asc 写
+                .order_by(VasTask.priority.desc(), VasTask.id.asc())
+                .limit(1)
+                # MySQL 8.0+ 必加,跳过被锁定的行
+                .with_for_update(skip_locked=True)
+            )
+
+            result = await session.execute(stmt)
+            task = result.scalar_one_or_none()
+
+            # --- 更新状态 ---
+            if task:
+                task.status = 'running'     # 标记为已被抓取
+                task.attempt_count += 1     # 增加尝试次数
+                task.updated_at = datetime.utcnow() # 更新时间(重置冷却计时起点)
+                
+                await session.commit()
+                # session.begin() 结束时自动 commit
+                return task
+            raise NotFoundError(message="Task not found")
+        except Exception as e:
+            # 记录日志
+            raise e
 
     @staticmethod
     async def list_task(
@@ -119,17 +170,8 @@ class VasTaskService:
             raise BizLogicError("Task is in queue already")
         
         rec.status = "pending"
-        if rec.status == "grabbed":
-            rec.attempt_count = (rec.attempt_count or 0) + 1
-
         await db.commit()
         await db.refresh(rec)
-        
-        queue_manager.put(
-            queue_name=rec.routing_key,
-            task_id=rec.id,
-            priority= max(0, rec.priority - rec.attempt_count),
-        )
         return rec
 
     @staticmethod

+ 0 - 6
app/services/webhook_service.py

@@ -7,7 +7,6 @@ from decimal import Decimal
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select
 
-from app.core.queue_manager import queue_manager
 from app.core.biz_exception import NotFoundError, BizLogicError
 from app.models.order import VasOrder
 from app.models.vas_task import VasTask
@@ -68,11 +67,6 @@ class WebhookService:
             db.add(task)
             await db.flush()
             await db.refresh(task)
-            queue_manager.put(
-                queue_name=routing.routing_key,
-                task_id=task.id,
-                priority=task.priority
-            )
             created_tasks.append(task)
         return created_tasks
 

+ 2 - 2
app/services/docker_remote_control.py → app/utils/docker_remote_control.py

@@ -13,7 +13,7 @@ import os
 import sys
 import argparse
 from pathlib import Path
-from typing import Optional, Dict, Any, List
+from typing import Optional, Dict, Any, List,Tuple
 import paramiko
 from io import StringIO
 
@@ -82,7 +82,7 @@ class DockerRemoteController:
         if self.ssh_client:
             self.ssh_client.close()
     
-    def execute_command(self, command: str) -> tuple[str, str, int]:
+    def execute_command(self, command: str) -> Tuple[str, str, int]:
         """
         执行远程命令