| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import asyncio
- from datetime import datetime, timedelta
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import AsyncSessionLocal
- from app.models.emails import VasEmail
- from app.services.order_event_service import OrderEventService
- from app.schemas.order_event import VasOrderEventCreate
- from app.core.biz_exception import BizLogicError
def _build_event(email: VasEmail) -> VasOrderEventCreate:
    """Map one email row to an order-event payload (recipient -> alias_email)."""
    return VasOrderEventCreate(
        event_title=email.subject or "No Subject",
        # Truncate the body to 2000 chars so the stored message stays bounded.
        event_message=email.body_text[:2000] if email.body_text else "",
        email_uid=email.uid,
        # Per the original note, receive_time is a string on the email row,
        # so the datetime created_at is used as the event time instead.
        event_time=email.created_at,
        alias_email=email.recipient,
        # Leave order_no unset; per the original note the service resolves
        # the order from alias_email.
        order_no=None,
    )


async def sync_recent_emails(weeks: int = 4, batch_size: int = 100) -> None:
    """Backfill OrderEvent rows from recently received emails.

    Scans ``VasEmail`` rows whose ``created_at`` falls within the last
    ``weeks`` weeks, oldest first, in pages of ``batch_size``. Each row is
    passed to ``OrderEventService.create``. Progress and a final summary are
    printed to stdout.

    Args:
        weeks: Look-back window in weeks. Defaults to 4, which is what this
            script always used. An older comment and variable name claimed
            "3 weeks".
        batch_size: Number of email rows fetched per page.

    NOTE(review): nothing in this function commits. It assumes
    ``OrderEventService.create`` commits its own transaction — confirm.
    """
    # Kept naive (utcnow) as before; presumably created_at is stored as naive
    # UTC — confirm before switching to an aware datetime.
    since = datetime.utcnow() - timedelta(weeks=weeks)

    offset = 0
    total_processed = 0
    total_synced = 0
    total_skipped = 0
    total_failed = 0

    # created_at alone is not unique. OFFSET paging over a non-unique sort key
    # can skip or repeat rows at page boundaries, so the primary key breaks ties.
    order_by = (VasEmail.created_at.asc(), *VasEmail.__table__.primary_key.columns)

    print(f"[*] 开始扫描邮件表,起始时间: {since}")
    async with AsyncSessionLocal() as db:
        while True:
            stmt = (
                select(VasEmail)
                .where(VasEmail.created_at >= since)
                .order_by(*order_by)
                .offset(offset)
                .limit(batch_size)
            )
            result = await db.execute(stmt)
            batch_emails = result.scalars().all()
            if not batch_emails:
                break  # no more rows

            # Copy everything needed out of the ORM objects before any write.
            # A commit or rollback expires loaded instances, and reading an
            # expired attribute under AsyncSession triggers implicit IO
            # instead of a transparent refresh.
            batch = [(email.uid, _build_event(email)) for email in batch_emails]

            for uid, event_in in batch:
                total_processed += 1
                try:
                    await OrderEventService.create(db, event_in)
                except BizLogicError:
                    # alias_email has no matching order: count it and move on.
                    total_skipped += 1
                    continue
                except Exception as e:
                    total_failed += 1
                    print(f"[ERR] 处理 UID {uid} 时发生未知错误: {e}")
                    # A failed flush leaves the session unusable until it is
                    # rolled back. Without this, every later create and the
                    # next page query would also fail.
                    await db.rollback()
                    continue

                total_synced += 1
                if total_synced % 10 == 0:
                    print(f"[-] 已处理: {total_processed}, 成功写入: {total_synced}")

            offset += batch_size

    print("\n[*] 任务完成!")
    print(f"[*] 总扫描数: {total_processed}")
    print(f"[*] 成功写入 OrderEvent: {total_synced}")
    print(f"[*] 被忽略(无对应订单): {total_skipped}")
    print(f"[*] 处理失败(未知错误): {total_failed}")
if __name__ == "__main__":
    # Script entry point: run the email -> OrderEvent backfill on a fresh event loop.
    asyncio.run(main=sync_recent_emails())
|