import asyncio import aiohttp import logging import json import traceback from datetime import datetime, timedelta from typing import Dict, List, Any, Optional from notification_templates import * # ========================================== # 1. 配置信息 # ========================================== API_BASE_URL = "https://api.text.skin" BEARER_TOKEN = "tok_e946329a60ff45ba807f3f41b0e8b7fc" MAX_ATTEMPTS = 5 logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger("Iris") # ========================================== # 2. Iris 通知调度引擎 # ========================================== class IrisWorker: def __init__(self): self.headers = { "Authorization": f"Bearer {BEARER_TOKEN}", "Accept": "application/json" } self.session: Optional[aiohttp.ClientSession] = None # 核心:频道锁字典,确保同类频道不并发 self.channel_locks: Dict[str, asyncio.Lock] = { "email": asyncio.Lock(), "wechat": asyncio.Lock(), "whatsapp": asyncio.Lock(), "telegram": asyncio.Lock(), "tg": asyncio.Lock() } async def __aenter__(self): timeout = aiohttp.ClientTimeout(total=45) # 邮件发送可能较慢,稍微延长超时 self.session = aiohttp.ClientSession(headers=self.headers, timeout=timeout) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() def _get_next_retry_time(self, attempts: int) -> str: wait_seconds = 2 ** attempts next_time = datetime.utcnow() + timedelta(seconds=wait_seconds) return next_time.isoformat(timespec='milliseconds') + 'Z' async def _log_response_error(self, resp: aiohttp.ClientResponse, context: str): try: body = await resp.text() logger.error(f"❌ {context} | Status: {resp.status} | Response: {body}") except Exception as e: logger.error(f"❌ {context} | Status: {resp.status} | (Error reading body: {e})") async def fetch_pending_tasks(self) -> List[Dict]: url = f"{API_BASE_URL}/api/notification/outbox/list" params = {"status": "pending", "size": 5} try: async with self.session.get(url, params=params) as resp: if resp.status == 200: result = await resp.json() return result["data"]["items"] await self._log_response_error(resp, "Fetch Tasks") except Exception as e: logger.error(f"🔥 Fetch Tasks Exception: {e}") return [] async def update_task_result(self, task_id: int, success: bool, current_attempts: int): url = f"{API_BASE_URL}/api/notification/outbox/update" params = {"id": task_id} new_attempts = current_attempts + 1 if success: data = {"status": "sent", "attempts": new_attempts, "next_retry_at": None} else: if new_attempts >= MAX_ATTEMPTS: data = {"status": "failed", "attempts": new_attempts, "next_retry_at": None} else: next_retry = self._get_next_retry_time(new_attempts) data = {"status": "pending", "attempts": new_attempts, "next_retry_at": next_retry} try: async with self.session.post(url, params=params, json=data) as resp: if resp.status != 200: await self._log_response_error(resp, f"Update Status (ID: {task_id})") except Exception as e: logger.error(f"🔥 Update Result Exception (ID: {task_id}): {e}") # --- 渠道发送适配器 --- async def send_email(self, task_payload: Dict) -> bool: try: tid = task_payload.get("template_id") template_data = task_payload.get("payload", {}) receivers = task_payload.get("receivers") or ([task_payload.get("receiver")] if task_payload.get("receiver") else []) if not receivers: logger.error(f"📧 [Email] No receivers found: {task_payload}") return False body = render_email(tid, template_data) sender, subject = get_email_meta(tid) is_bulk = len(receivers) > 1 path = "/api/email-authorizations/sendmail-bulk" if is_bulk else "/api/email-authorizations/sendmail" params = { "emailAccount": sender, "sendTo": ",".join(receivers) if is_bulk else receivers[0], "subject": subject, "contentType": "html" } async with self.session.post(f"{API_BASE_URL}{path}", params=params, json={"body": body}) as resp: if resp.status == 200: return True await self._log_response_error(resp, "Email Dispatch") return False except Exception as e: logger.error(f"🔥 [Email] Exception: {str(e)}") return False async def send_wechat(self, task_payload: Dict) -> bool: try: content = render_wechat_markdown(task_payload.get("template_id"), task_payload.get("payload", {})) async with self.session.post(f"{API_BASE_URL}/api/wechat/send_markdown_no_token", json={"message": content}) as resp: if resp.status == 200: return True await self._log_response_error(resp, "WeChat Dispatch") return False except Exception as e: logger.error(f"🔥 [WeChat] Exception: {str(e)}") return False async def send_whatsapp(self, task_payload: Dict) -> bool: try: content = render_whatsapp_text(task_payload.get("template_id"), task_payload.get("payload", {})) recs = task_payload.get("receivers", []) success = True for r in recs: data = {"chat_id": r, "message": content} async with self.session.post(f"{API_BASE_URL}/api/whatsapp/send_no_token", json=data) as resp: if resp.status != 200: await self._log_response_error(resp, f"WhatsApp to {r}") success = False return success except Exception as e: logger.error(f"🔥 [WhatsApp] Exception: {str(e)}") return False async def send_telegram(self, task_payload: Dict) -> bool: try: data = task_payload.get("payload", {}) content = render_telegram_html(task_payload.get("template_id"), data) chat_id = task_payload.get("chat_id") or data.get("chat_id") if not chat_id: return False async with self.session.post(f"{API_BASE_URL}/api/tg/send_message_no_token", json={"chat_id": str(chat_id), "message": content}) as resp: if resp.status == 200: return True await self._log_response_error(resp, "Telegram Dispatch") return False except Exception as e: logger.error(f"🔥 [Telegram] Exception: {str(e)}") return False # --- 核心调度逻辑 --- async def process_task(self, item: Dict): # 1. 指数退避时间检查 next_retry = item.get("next_retry_at") if next_retry: try: dt_str = next_retry.replace('Z', '+00:00') if datetime.fromisoformat(dt_str).replace(tzinfo=None) > datetime.utcnow(): return except Exception: pass task_id = item["id"] channel = (item.get("channel") or "").lower() task_payload = item.get("payload", {}) attempts = item.get("attempts", 0) # 2. 获取该频道的锁 (如果没有定义则动态创建一个) if channel not in self.channel_locks: self.channel_locks[channel] = asyncio.Lock() # 3. 在锁保护下执行发送逻辑(同频道串行,跨频道并行) async with self.channel_locks[channel]: logger.info(f"🚚 [Task {task_id}] Delivering {channel.upper()} (Attempt: {attempts})") success = False if channel == "email": success = await self.send_email(task_payload) elif channel == "wechat": success = await self.send_wechat(task_payload) elif channel == "whatsapp": success = await self.send_whatsapp(task_payload) elif channel in ["telegram", "tg"]: success = await self.send_telegram(task_payload) else: logger.warning(f"❓ [Task {task_id}] Unknown channel: {channel}") return await self.update_task_result(task_id, success, attempts) if success: logger.info(f"✨ [Task {task_id}] Sent Successfully.") # 可以在此处增加一个小间隔,避免请求过于密集 await asyncio.sleep(0.5) async def run_loop(self): logger.info("🌈 Iris Messenger Engine is running with Channel-Locking...") while True: tasks = await self.fetch_pending_tasks() if tasks: # 依然使用 gather,但内部受 Lock 控制,达到“同类串行、异类并行” await asyncio.gather(*(self.process_task(t) for t in tasks)) await asyncio.sleep(3) # ========================================== # 4. 运行入口 # ========================================== async def main(): async with IrisWorker() as worker: await worker.run_loop() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Iris stopped by user.") except Exception: logger.critical(f"Iris Engine Crashed: {traceback.format_exc()}")