| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import re
- import json
- import time
- import random
- import asyncio
- import aiohttp
- from datetime import datetime, timedelta
- from typing import List, Optional, Tuple, Dict, Any
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select, or_, and_
- from redis.asyncio import Redis # 引入 Redis 类型
- from redis.asyncio import Redis
- from starlette.concurrency import run_in_threadpool
- from app.core.biz_exception import NotFoundError, BizLogicError
- from app.models.vas_task import VasTask
- from app.core.logger import logger
- from app.services.email_authorizations_service import EmailAuthorizationService
class VisametricService:
    """Visametric-specific operations on ``VasTask`` rows.

    All public methods are static and receive their database session (and
    Redis client, where needed) from the caller.
    """

    @staticmethod
    async def _get_task(db: AsyncSession, task_id: int) -> VasTask:
        """Load the task with the given id or raise ``NotFoundError``."""
        result = await db.execute(select(VasTask).where(VasTask.id == task_id))
        task = result.scalar_one_or_none()
        if not task:
            raise NotFoundError("Task not exist")
        return task

    @staticmethod
    def _split_birthday(birthday: Any) -> Tuple[str, str, str]:
        """Split a ``YYYY-MM-DD`` birthday into ``(year, month, day)`` strings.

        Raises:
            BizLogicError: if the value is missing or is not made of exactly
                three non-empty, dash-separated parts.
        """
        parts = str(birthday or "").split("-")
        if len(parts) != 3 or not all(parts):
            # NOTE(review): assumes BizLogicError accepts a positional message
            # like NotFoundError does - confirm against app.core.biz_exception.
            raise BizLogicError("Invalid birthday, expected format YYYY-MM-DD")
        year, month, day = parts
        return year, month, day

    @staticmethod
    async def update_pnr(db: AsyncSession, task_id: int) -> VasTask:
        """Extract the PNR from a matching e-mail and store it on the task.

        The e-mail lookup is delegated to ``EmailAuthorizationService``; the
        first ``SDP[A-Z0-9]+`` token found in the returned text is written to
        ``task.grabbed_history['pnr_number']`` and committed.

        Args:
            db: Async database session used for the lookup and the commit.
            task_id: Primary key of the ``VasTask`` to update.

        Returns:
            The refreshed task.

        Raises:
            NotFoundError: if the task does not exist or no PNR is found in
                the fetched e-mail text.
            ValueError: if no e-mail authorization exists for the mailbox.
        """
        task = await VisametricService._get_task(db, task_id)

        grabbed_history = task.grabbed_history or {}
        inputs: Dict[str, Any] = task.user_inputs or {}
        task_config = task.config or {}
        # NOTE(review): the mailbox is hard-coded; consider moving it to config.
        email_account = "hujiarui8@gmail.com"

        first_name = inputs.get('first_name', '')
        last_name = inputs.get('last_name', '')
        pnr_number = grabbed_history.get('pnr_number', '')
        slot_date = grabbed_history.get('slot_date', '')  # YYYY-MM-DD
        slot_time = grabbed_history.get('slot_time', '')

        # NOTE(review): sender looks like an obfuscated address ("at" instead
        # of "@") - confirm this is what the e-mail search expects.
        sender = "noreply at visametric.com"
        recipient = task_config.get("alias_email", "")
        # Caller-supplied keyword lists take precedence over the defaults
        # built from the task's own data.
        subject_keywords = inputs.get(
            "subjectKeywords", f"{pnr_number},Visametric,Appointment,Request"
        )
        body_keywords = inputs.get(
            "bodyKeywords",
            f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}",
        )

        auth = await EmailAuthorizationService.get_by_email(db, email_account)
        if not auth:
            raise ValueError(f"Email auth not found: {email_account}")
        email_context = await EmailAuthorizationService.fetch_email_authorizations2(
            db=db, auth=auth, sender=sender, recipient=recipient,
            subject_keywords=subject_keywords, body_keywords=body_keywords
        )

        # An empty/None result is treated as "no PNR" instead of letting
        # re.search fail with a TypeError.
        match = re.search(r"SDP[A-Z0-9]+", email_context or "")
        if not match:
            raise NotFoundError(message="PNR not found")

        # Assign a fresh dict rather than mutating in place; presumably needed
        # so the ORM notices the change on the JSON column - verify.
        current_history = dict(grabbed_history)
        current_history['pnr_number'] = match.group(0)
        task.grabbed_history = current_history
        await db.commit()
        await db.refresh(task)
        return task

    @staticmethod
    async def cancel_appointment(db: AsyncSession, redis_client: Redis, task_id: int):
        """Queue a cancellation request for the task and record when it was sent.

        A JSON message with the passport number, birth date parts and PNR is
        pushed onto the ``visametric_cancel_person_info`` Redis list, then
        ``task.meta['cancelled_at']`` is set to the current local time
        (``YYYY-MM-DD HH:MM:SS``) and committed.

        Args:
            db: Async database session used for the lookup and the commit.
            redis_client: Async Redis client used to enqueue the request.
            task_id: Primary key of the ``VasTask`` to cancel.

        Raises:
            NotFoundError: if the task does not exist.
            BizLogicError: if the stored birthday is missing or malformed.
        """
        task = await VisametricService._get_task(db, task_id)

        grabbed_history = task.grabbed_history or {}
        inputs: Dict[str, Any] = task.user_inputs or {}
        meta = dict(task.meta) if task.meta else {}

        passport_no = inputs.get('passport_no', '')
        pnr_number = grabbed_history.get('pnr_number', '')
        # Validate before touching Redis so a bad birthday never enqueues a
        # partial request.
        birth_year, birth_month, birth_day = VisametricService._split_birthday(
            inputs.get('birthday', '')
        )

        cancel_request = {
            "passport": passport_no,
            "birth_day": birth_day,
            "birth_month": birth_month,
            "birth_year": birth_year,
            "pnrcontrolform": pnr_number
        }
        # Passport number and birth date are deliberately kept out of the log.
        logger.info(f"Queueing Visametric cancel request: task_id={task_id}, pnr={pnr_number}")

        queue_key = "visametric_cancel_person_info"
        await redis_client.lpush(queue_key, json.dumps(cancel_request))

        meta["cancelled_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        task.meta = meta
        await db.commit()
|