from datetime import datetime, timedelta, timezone

from app.services.scheduler_service import register_scheduled_task, scheduler_service
from app.core.logger import logger

# Formats accepted for grabbed_history["slot_date"], tried in this order.
_SLOT_DATE_FORMATS = ("%Y-%m-%d", "%d/%m/%Y")


def _parse_slot_date(value):
    """Parse *value* into a naive datetime, or return None if it is unusable.

    Returns None both for non-string input and for strings that match none of
    ``_SLOT_DATE_FORMATS``, so callers handle every bad value the same way.
    """
    if not isinstance(value, str):
        return None
    for fmt in _SLOT_DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    return None


@register_scheduled_task(name="visametric_slot_expire_checker", interval_hours=1)
async def visametric_slot_expire_checker():
    """Notify via WeChat about grabbed slots whose date is less than 4 days away.

    Selects ``VasTask`` rows with routing key ``auto.slot.dub.de.tourist`` and
    status ``grabbed``, reads ``slot_date`` / ``slot_time`` from each row's
    ``grabbed_history``, and sends one markdown message through
    ``WechatService.push_markdown_no_token`` listing every task whose slot
    date is on or before ``now + 4 days``.

    Rows with a missing slot date are skipped silently; rows with a malformed
    ``grabbed_history`` or an unparseable slot date are skipped with a warning
    so that one bad row cannot suppress the notification for the others.

    Any other exception is logged and not re-raised.
    """
    # Project and ORM imports stay function-local, as in the original code.
    from app.core.database import AsyncSessionLocal
    from sqlalchemy import select
    from app.models.vas_task import VasTask
    from app.services.wechat_service import WechatService

    try:
        expire_days = 4
        # Naive UTC timestamp (same value datetime.utcnow() produced, without
        # the deprecated call); slot dates are parsed as naive datetimes too.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        deadline = now + timedelta(days=expire_days)
        # NOTE(review): slot dates already in the past also satisfy
        # `slot_date <= deadline` and are reported — confirm this is intended.

        expiring_tasks = []
        async with AsyncSessionLocal() as db:
            stmt = select(VasTask).where(
                VasTask.routing_key == "auto.slot.dub.de.tourist",
                VasTask.status == "grabbed",
            )
            result = await db.execute(stmt)
            tasks = result.scalars().all()

            for task in tasks:
                grabbed_history = task.grabbed_history or {}
                if not isinstance(grabbed_history, dict):
                    logger.warning(
                        f"[Scheduler] Unexpected grabbed_history type for order "
                        f"{task.order_id}: {type(grabbed_history).__name__}"
                    )
                    continue

                slot_date_str = grabbed_history.get("slot_date", "")
                if not slot_date_str:
                    continue

                slot_date = _parse_slot_date(slot_date_str)
                if slot_date is None:
                    logger.warning(f"[Scheduler] Invalid slot_date format: {slot_date_str}")
                    continue

                if slot_date <= deadline:
                    expiring_tasks.append({
                        "order_id": task.order_id,
                        "slot_date": slot_date_str,
                        "slot_time": grabbed_history.get("slot_time", "N/A"),
                    })

        # The session is closed at this point, so the DB connection is not
        # held open while waiting on the WeChat HTTP call.
        if not expiring_tasks:
            logger.info("[Scheduler] No expiring tasks found")
            return

        lines = [f"⚠️ 以下德签 slot 即将到期(不足 {expire_days} 天):"]
        lines.extend(
            f"- 订单号: {t['order_id']}, Slot: {t['slot_date']} {t['slot_time']}"
            for t in expiring_tasks
        )
        content = "\n".join(lines)
        await WechatService.push_markdown_no_token(content)
        logger.info(f"[Scheduler] Sent WeChat notification for {len(expiring_tasks)} expiring tasks")
    except Exception as e:
        # Top-level boundary of the scheduled job: log the failure, do not re-raise.
        logger.error(f"[Scheduler] Error in visametric_slot_expire_checker: {e}")