scheduler_handlers.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from app.services.scheduler_service import register_scheduled_task, scheduler_service
  2. from app.core.logger import logger
  3. @register_scheduled_task(name="visametric_slot_expire_checker", interval_hours=1)
  4. async def visametric_slot_expire_checker():
  5. """
  6. 检查 vas_task 中状态为 grabbed 的任务,
  7. 如果 grabbed_history 中的 slot_date 距离现在不足4天,发送企业微信通知
  8. """
  9. from app.core.database import AsyncSessionLocal
  10. from sqlalchemy import select
  11. from datetime import datetime, timedelta
  12. from app.models.vas_task import VasTask
  13. from app.services.wechat_service import WechatService
  14. try:
  15. expire_days = 4
  16. now = datetime.utcnow()
  17. threshold_date = (now + timedelta(days=expire_days)).strftime("%Y-%m-%d")
  18. async with AsyncSessionLocal() as db:
  19. stmt = select(VasTask).where(
  20. VasTask.routing_key == "auto.slot.dub.de.tourist",
  21. VasTask.status == "grabbed"
  22. )
  23. result = await db.execute(stmt)
  24. tasks = result.scalars().all()
  25. expiring_tasks = []
  26. for task in tasks:
  27. grabbed_history = task.grabbed_history or {}
  28. slot_date_str = grabbed_history.get("slot_date", "")
  29. if not slot_date_str:
  30. continue
  31. try:
  32. try:
  33. slot_date = datetime.strptime(slot_date_str, "%Y-%m-%d")
  34. except ValueError:
  35. slot_date = datetime.strptime(slot_date_str, "%d/%m/%Y")
  36. if slot_date <= now + timedelta(days=expire_days):
  37. expiring_tasks.append({
  38. "order_id": task.order_id,
  39. "slot_date": slot_date_str,
  40. "slot_time": grabbed_history.get("slot_time", "N/A"),
  41. })
  42. except ValueError:
  43. logger.warning(f"[Scheduler] Invalid slot_date format: {slot_date_str}")
  44. if expiring_tasks:
  45. lines = [f"⚠️ 以下德签 slot 即将到期(不足 {expire_days} 天):"]
  46. for t in expiring_tasks:
  47. lines.append(f"- 订单号: {t['order_id']}, Slot: {t['slot_date']} {t['slot_time']}")
  48. content = "\n".join(lines)
  49. await WechatService.push_markdown_no_token(content)
  50. logger.info(f"[Scheduler] Sent WeChat notification for {len(expiring_tasks)} expiring tasks")
  51. else:
  52. logger.info("[Scheduler] No expiring tasks found")
  53. except Exception as e:
  54. logger.error(f"[Scheduler] Error in visametric_slot_expire_checker: {e}")