import re
import json
import time
import random
import asyncio
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Dict, Any

import aiohttp
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_, and_
from redis.asyncio import Redis  # Redis type, used for annotations
from starlette.concurrency import run_in_threadpool

from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.vas_task import VasTask
from app.core.logger import logger
from app.services.email_authorizations_service import EmailAuthorizationService


class VisametricService:
    """Operations on Visametric appointment tasks stored as ``VasTask`` rows."""

    # Mailbox whose authorization is used to search for confirmation emails.
    _EMAIL_ACCOUNT = "hujiarui8@gmail.com"
    # NOTE(review): passed verbatim as the sender filter; confirm that the
    # " at " spelling (rather than "@") is what the mail search expects.
    _SENDER = "noreply at visametric.com"
    # A PNR is the literal prefix "SDP" followed by upper-case letters/digits.
    _PNR_PATTERN = re.compile(r"SDP[A-Z0-9]+")
    # Redis list that receives JSON-encoded cancel requests.
    _CANCEL_QUEUE_KEY = "visametric_cancel_person_info"

    @staticmethod
    async def _load_task(db: AsyncSession, task_id: int) -> VasTask:
        """Return the task with the given id or raise ``NotFoundError``."""
        result = await db.execute(select(VasTask).where(VasTask.id == task_id))
        task = result.scalar_one_or_none()
        if not task:
            raise NotFoundError("Task not exist")
        return task

    @staticmethod
    def _split_birthday(birthday: Any) -> Tuple[str, str, str]:
        """Split a ``YYYY-MM-DD`` string into ``(year, month, day)`` strings.

        Raises:
            ValueError: if ``birthday`` is not a string made of exactly three
                non-empty ``-``-separated parts.
        """
        parts = birthday.split('-') if isinstance(birthday, str) else []
        if len(parts) != 3 or not all(parts):
            # The value itself is deliberately left out of the message: it is
            # personal data and this text may end up in logs.
            raise ValueError("Invalid or missing birthday; expected YYYY-MM-DD")
        year, month, day = parts
        return year, month, day

    @staticmethod
    async def update_pnr(db: AsyncSession, task_id: int) -> VasTask:
        """Extract the PNR from the confirmation email and store it on the task.

        Searches the configured mailbox for an email matching the task's
        sender/recipient/keyword filters, pulls the first ``SDP...`` token out
        of the returned text and writes it to ``grabbed_history['pnr_number']``.

        Args:
            db: Async database session used for the lookup and the commit.
            task_id: Primary key of the ``VasTask`` to update.

        Returns:
            The refreshed ``VasTask`` after the commit.

        Raises:
            NotFoundError: if the task does not exist, or no PNR could be
                found (including when the email lookup returns nothing).
            ValueError: if no email authorization exists for the mailbox.
        """
        task = await VisametricService._load_task(db, task_id)

        grabbed_history = task.grabbed_history or {}
        inputs: Dict[str, Any] = task.user_inputs or {}
        task_config = task.config or {}

        email_account = VisametricService._EMAIL_ACCOUNT
        first_name = inputs.get('first_name', '')
        last_name = inputs.get('last_name', '')
        pnr_number = grabbed_history.get('pnr_number', '')
        slot_date = grabbed_history.get('slot_date', '')  # YYYY-MM-DD
        slot_time = grabbed_history.get('slot_time', '')

        recipient = task_config.get("alias_email", "")
        # Caller-supplied keyword lists win; otherwise build them from what is
        # already known about the booking.
        subject_keywords = inputs.get(
            "subjectKeywords",
            f"{pnr_number},Visametric,Appointment,Request",
        )
        body_keywords = inputs.get(
            "bodyKeywords",
            f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}",
        )

        auth = await EmailAuthorizationService.get_by_email(db, email_account)
        if not auth:
            raise ValueError(f"Email auth not found: {email_account}")

        email_context = await EmailAuthorizationService.fetch_email_authorizations2(
            db=db,
            auth=auth,
            sender=VisametricService._SENDER,
            recipient=recipient,
            subject_keywords=subject_keywords,
            body_keywords=body_keywords,
        )

        # Guard against an empty/None lookup result so that "no email" is
        # reported as a missing PNR instead of a TypeError from the regex.
        match = (
            VisametricService._PNR_PATTERN.search(email_context)
            if email_context
            else None
        )
        if not match:
            raise NotFoundError(message="PNR not found")

        # Assign a fresh dict instead of mutating in place -- presumably so
        # the ORM notices the change to the JSON column; verify against the
        # model definition.
        current_history = dict(task.grabbed_history or {})
        current_history['pnr_number'] = match.group()
        task.grabbed_history = current_history

        await db.commit()
        await db.refresh(task)
        return task

    @staticmethod
    async def cancel_appointment(db: AsyncSession, redis_client: Redis, task_id: int) -> None:
        """Queue a cancellation request for the task's appointment.

        Builds the cancel payload from the task's passport number, birthday
        and PNR, pushes it as JSON onto the cancel queue in Redis, then stamps
        ``meta['cancelled_at']`` on the task and commits.

        Args:
            db: Async database session used for the lookup and the commit.
            redis_client: Async Redis client used to enqueue the request.
            task_id: Primary key of the ``VasTask`` to cancel.

        Raises:
            NotFoundError: if the task does not exist.
            ValueError: if the stored birthday is missing or not in
                ``YYYY-MM-DD`` form (raised before anything is queued).
        """
        task = await VisametricService._load_task(db, task_id)

        grabbed_history = task.grabbed_history or {}
        inputs: Dict[str, Any] = task.user_inputs or {}
        meta = dict(task.meta) if task.meta else {}

        pnr_number = grabbed_history.get('pnr_number', '')
        birth_year, birth_month, birth_day = VisametricService._split_birthday(
            inputs.get('birthday')
        )

        cancel_request = {
            "passport": inputs.get('passport_no', ''),
            "birth_day": birth_day,
            "birth_month": birth_month,
            "birth_year": birth_year,
            "pnrcontrolform": pnr_number,
        }
        # Log identifiers only: the payload carries passport and birth data
        # that should not be written to stdout or log files.
        logger.info(f"Queueing Visametric cancel request: task_id={task_id}, pnr={pnr_number}")

        await redis_client.lpush(
            VisametricService._CANCEL_QUEUE_KEY,
            json.dumps(cancel_request),
        )

        # NOTE(review): naive local time; the format is kept unchanged for
        # compatibility with existing readers of meta['cancelled_at'].
        meta["cancelled_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        task.meta = meta
        await db.commit()