| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- import asyncio
- import aiohttp
- import logging
- import json
- import traceback
- from datetime import datetime, timedelta
- from typing import Dict, List, Any, Optional
- from notification_templates import *
- # ==========================================
- # 1. 配置信息
- # ==========================================
- API_BASE_URL = "https://visafly.top"
- BEARER_TOKEN = "tok_e946329a60ff45ba807f3f41b0e8b7fc"
- MAX_ATTEMPTS = 5
- DEFAULT_TG_TOKEN = "6771183256:AAEd0Tenq4z6hk5toUGrCpEVPfP00bpYT1s"
- DEFAULT_WECHAT_TOKEN = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
- DEFAULT_WA_API_KEY = "51fc877539064f5882fae0f6f0661123"
- DEFAULT_WA_SESSION = "default"
- logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
- logger = logging.getLogger("Iris")
- # ==========================================
- # 2. Iris 通知调度引擎
- # ==========================================
- class IrisWorker:
- def __init__(self):
- self.headers = {
- "Authorization": f"Bearer {BEARER_TOKEN}",
- "Accept": "application/json"
- }
- self.session: Optional[aiohttp.ClientSession] = None
- # 核心:频道锁字典,确保同类频道不并发
- self.channel_locks: Dict[str, asyncio.Lock] = {
- "email": asyncio.Lock(),
- "wechat": asyncio.Lock(),
- "whatsapp": asyncio.Lock(),
- "telegram": asyncio.Lock(),
- "tg": asyncio.Lock()
- }
- async def __aenter__(self):
- timeout = aiohttp.ClientTimeout(total=45) # 邮件发送可能较慢,稍微延长超时
- self.session = aiohttp.ClientSession(headers=self.headers, timeout=timeout)
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- if self.session:
- await self.session.close()
- def _get_next_retry_time(self, attempts: int) -> str:
- wait_seconds = 2 ** attempts
- next_time = datetime.utcnow() + timedelta(seconds=wait_seconds)
- return next_time.isoformat(timespec='milliseconds') + 'Z'
- async def _log_response_error(self, resp: aiohttp.ClientResponse, context: str):
- try:
- body = await resp.text()
- logger.error(f"❌ {context} | Status: {resp.status} | Response: {body}")
- except Exception as e:
- logger.error(f"❌ {context} | Status: {resp.status} | (Error reading body: {e})")
- async def fetch_pending_tasks(self) -> List[Dict]:
- url = f"{API_BASE_URL}/api/notification/outbox/list"
- params = {"status": "pending", "size": 50}
- try:
- async with self.session.get(url, params=params) as resp:
- if resp.status == 200:
- result = await resp.json()
- return result["data"]["items"]
- await self._log_response_error(resp, "Fetch Tasks")
- except Exception as e:
- logger.error(f"🔥 Fetch Tasks Exception: {e}")
- return []
- async def update_task_result(self, task_id: int, success: bool, current_attempts: int):
- url = f"{API_BASE_URL}/api/notification/outbox/update"
- params = {"id": task_id}
- new_attempts = current_attempts + 1
-
- if success:
- data = {"status": "sent", "attempts": new_attempts, "next_retry_at": None}
- else:
- if new_attempts >= MAX_ATTEMPTS:
- data = {"status": "failed", "attempts": new_attempts, "next_retry_at": None}
- else:
- next_retry = self._get_next_retry_time(new_attempts)
- data = {"status": "pending", "attempts": new_attempts, "next_retry_at": next_retry}
-
- try:
- async with self.session.post(url, params=params, json=data) as resp:
- if resp.status != 200:
- await self._log_response_error(resp, f"Update Status (ID: {task_id})")
- except Exception as e:
- logger.error(f"🔥 Update Result Exception (ID: {task_id}): {e}")
- # --- 渠道发送适配器 ---
- async def send_email(self, task_payload: Dict) -> bool:
- try:
- tid = task_payload.get("template_id")
- template_data = task_payload.get("payload", {})
- receivers = task_payload.get("receivers") or ([task_payload.get("receiver")] if task_payload.get("receiver") else [])
- if not receivers:
- logger.error(f"📧 [Email] No receivers found: {task_payload}")
- return False
-
- body = render_email(tid, template_data)
- sender, subject = get_email_meta(tid)
- is_bulk = len(receivers) > 1
- path = "/api/email-authorizations/sendmail-bulk" if is_bulk else "/api/email-authorizations/sendmail"
-
- params = {
- "emailAccount": sender,
- "sendTo": ",".join(receivers) if is_bulk else receivers[0],
- "subject": subject,
- "contentType": "html"
- }
-
- async with self.session.post(f"{API_BASE_URL}{path}", params=params, json={"body": body}) as resp:
- if resp.status == 200: return True
- await self._log_response_error(resp, "Email Dispatch")
- return False
- except Exception as e:
- logger.error(f"🔥 [Email] Exception: {str(e)}")
- return False
- async def send_wechat(self, task_payload: Dict) -> bool:
- try:
- content = render_wechat_markdown(task_payload.get("template_id"), task_payload.get("payload", {}))
- api_token = task_payload.get("api_token") or DEFAULT_WECHAT_TOKEN
- async with self.session.post(f"{API_BASE_URL}/api/wechat/send_markdown",
- json={"api_token": api_token, "message": content}) as resp:
- if resp.status == 200: return True
- await self._log_response_error(resp, "WeChat Dispatch")
- return False
- except Exception as e:
- logger.error(f"🔥 [WeChat] Exception: {str(e)}")
- return False
- async def send_whatsapp(self, task_payload: Dict) -> bool:
- try:
- content = render_whatsapp_text(task_payload.get("template_id"), task_payload.get("payload", {}))
- api_key = task_payload.get("api_key") or DEFAULT_WA_API_KEY
- recs = task_payload.get("receivers", [])
- success = True
- for r in recs:
- data = {"session": DEFAULT_WA_SESSION, "chat_id": r, "message": content, "api_key": api_key}
- async with self.session.post(f"{API_BASE_URL}/api/whatsapp/send", json=data) as resp:
- if resp.status != 200:
- await self._log_response_error(resp, f"WhatsApp to {r}")
- success = False
- return success
- except Exception as e:
- logger.error(f"🔥 [WhatsApp] Exception: {str(e)}")
- return False
- async def send_telegram(self, task_payload: Dict) -> bool:
- try:
- data = task_payload.get("payload", {})
- content = render_telegram_html(task_payload.get("template_id"), data)
- api_token = task_payload.get("api_token") or DEFAULT_TG_TOKEN
- chat_id = task_payload.get("chat_id") or data.get("chat_id")
- if not chat_id: return False
- async with self.session.post(f"{API_BASE_URL}/api/tg/send_message",
- json={"chat_id": str(chat_id), "api_token": api_token, "message": content}) as resp:
- if resp.status == 200: return True
- await self._log_response_error(resp, "Telegram Dispatch")
- return False
- except Exception as e:
- logger.error(f"🔥 [Telegram] Exception: {str(e)}")
- return False
- # --- 核心调度逻辑 ---
- async def process_task(self, item: Dict):
- # 1. 指数退避时间检查
- next_retry = item.get("next_retry_at")
- if next_retry:
- try:
- dt_str = next_retry.replace('Z', '+00:00')
- if datetime.fromisoformat(dt_str).replace(tzinfo=None) > datetime.utcnow():
- return
- except Exception: pass
- task_id = item["id"]
- channel = (item.get("channel") or "").lower()
- task_payload = item.get("payload", {})
- attempts = item.get("attempts", 0)
- # 2. 获取该频道的锁 (如果没有定义则动态创建一个)
- if channel not in self.channel_locks:
- self.channel_locks[channel] = asyncio.Lock()
-
- # 3. 在锁保护下执行发送逻辑(同频道串行,跨频道并行)
- async with self.channel_locks[channel]:
- logger.info(f"🚚 [Task {task_id}] Delivering {channel.upper()} (Attempt: {attempts})")
-
- success = False
- if channel == "email": success = await self.send_email(task_payload)
- elif channel == "wechat": success = await self.send_wechat(task_payload)
- elif channel == "whatsapp": success = await self.send_whatsapp(task_payload)
- elif channel in ["telegram", "tg"]: success = await self.send_telegram(task_payload)
- else:
- logger.warning(f"❓ [Task {task_id}] Unknown channel: {channel}")
- return
- await self.update_task_result(task_id, success, attempts)
-
- if success:
- logger.info(f"✨ [Task {task_id}] Sent Successfully.")
-
- # 可以在此处增加一个小间隔,避免请求过于密集
- await asyncio.sleep(0.5)
- async def run_loop(self):
- logger.info("🌈 Iris Messenger Engine is running with Channel-Locking...")
- while True:
- tasks = await self.fetch_pending_tasks()
- if tasks:
- # 依然使用 gather,但内部受 Lock 控制,达到“同类串行、异类并行”
- await asyncio.gather(*(self.process_task(t) for t in tasks))
- await asyncio.sleep(2)
- # ==========================================
- # 4. 运行入口
- # ==========================================
- async def main():
- async with IrisWorker() as worker:
- await worker.run_loop()
- if __name__ == "__main__":
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- logger.info("Iris stopped by user.")
- except Exception:
- logger.critical(f"Iris Engine Crashed: {traceback.format_exc()}")
|