Ver Fonte

feat: update

jerry há 2 meses atrás
pai
commit
a7738a5004

+ 20 - 2
app/api/router.py

@@ -57,6 +57,7 @@ from app.schemas.docker_remote import RemoteServerConfig, DockerStatusOut, Docke
 from app.services.docker_remote_service import DockerRemoteService
 from app.services.configuration_service import ConfigurationService
 from app.services.troov_service import TroovService
+from app.services.visametric_service import VisametricService
 from app.services.sms_service import save_short_message, query_short_message
 from app.services.email_authorizations_service import EmailAuthorizationService
 from app.services.short_url_service import ShortUrlService
@@ -308,7 +309,7 @@ async def troov_get_all_probs(
     return success(data=obj)
 
 @admin_required_router.post("/troov/set-prob", summary="TROOV 修改概率", tags=["通用接口"], response_model=ApiResponse[List[TroovProb]])
-async def troov_add_prob(
+async def troov_set_prob(
     payload: TroovProb,
     redis_client: Redis = Depends(get_redis_client)
 ):
@@ -324,13 +325,30 @@ async def troov_del_prob(
     return success(data=obj)
 
 @admin_required_router.post("/troov/reset-probs", summary="TROOV 重置概率", tags=["通用接口"], response_model=ApiResponse[List[TroovProb]])
-async def troov_del_prob(
+async def troov_reset_prob(
     date: str,
     redis_client: Redis = Depends(get_redis_client)
 ):
     obj = await TroovService.reset_probs(redis_client, date)
     return success(data=obj)
 
+@admin_required_router.post("/visametric/update_pnr", summary="VISAMETRIC 读取PNR", tags=["Visametric专用"], response_model=ApiResponse[VasTaskOut])
+async def visametric_update_pnr(
+    task_id: int,
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await VisametricService.update_pnr(db, task_id)
+    return success(data=obj)
+
+@admin_required_router.post("/visametric/cancel_appointment", summary="VISAMETRIC 取消预约", tags=["Visametric专用"], response_model=ApiResponse)
+async def visametric_cancel_appointment(
+    task_id: int,
+    db: AsyncSession = Depends(get_db),
+    redis_client: Redis = Depends(get_redis_client)
+):
+    await VisametricService.cancel_appointment(db, redis_client, task_id)
+    return success()
+
 @admin_required_router.post("/dynamic-configurations", summary="创建动态配置", tags=["动态配置"], response_model=ApiResponse[ConfigurationOut])
 async def dynamic_config_create(config_in: ConfigurationCreate, db: AsyncSession = Depends(get_db)):
     obj = await ConfigurationService.create(db, config_in)

+ 63 - 0
app/services/email_authorizations_service.py

@@ -172,6 +172,69 @@ class EmailAuthorizationService:
         else:
             smtp = smtplib.SMTP_SSL(host, port)
         return smtp
+    
+    @staticmethod
+    async def fetch_email_authorizations2(
+            db: Session,
+            auth,
+            sender: str,
+            recipient: str,
+            subject_keywords: str,
+            body_keywords: str
+    ):
+        # =========================================================
+        # 第一步:在数据库中查找最新的 UID (主线程/DB线程执行)
+        # =========================================================
+        
+        # 1. 构建动态 SQL
+        # 假设表名为 emails,字段为 uid, sender, recipient, subject, body_text
+        sql = "SELECT uid, subject, body_text FROM emails WHERE 1=1"
+        params = {}
+
+        # 2. 处理发件人 (模糊匹配)
+        if sender.strip():
+            sql += " AND sender LIKE :sender"
+            params['sender'] = f"%{sender.strip()}%"
+
+        # 3. 处理收件人 (模糊匹配)
+        if recipient.strip():
+            sql += " AND recipient LIKE :recipient"
+            params['recipient'] = f"%{recipient.strip()}%"
+
+        # 4. 处理主题关键词 (OR 关系)
+        subj_keys = [k.strip() for k in subject_keywords.split(',') if k.strip()]
+        if subj_keys:
+            for i, k in enumerate(subj_keys):
+                key_name = f"subj_{i}"
+                # 直接拼接到主 SQL 中,要求同时满足
+                sql += f" AND subject LIKE :{key_name}"
+                params[key_name] = f"%{k}%"
+
+        # 5. 处理内容关键词 (OR 关系)
+        body_keys = [k.strip() for k in body_keywords.split(',') if k.strip()]
+        if body_keys:
+            for i, k in enumerate(body_keys):
+                key_name = f"body_{i}"
+                # 直接拼接到主 SQL 中,要求同时满足
+                sql += f" AND body_text LIKE :{key_name}"
+                params[key_name] = f"%{k}%"
+
+        # 6. 获取最新的一条
+        sql += " ORDER BY uid DESC LIMIT 1"
+ 
+        # 执行查询
+        result_proxy = await db.execute(text(sql), params)
+        result = result_proxy.fetchone()
+        
+        if not result:
+            logger.info(f"DB Search: No email found for {sender} -> {recipient}")
+            return None
+            
+        target_uid = result.uid
+        target_subject = result.subject
+        target_body_text = result.body_text
+        logger.info(f"DB Search: Found UID {target_uid} matching criteria. Subject: {target_subject}")
+        return f'{target_subject}\n{target_body_text}'
 
     @staticmethod
     async def fetch_email_authorizations(

+ 108 - 0
app/services/visametric_service.py

@@ -0,0 +1,108 @@
+import re
+import json
+import time
+import random
+import asyncio
+import aiohttp
+from datetime import datetime, timedelta
+from typing import List, Optional, Tuple, Dict, Any
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, or_, and_
+from redis.asyncio import Redis  # 引入 Redis 类型
+
+from redis.asyncio import Redis
+from starlette.concurrency import run_in_threadpool
+from app.core.biz_exception import NotFoundError, BizLogicError
+from app.models.vas_task import VasTask
+from app.core.logger import logger
+from app.services.email_authorizations_service import EmailAuthorizationService
+
+class VisametricService:
+  
+    @staticmethod
+    async def update_pnr(db: AsyncSession, task_id: int) -> VasTask:
+        stmt = select(VasTask).where(VasTask.id == task_id)
+        result = await db.execute(stmt)
+        task = result.scalar_one_or_none()
+
+        if not task:
+            raise NotFoundError("Task not exist")
+        
+        grabbed_history = task.grabbed_history or {}
+        inputs: Dict[str, Any] = task.user_inputs or {}
+        task_config = task.config or {}
+
+        email_account = "visafly666@gmail.com"
+        
+        first_name = inputs.get('first_name', '')
+        last_name = inputs.get('last_name', '')
+        pnr_number = grabbed_history.get('pnr_number', '')
+        slot_date = grabbed_history.get('slot_date', '') # YYYY-MM-DD
+        slot_time = grabbed_history.get('slot_time', '')
+        
+        sender = "noreply at visametric.com"
+        recipient = task_config.get("alias_email", "")
+        subject_keywords = inputs.get("subjectKeywords", f"{pnr_number},Visametric,Appointment,Request")
+        body_keywords = inputs.get("bodyKeywords", f"{pnr_number},{slot_date},{slot_time},{first_name},{last_name}")
+
+        auth = await EmailAuthorizationService.get_by_email(db, email_account)
+        if not auth:
+            raise ValueError(f"Email auth not found: {email_account}")
+
+        email_context = await EmailAuthorizationService.fetch_email_authorizations2(
+            db=db, auth=auth, sender=sender, recipient=recipient,
+            subject_keywords=subject_keywords, body_keywords=body_keywords
+        )
+        
+        pattern = r"SDP[A-Z0-9]+"
+        match = re.search(pattern, email_context)
+        if not match:
+            raise NotFoundError(message="PNR not found")
+        current_history = dict(task.grabbed_history) if task.grabbed_history else {} 
+        current_history['pnr_number'] = str(match.group())
+        task.grabbed_history = current_history
+        await db.commit()
+        await db.refresh(task)
+        return task
+                
+    @staticmethod
+    async def cancel_appointment(db: AsyncSession, redis_client: Redis, task_id: int):
+        stmt = select(VasTask).where(VasTask.id == task_id)
+        result = await db.execute(stmt)
+        task = result.scalar_one_or_none()
+
+        if not task:
+            raise NotFoundError("Task not exist")
+        
+        grabbed_history = task.grabbed_history or {}
+        inputs: Dict[str, Any] = task.user_inputs or {}
+        
+        meta = dict(task.meta) if task.meta else {}
+        
+        first_name = inputs.get('first_name', '')
+        last_name = inputs.get('last_name', '')
+        birthday = inputs.get('birthday', '')
+        passport_no = inputs.get('passport_no', '')
+        pnr_number = grabbed_history.get('pnr_number', '')
+        slot_date = grabbed_history.get('slot_date', '')
+        slot_time = grabbed_history.get('slot_time', '')
+        
+        ss = birthday.split('-')
+        cancel_request = {
+            "passport": passport_no,
+            "birth_day": ss[2],
+            "birth_month": ss[1],
+            "birth_year": ss[0],
+            "pnrcontrolform": pnr_number
+        }
+        print(f'cancel request={cancel_request}')
+        
+        queue_key = "visametric_cancel_person_info"
+        message = json.dumps(cancel_request)
+        await redis_client.lpush(queue_key, message)
+        
+        time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        meta["cancelled_at"] = time_str
+
+        task.meta = meta
+        await db.commit()