# sync_emails_to_events.py
import asyncio
from datetime import datetime, timedelta, timezone

from sqlalchemy import inspect as sa_inspect
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import BizLogicError
from app.core.database import AsyncSessionLocal
from app.models.emails import VasEmail
from app.schemas.order_event import VasOrderEventCreate
from app.services.order_event_service import OrderEventService
  10. async def sync_recent_emails():
  11. # 1. 计算 3 周前的时间
  12. three_weeks_ago = datetime.utcnow() - timedelta(weeks=4)
  13. batch_size = 100
  14. offset = 0
  15. total_processed = 0
  16. total_synced = 0
  17. total_skipped = 0
  18. print(f"[*] 开始扫描邮件表,起始时间: {three_weeks_ago}")
  19. async with AsyncSessionLocal() as db:
  20. while True:
  21. # 2. 分批查询 emails 表 (按 created_at 过滤并排序)
  22. stmt = (
  23. select(VasEmail)
  24. .where(VasEmail.created_at >= three_weeks_ago)
  25. .order_by(VasEmail.created_at.asc())
  26. .offset(offset)
  27. .limit(batch_size)
  28. )
  29. result = await db.execute(stmt)
  30. batch_emails = result.scalars().all()
  31. if not batch_emails:
  32. break # 没有更多数据了
  33. for email in batch_emails:
  34. total_processed += 1
  35. # 3. 构造创建参数
  36. # recipient 映射为 alias_email
  37. event_in = VasOrderEventCreate(
  38. event_title=email.subject or "No Subject",
  39. event_message=email.body_text[:2000] if email.body_text else "", # 适当截断防溢出
  40. email_uid=email.uid,
  41. event_time=email.created_at, # receive_time 是字符串,使用 created_at 更稳妥
  42. alias_email=email.recipient,
  43. order_no=None # 让 Service 根据 alias_email 自动解析
  44. )
  45. try:
  46. # 4. 调用之前写好的 Service (它会自动查询 order_id 并写入)
  47. await OrderEventService.create(db, event_in)
  48. total_synced += 1
  49. if total_synced % 10 == 0:
  50. print(f"[-] 已处理: {total_processed}, 成功写入: {total_synced}")
  51. except BizLogicError as e:
  52. # 如果 alias_email 找不到对应的 order_id,会抛出这个错,我们记录并跳过
  53. total_skipped += 1
  54. # print(f"[!] 跳过 UID {email.uid}: {e.message}")
  55. except Exception as e:
  56. print(f"[ERR] 处理 UID {email.uid} 时发生未知错误: {e}")
  57. # 移动偏移量进行下一批次
  58. offset += batch_size
  59. print(f"\n[*] 任务完成!")
  60. print(f"[*] 总扫描数: {total_processed}")
  61. print(f"[*] 成功写入 OrderEvent: {total_synced}")
  62. print(f"[*] 被忽略(无对应订单): {total_skipped}")
  63. if __name__ == "__main__":
  64. asyncio.run(sync_recent_emails())