# iris.py (9.8 KB)
import asyncio
import json
import logging
import os
import traceback
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import aiohttp

from notification_templates import *
# ==========================================
# 1. Configuration
# ==========================================
  12. API_BASE_URL = "https://visafly.top"
  13. BEARER_TOKEN = "tok_e946329a60ff45ba807f3f41b0e8b7fc"
  14. MAX_ATTEMPTS = 5
  15. logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
  16. logger = logging.getLogger("Iris")
# ==========================================
# 2. Iris notification dispatch engine
# ==========================================
  20. class IrisWorker:
  21. def __init__(self):
  22. self.headers = {
  23. "Authorization": f"Bearer {BEARER_TOKEN}",
  24. "Accept": "application/json"
  25. }
  26. self.session: Optional[aiohttp.ClientSession] = None
  27. # 核心:频道锁字典,确保同类频道不并发
  28. self.channel_locks: Dict[str, asyncio.Lock] = {
  29. "email": asyncio.Lock(),
  30. "wechat": asyncio.Lock(),
  31. "whatsapp": asyncio.Lock(),
  32. "telegram": asyncio.Lock(),
  33. "tg": asyncio.Lock()
  34. }
  35. async def __aenter__(self):
  36. timeout = aiohttp.ClientTimeout(total=45) # 邮件发送可能较慢,稍微延长超时
  37. self.session = aiohttp.ClientSession(headers=self.headers, timeout=timeout)
  38. return self
  39. async def __aexit__(self, exc_type, exc_val, exc_tb):
  40. if self.session:
  41. await self.session.close()
  42. def _get_next_retry_time(self, attempts: int) -> str:
  43. wait_seconds = 2 ** attempts
  44. next_time = datetime.utcnow() + timedelta(seconds=wait_seconds)
  45. return next_time.isoformat(timespec='milliseconds') + 'Z'
  46. async def _log_response_error(self, resp: aiohttp.ClientResponse, context: str):
  47. try:
  48. body = await resp.text()
  49. logger.error(f"❌ {context} | Status: {resp.status} | Response: {body}")
  50. except Exception as e:
  51. logger.error(f"❌ {context} | Status: {resp.status} | (Error reading body: {e})")
  52. async def fetch_pending_tasks(self) -> List[Dict]:
  53. url = f"{API_BASE_URL}/api/notification/outbox/list"
  54. params = {"status": "pending", "size": 50}
  55. try:
  56. async with self.session.get(url, params=params) as resp:
  57. if resp.status == 200:
  58. result = await resp.json()
  59. return result["data"]["items"]
  60. await self._log_response_error(resp, "Fetch Tasks")
  61. except Exception as e:
  62. logger.error(f"🔥 Fetch Tasks Exception: {e}")
  63. return []
  64. async def update_task_result(self, task_id: int, success: bool, current_attempts: int):
  65. url = f"{API_BASE_URL}/api/notification/outbox/update"
  66. params = {"id": task_id}
  67. new_attempts = current_attempts + 1
  68. if success:
  69. data = {"status": "sent", "attempts": new_attempts, "next_retry_at": None}
  70. else:
  71. if new_attempts >= MAX_ATTEMPTS:
  72. data = {"status": "failed", "attempts": new_attempts, "next_retry_at": None}
  73. else:
  74. next_retry = self._get_next_retry_time(new_attempts)
  75. data = {"status": "pending", "attempts": new_attempts, "next_retry_at": next_retry}
  76. try:
  77. async with self.session.post(url, params=params, json=data) as resp:
  78. if resp.status != 200:
  79. await self._log_response_error(resp, f"Update Status (ID: {task_id})")
  80. except Exception as e:
  81. logger.error(f"🔥 Update Result Exception (ID: {task_id}): {e}")
  82. # --- 渠道发送适配器 ---
  83. async def send_email(self, task_payload: Dict) -> bool:
  84. try:
  85. tid = task_payload.get("template_id")
  86. template_data = task_payload.get("payload", {})
  87. receivers = task_payload.get("receivers") or ([task_payload.get("receiver")] if task_payload.get("receiver") else [])
  88. if not receivers:
  89. logger.error(f"📧 [Email] No receivers found: {task_payload}")
  90. return False
  91. body = render_email(tid, template_data)
  92. sender, subject = get_email_meta(tid)
  93. is_bulk = len(receivers) > 1
  94. path = "/api/email-authorizations/sendmail-bulk" if is_bulk else "/api/email-authorizations/sendmail"
  95. params = {
  96. "emailAccount": sender,
  97. "sendTo": ",".join(receivers) if is_bulk else receivers[0],
  98. "subject": subject,
  99. "contentType": "html"
  100. }
  101. async with self.session.post(f"{API_BASE_URL}{path}", params=params, json={"body": body}) as resp:
  102. if resp.status == 200: return True
  103. await self._log_response_error(resp, "Email Dispatch")
  104. return False
  105. except Exception as e:
  106. logger.error(f"🔥 [Email] Exception: {str(e)}")
  107. return False
  108. async def send_wechat(self, task_payload: Dict) -> bool:
  109. try:
  110. content = render_wechat_markdown(task_payload.get("template_id"), task_payload.get("payload", {}))
  111. async with self.session.post(f"{API_BASE_URL}/api/wechat/send_markdown_no_token",
  112. json={"message": content}) as resp:
  113. if resp.status == 200: return True
  114. await self._log_response_error(resp, "WeChat Dispatch")
  115. return False
  116. except Exception as e:
  117. logger.error(f"🔥 [WeChat] Exception: {str(e)}")
  118. return False
  119. async def send_whatsapp(self, task_payload: Dict) -> bool:
  120. try:
  121. content = render_whatsapp_text(task_payload.get("template_id"), task_payload.get("payload", {}))
  122. recs = task_payload.get("receivers", [])
  123. success = True
  124. for r in recs:
  125. data = {"chat_id": r, "message": content}
  126. async with self.session.post(f"{API_BASE_URL}/api/whatsapp/send_no_token", json=data) as resp:
  127. if resp.status != 200:
  128. await self._log_response_error(resp, f"WhatsApp to {r}")
  129. success = False
  130. return success
  131. except Exception as e:
  132. logger.error(f"🔥 [WhatsApp] Exception: {str(e)}")
  133. return False
  134. async def send_telegram(self, task_payload: Dict) -> bool:
  135. try:
  136. data = task_payload.get("payload", {})
  137. content = render_telegram_html(task_payload.get("template_id"), data)
  138. chat_id = task_payload.get("chat_id") or data.get("chat_id")
  139. if not chat_id: return False
  140. async with self.session.post(f"{API_BASE_URL}/api/tg/send_message_no_token",
  141. json={"chat_id": str(chat_id), "message": content}) as resp:
  142. if resp.status == 200: return True
  143. await self._log_response_error(resp, "Telegram Dispatch")
  144. return False
  145. except Exception as e:
  146. logger.error(f"🔥 [Telegram] Exception: {str(e)}")
  147. return False
  148. # --- 核心调度逻辑 ---
  149. async def process_task(self, item: Dict):
  150. # 1. 指数退避时间检查
  151. next_retry = item.get("next_retry_at")
  152. if next_retry:
  153. try:
  154. dt_str = next_retry.replace('Z', '+00:00')
  155. if datetime.fromisoformat(dt_str).replace(tzinfo=None) > datetime.utcnow():
  156. return
  157. except Exception: pass
  158. task_id = item["id"]
  159. channel = (item.get("channel") or "").lower()
  160. task_payload = item.get("payload", {})
  161. attempts = item.get("attempts", 0)
  162. # 2. 获取该频道的锁 (如果没有定义则动态创建一个)
  163. if channel not in self.channel_locks:
  164. self.channel_locks[channel] = asyncio.Lock()
  165. # 3. 在锁保护下执行发送逻辑(同频道串行,跨频道并行)
  166. async with self.channel_locks[channel]:
  167. logger.info(f"🚚 [Task {task_id}] Delivering {channel.upper()} (Attempt: {attempts})")
  168. success = False
  169. if channel == "email": success = await self.send_email(task_payload)
  170. elif channel == "wechat": success = await self.send_wechat(task_payload)
  171. elif channel == "whatsapp": success = await self.send_whatsapp(task_payload)
  172. elif channel in ["telegram", "tg"]: success = await self.send_telegram(task_payload)
  173. else:
  174. logger.warning(f"❓ [Task {task_id}] Unknown channel: {channel}")
  175. return
  176. await self.update_task_result(task_id, success, attempts)
  177. if success:
  178. logger.info(f"✨ [Task {task_id}] Sent Successfully.")
  179. # 可以在此处增加一个小间隔,避免请求过于密集
  180. await asyncio.sleep(0.5)
  181. async def run_loop(self):
  182. logger.info("🌈 Iris Messenger Engine is running with Channel-Locking...")
  183. while True:
  184. tasks = await self.fetch_pending_tasks()
  185. if tasks:
  186. # 依然使用 gather,但内部受 Lock 控制,达到“同类串行、异类并行”
  187. await asyncio.gather(*(self.process_task(t) for t in tasks))
  188. await asyncio.sleep(2)
# ==========================================
# 3. Entry point
# ==========================================
  192. async def main():
  193. async with IrisWorker() as worker:
  194. await worker.run_loop()
  195. if __name__ == "__main__":
  196. try:
  197. asyncio.run(main())
  198. except KeyboardInterrupt:
  199. logger.info("Iris stopped by user.")
  200. except Exception:
  201. logger.critical(f"Iris Engine Crashed: {traceback.format_exc()}")