visametric_service.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import re
  2. import json
  3. import time
  4. import random
  5. import asyncio
  6. import aiohttp
  7. from datetime import datetime, timedelta
  8. from typing import List, Optional, Tuple, Dict, Any
  9. from sqlalchemy.ext.asyncio import AsyncSession
  10. from sqlalchemy import select, or_, and_
  11. from redis.asyncio import Redis # 引入 Redis 类型
  12. from redis.asyncio import Redis
  13. from starlette.concurrency import run_in_threadpool
  14. from app.core.biz_exception import NotFoundError, BizLogicError
  15. from app.models.vas_task import VasTask
  16. from app.core.logger import logger
  17. from app.services.email_authorizations_service import EmailAuthorizationService
  18. class VisametricService:
  19. @staticmethod
  20. async def update_pnr(db: AsyncSession, task_id: int) -> VasTask:
  21. stmt = select(VasTask).where(VasTask.id == task_id)
  22. result = await db.execute(stmt)
  23. task = result.scalar_one_or_none()
  24. if not task:
  25. raise NotFoundError("Task not exist")
  26. grabbed_history = task.grabbed_history or {}
  27. inputs: Dict[str, Any] = task.user_inputs or {}
  28. task_config = task.config or {}
  29. email_account = "hujiarui8@gmail.com"
  30. first_name = inputs.get('first_name', '')
  31. last_name = inputs.get('last_name', '')
  32. pnr_number = grabbed_history.get('pnr_number', '')
  33. slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD
  34. slot_time = grabbed_history.get('slot_time', '')
  35. sender = "noreply at visametric.com"
  36. recipient = task_config.get("alias_email", "")
  37. subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
  38. body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
  39. auth = await EmailAuthorizationService.get_by_email(db, email_account)
  40. if not auth:
  41. raise ValueError(f"Email auth not found: {email_account}")
  42. email_context = await EmailAuthorizationService.fetch_email_authorizations2(
  43. db=db, auth=auth, sender=sender, recipient=recipient,
  44. subject_keywords=subject_keywords, body_keywords=body_keywords
  45. )
  46. pattern = r"SDP[A-Z0-9]+"
  47. match = re.search(pattern, email_context)
  48. if not match:
  49. raise NotFoundError(message="PNR not found")
  50. current_history = dict(task.grabbed_history) if task.grabbed_history else {}
  51. current_history['pnr_number'] = str(match.group())
  52. task.grabbed_history = current_history
  53. await db.commit()
  54. await db.refresh(task)
  55. return task
  56. @staticmethod
  57. async def cancel_appointment(db: AsyncSession, redis_client: Redis, task_id: int):
  58. stmt = select(VasTask).where(VasTask.id == task_id)
  59. result = await db.execute(stmt)
  60. task = result.scalar_one_or_none()
  61. if not task:
  62. raise NotFoundError("Task not exist")
  63. grabbed_history = task.grabbed_history or {}
  64. inputs: Dict[str, Any] = task.user_inputs or {}
  65. meta = dict(task.meta) if task.meta else {}
  66. first_name = inputs.get('first_name', '')
  67. last_name = inputs.get('last_name', '')
  68. birthday = inputs.get('birthday', '')
  69. passport_no = inputs.get('passport_no', '')
  70. pnr_number = grabbed_history.get('pnr_number', '')
  71. slot_date = grabbed_history.get('slot_date', '')
  72. slot_time = grabbed_history.get('slot_time', '')
  73. ss = birthday.split('-')
  74. cancel_request = {
  75. "passport": passport_no,
  76. "birth_day": ss[2],
  77. "birth_month": ss[1],
  78. "birth_year": ss[0],
  79. "pnrcontrolform": pnr_number
  80. }
  81. print(f'cancel request={cancel_request}')
  82. queue_key = "visametric_cancel_person_info"
  83. message = json.dumps(cancel_request)
  84. await redis_client.lpush(queue_key, message)
  85. time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  86. meta["cancelled_at"] = time_str
  87. task.meta = meta
  88. await db.commit()