iris.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
import asyncio
import json
import logging
import os
import traceback
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import aiohttp

from notification_templates import *
# ==========================================
# 1. Configuration
# ==========================================
  12. API_BASE_URL = "http://localhost:8888"
  13. BEARER_TOKEN = "tok_e946329a60ff45ba807f3f41b0e8b7fc"
  14. MAX_ATTEMPTS = 5
  15. DEFAULT_TG_TOKEN = "6771183256:AAEd0Tenq4z6hk5toUGrCpEVPfP00bpYT1s"
  16. DEFAULT_WECHAT_TOKEN = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
  17. DEFAULT_WA_API_KEY = "51fc877539064f5882fae0f6f0661123"
  18. DEFAULT_WA_SESSION = "default"
  19. logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
  20. logger = logging.getLogger("Iris")
# ==========================================
# 2. Iris notification dispatch engine
# ==========================================
  24. class IrisWorker:
  25. def __init__(self):
  26. self.headers = {
  27. "Authorization": f"Bearer {BEARER_TOKEN}",
  28. "Accept": "application/json"
  29. }
  30. self.session: Optional[aiohttp.ClientSession] = None
  31. # 核心:频道锁字典,确保同类频道不并发
  32. self.channel_locks: Dict[str, asyncio.Lock] = {
  33. "email": asyncio.Lock(),
  34. "wechat": asyncio.Lock(),
  35. "whatsapp": asyncio.Lock(),
  36. "telegram": asyncio.Lock(),
  37. "tg": asyncio.Lock()
  38. }
  39. async def __aenter__(self):
  40. timeout = aiohttp.ClientTimeout(total=45) # 邮件发送可能较慢,稍微延长超时
  41. self.session = aiohttp.ClientSession(headers=self.headers, timeout=timeout)
  42. return self
  43. async def __aexit__(self, exc_type, exc_val, exc_tb):
  44. if self.session:
  45. await self.session.close()
  46. def _get_next_retry_time(self, attempts: int) -> str:
  47. wait_seconds = 2 ** attempts
  48. next_time = datetime.utcnow() + timedelta(seconds=wait_seconds)
  49. return next_time.isoformat(timespec='milliseconds') + 'Z'
  50. async def _log_response_error(self, resp: aiohttp.ClientResponse, context: str):
  51. try:
  52. body = await resp.text()
  53. logger.error(f"❌ {context} | Status: {resp.status} | Response: {body}")
  54. except Exception as e:
  55. logger.error(f"❌ {context} | Status: {resp.status} | (Error reading body: {e})")
  56. async def fetch_pending_tasks(self) -> List[Dict]:
  57. url = f"{API_BASE_URL}/api/notification/outbox/list"
  58. params = {"status": "pending", "size": 50}
  59. try:
  60. async with self.session.get(url, params=params) as resp:
  61. if resp.status == 200:
  62. result = await resp.json()
  63. return result["data"]["items"]
  64. await self._log_response_error(resp, "Fetch Tasks")
  65. except Exception as e:
  66. logger.error(f"🔥 Fetch Tasks Exception: {e}")
  67. return []
  68. async def update_task_result(self, task_id: int, success: bool, current_attempts: int):
  69. url = f"{API_BASE_URL}/api/notification/outbox/update"
  70. params = {"id": task_id}
  71. new_attempts = current_attempts + 1
  72. if success:
  73. data = {"status": "sent", "attempts": new_attempts, "next_retry_at": None}
  74. else:
  75. if new_attempts >= MAX_ATTEMPTS:
  76. data = {"status": "failed", "attempts": new_attempts, "next_retry_at": None}
  77. else:
  78. next_retry = self._get_next_retry_time(new_attempts)
  79. data = {"status": "pending", "attempts": new_attempts, "next_retry_at": next_retry}
  80. try:
  81. async with self.session.post(url, params=params, json=data) as resp:
  82. if resp.status != 200:
  83. await self._log_response_error(resp, f"Update Status (ID: {task_id})")
  84. except Exception as e:
  85. logger.error(f"🔥 Update Result Exception (ID: {task_id}): {e}")
  86. # --- 渠道发送适配器 ---
  87. async def send_email(self, task_payload: Dict) -> bool:
  88. try:
  89. tid = task_payload.get("template_id")
  90. template_data = task_payload.get("payload", {})
  91. receivers = task_payload.get("receivers") or ([task_payload.get("receiver")] if task_payload.get("receiver") else [])
  92. if not receivers:
  93. logger.error(f"📧 [Email] No receivers found: {task_payload}")
  94. return False
  95. body = render_email(tid, template_data)
  96. sender, subject = get_email_meta(tid)
  97. is_bulk = len(receivers) > 1
  98. path = "/api/email-authorizations/sendmail-bulk" if is_bulk else "/api/email-authorizations/sendmail"
  99. params = {
  100. "emailAccount": sender,
  101. "sendTo": ",".join(receivers) if is_bulk else receivers[0],
  102. "subject": subject,
  103. "contentType": "html"
  104. }
  105. async with self.session.post(f"{API_BASE_URL}{path}", params=params, json={"body": body}) as resp:
  106. if resp.status == 200: return True
  107. await self._log_response_error(resp, "Email Dispatch")
  108. return False
  109. except Exception as e:
  110. logger.error(f"🔥 [Email] Exception: {str(e)}")
  111. return False
  112. async def send_wechat(self, task_payload: Dict) -> bool:
  113. try:
  114. content = render_wechat_markdown(task_payload.get("template_id"), task_payload.get("payload", {}))
  115. api_token = task_payload.get("api_token") or DEFAULT_WECHAT_TOKEN
  116. async with self.session.post(f"{API_BASE_URL}/api/wechat/send_markdown",
  117. json={"api_token": api_token, "message": content}) as resp:
  118. if resp.status == 200: return True
  119. await self._log_response_error(resp, "WeChat Dispatch")
  120. return False
  121. except Exception as e:
  122. logger.error(f"🔥 [WeChat] Exception: {str(e)}")
  123. return False
  124. async def send_whatsapp(self, task_payload: Dict) -> bool:
  125. try:
  126. content = render_whatsapp_text(task_payload.get("template_id"), task_payload.get("payload", {}))
  127. api_key = task_payload.get("api_key") or DEFAULT_WA_API_KEY
  128. recs = task_payload.get("receivers", [])
  129. success = True
  130. for r in recs:
  131. data = {"session": DEFAULT_WA_SESSION, "chat_id": r, "message": content, "api_key": api_key}
  132. async with self.session.post(f"{API_BASE_URL}/api/whatsapp/send", json=data) as resp:
  133. if resp.status != 200:
  134. await self._log_response_error(resp, f"WhatsApp to {r}")
  135. success = False
  136. return success
  137. except Exception as e:
  138. logger.error(f"🔥 [WhatsApp] Exception: {str(e)}")
  139. return False
  140. async def send_telegram(self, task_payload: Dict) -> bool:
  141. try:
  142. data = task_payload.get("payload", {})
  143. content = render_telegram_html(task_payload.get("template_id"), data)
  144. api_token = task_payload.get("api_token") or DEFAULT_TG_TOKEN
  145. chat_id = task_payload.get("chat_id") or data.get("chat_id")
  146. if not chat_id: return False
  147. async with self.session.post(f"{API_BASE_URL}/api/tg/send_message",
  148. json={"chat_id": str(chat_id), "api_token": api_token, "message": content}) as resp:
  149. if resp.status == 200: return True
  150. await self._log_response_error(resp, "Telegram Dispatch")
  151. return False
  152. except Exception as e:
  153. logger.error(f"🔥 [Telegram] Exception: {str(e)}")
  154. return False
  155. # --- 核心调度逻辑 ---
  156. async def process_task(self, item: Dict):
  157. # 1. 指数退避时间检查
  158. next_retry = item.get("next_retry_at")
  159. if next_retry:
  160. try:
  161. dt_str = next_retry.replace('Z', '+00:00')
  162. if datetime.fromisoformat(dt_str).replace(tzinfo=None) > datetime.utcnow():
  163. return
  164. except Exception: pass
  165. task_id = item["id"]
  166. channel = (item.get("channel") or "").lower()
  167. task_payload = item.get("payload", {})
  168. attempts = item.get("attempts", 0)
  169. # 2. 获取该频道的锁 (如果没有定义则动态创建一个)
  170. if channel not in self.channel_locks:
  171. self.channel_locks[channel] = asyncio.Lock()
  172. # 3. 在锁保护下执行发送逻辑(同频道串行,跨频道并行)
  173. async with self.channel_locks[channel]:
  174. logger.info(f"🚚 [Task {task_id}] Delivering {channel.upper()} (Attempt: {attempts})")
  175. success = False
  176. if channel == "email": success = await self.send_email(task_payload)
  177. elif channel == "wechat": success = await self.send_wechat(task_payload)
  178. elif channel == "whatsapp": success = await self.send_whatsapp(task_payload)
  179. elif channel in ["telegram", "tg"]: success = await self.send_telegram(task_payload)
  180. else:
  181. logger.warning(f"❓ [Task {task_id}] Unknown channel: {channel}")
  182. return
  183. await self.update_task_result(task_id, success, attempts)
  184. if success:
  185. logger.info(f"✨ [Task {task_id}] Sent Successfully.")
  186. # 可以在此处增加一个小间隔,避免请求过于密集
  187. await asyncio.sleep(0.5)
  188. async def run_loop(self):
  189. logger.info("🌈 Iris Messenger Engine is running with Channel-Locking...")
  190. while True:
  191. tasks = await self.fetch_pending_tasks()
  192. if tasks:
  193. # 依然使用 gather,但内部受 Lock 控制,达到“同类串行、异类并行”
  194. await asyncio.gather(*(self.process_task(t) for t in tasks))
  195. await asyncio.sleep(2)
# ==========================================
# 4. Entry point
# ==========================================
  199. async def main():
  200. async with IrisWorker() as worker:
  201. await worker.run_loop()
  202. if __name__ == "__main__":
  203. try:
  204. asyncio.run(main())
  205. except KeyboardInterrupt:
  206. logger.info("Iris stopped by user.")
  207. except Exception:
  208. logger.critical(f"Iris Engine Crashed: {traceback.format_exc()}")