"""One-off backfill: turn recently received emails into OrderEvent records.

Scans ``VasEmail`` rows created within a recent window (4 weeks by default)
in batches and hands each one to ``OrderEventService.create``. Per the notes in
this script, the service looks up the order from ``alias_email`` and raises
``BizLogicError`` when no matching order exists; such emails are counted as
skipped.
"""

import asyncio
from datetime import datetime, timedelta, timezone

from sqlalchemy import inspect as sa_inspect
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import BizLogicError
from app.core.database import AsyncSessionLocal
from app.models.emails import VasEmail
from app.schemas.order_event import VasOrderEventCreate
from app.services.order_event_service import OrderEventService

# Maximum number of characters of the email body copied into event_message
# (truncated to avoid overflowing the target column).
_MAX_MESSAGE_LEN = 2000


async def _fetch_batch(db: AsyncSession, since: datetime, offset: int, limit: int):
    """Return one page of VasEmail rows created at or after *since*."""
    # Tie-break on the primary key so OFFSET paging is deterministic when
    # several emails share the same created_at; ordering by created_at alone
    # lets rows be skipped or processed twice across pages.
    pk_columns = sa_inspect(VasEmail).primary_key
    stmt = (
        select(VasEmail)
        .where(VasEmail.created_at >= since)
        .order_by(VasEmail.created_at.asc(), *pk_columns)
        .offset(offset)
        .limit(limit)
    )
    result = await db.execute(stmt)
    return result.scalars().all()


def _event_fields(email: VasEmail) -> dict:
    """Map one email to the keyword arguments of VasOrderEventCreate."""
    return {
        "event_title": email.subject or "No Subject",
        "event_message": (
            email.body_text[:_MAX_MESSAGE_LEN] if email.body_text else ""
        ),
        "email_uid": email.uid,
        # receive_time is a string; created_at is the more reliable timestamp.
        "event_time": email.created_at,
        # The recipient address is the order alias.
        "alias_email": email.recipient,
        # Left as None so the service resolves the order from alias_email.
        "order_no": None,
    }


async def sync_recent_emails(weeks: int = 4, batch_size: int = 100):
    """Create an OrderEvent for every email received in the last *weeks* weeks.

    Args:
        weeks: Size of the look-back window. Defaults to 4, which is what the
            script has always used (an older comment said 3 weeks).
        batch_size: Number of emails fetched per query page.

    Progress and a final summary are printed to stdout. Emails rejected with
    BizLogicError are counted as skipped; any other exception is reported,
    the session is rolled back, and processing continues with the next email.
    """
    # Naive UTC, the same value datetime.utcnow() returned (deprecated since
    # Python 3.12); kept naive so the created_at comparison is unchanged.
    since = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(weeks=weeks)

    offset = 0
    total_processed = 0
    total_synced = 0
    total_skipped = 0
    total_failed = 0

    print(f"[*] 开始扫描邮件表,起始时间: {since}")

    async with AsyncSessionLocal() as db:
        while True:
            emails = await _fetch_batch(db, since, offset, batch_size)
            if not emails:
                break  # no more data

            # Copy plain values out of the ORM objects before any commit or
            # rollback: both expire loaded instances, and lazily refreshing an
            # expired attribute under AsyncSession raises MissingGreenlet.
            pending = [(email.uid, _event_fields(email)) for email in emails]

            for uid, fields in pending:
                total_processed += 1
                event_in = VasOrderEventCreate(**fields)
                try:
                    await OrderEventService.create(db, event_in)
                    # Persist each event individually. This is harmless if the
                    # service already committed, and ensures a later rollback
                    # only discards the failing email's work.
                    await db.commit()
                except BizLogicError:
                    # No order found for alias_email: record it and move on.
                    total_skipped += 1
                except Exception as e:
                    total_failed += 1
                    # A failed flush/commit leaves the session unusable until
                    # it is rolled back; without this every later email fails.
                    await db.rollback()
                    print(f"[ERR] 处理 UID {uid} 时发生未知错误: {e}")
                else:
                    total_synced += 1
                    if total_synced % 10 == 0:
                        print(f"[-] 已处理: {total_processed}, 成功写入: {total_synced}")

            # Advance to the next page.
            offset += batch_size

    print("\n[*] 任务完成!")
    print(f"[*] 总扫描数: {total_processed}")
    print(f"[*] 成功写入 OrderEvent: {total_synced}")
    print(f"[*] 被忽略(无对应订单): {total_skipped}")
    print(f"[*] 发生错误: {total_failed}")


if __name__ == "__main__":
    asyncio.run(sync_recent_emails())