jerry 3 ヶ月 前
コミット
a69b11c861

+ 47 - 6
app/api/router.py

@@ -21,7 +21,7 @@ from app.models.schema import VasSchema
 from app.models.product import VasProduct
 from app.models.payment import VasPayment
 from app.schemas.common import ApiResponse, PageResponse
-from app.schemas.troov import TroovRate
+from app.schemas.troov import TroovRate, TroovCheckForbiddenInput, TroovProb
 from app.schemas.sms import ShortMessageDetail
 from app.schemas.configuration import ConfigurationCreate, ConfigurationUpdate, ConfigurationOut
 from app.schemas.email_authorizations import EmailContent, EmailAuthorizationCreate, EmailAuthorizationUpdate, EmailAuthorizationOut
@@ -50,7 +50,7 @@ from app.schemas.resource import FileUploadOut
 from app.schemas.statistics import VasStatisticsOverviewOut
 from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
 from app.services.configuration_service import ConfigurationService
-from app.services.troov_service import get_rate_by_date
+from app.services.troov_service import TroovService
 from app.services.sms_service import save_short_message, query_short_message
 from app.services.email_authorizations_service import EmailAuthorizationService
 from app.services.short_url_service import ShortUrlService
@@ -122,7 +122,47 @@ async def sms_download(
 async def troov_rate(date: str = Query(..., description="查询的日期, 格式: YYYY-MM-DD"),
                redis_client: Redis = Depends(get_redis_client)):
     # 调用 service 层获取数据
-    obj = await get_rate_by_date(redis_client, date)
+    obj = await TroovService.get_rate_by_date(redis_client, date)
+    return success(data=obj)
+
+@admin_required_router.post("/troov/book", summary="TROOV 查询ForbiddenUsers", tags=["通用接口"], response_model=ApiResponse)
+async def troov_check_forbiddenusers(
+    payload: TroovCheckForbiddenInput,
+    redis_client: Redis = Depends(get_redis_client)
+):
+    # 调用 service 层获取数据
+    obj = await TroovService.check_for_forbiddenusers(redis_client, payload)
+    return success(data=obj)
+
+@admin_required_router.get("/troov/list-probs", summary="TROOV 查询所有概率", tags=["通用接口"], response_model=ApiResponse[List[TroovProb]])
+async def troov_get_all_probs(
+    redis_client: Redis = Depends(get_redis_client)
+):
+    obj = await TroovService.get_all_probs(redis_client)
+    return success(data=obj)
+
+@admin_required_router.post("/troov/set-prob", summary="TROOV 修改概率", tags=["通用接口"], response_model=ApiResponse[List[TroovProb]])
+async def troov_add_prob(
+    payload: TroovProb,
+    redis_client: Redis = Depends(get_redis_client)
+):
+    obj = await TroovService.set_prob(redis_client, payload)
+    return success(data=obj)
+
+@admin_required_router.delete("/troov/del-prob", summary="TROOV 删除概率", tags=["通用接口"], response_model=ApiResponse[List[TroovProb]])
+async def troov_del_prob(
+    payload: TroovProb,
+    redis_client: Redis = Depends(get_redis_client)
+):
+    obj = await TroovService.del_prob(redis_client, payload)
+    return success(data=obj)
+
+@admin_required_router.post("/troov/reset-probs", summary="TROOV 重置概率", tags=["通用接口"], response_model=ApiResponse[List[TroovProb]])
+async def troov_del_prob(
+    date: str,
+    redis_client: Redis = Depends(get_redis_client)
+):
+    obj = await TroovService.reset_probs(redis_client, date)
     return success(data=obj)
 
 @admin_required_router.post("/dynamic-configurations", summary="创建动态配置", tags=["动态配置"], response_model=ApiResponse[ConfigurationOut])
@@ -476,12 +516,13 @@ async def slots_latest_get(
     res = await SlotSnapshotService.latest_for(db, country, city, visa_type)
     return success(data=res)
 
-@admin_required_router.get("/slots/report", summary="上报最近的slot", tags=["Slot数据"], response_model=ApiResponse[SlotSnapshotOut])
+@admin_required_router.post("/slots/report", summary="上报最近的slot", tags=["Slot数据"], response_model=ApiResponse[SlotSnapshotOut])
 async def slots_report(
     payload: SlotSnapshotCreate,
-    db: Session = Depends(get_db)
+    db: Session = Depends(get_db),
+    redis_client: Redis = Depends(get_redis_client)
 ):
-    res = await SlotSnapshotService.create(db, payload)
+    res = await SlotSnapshotService.report(db, redis_client, payload)
     return success(data=res)
 
 @public_router.post("/webhook/smshelper", summary="双开助手Webhook", tags=["webhook"], response_model=ApiResponse)

+ 4 - 5
app/main.py

@@ -29,11 +29,10 @@ async def startup():
     init_stripe()
     logger.info("🟢 Stripe config done")
     
-    # 通知服务启动
-    if os.environ.get("RUN_ON_MASTER", "1") == "1" and os.getppid() == 1:
-        redis_client = await get_redis_client()
-        asyncio.create_task(notification_consumer(AsyncSessionLocal, redis_client))
-        logger.info("🟢 Notification consumer started")    
+
+    redis_client = await get_redis_client()
+    asyncio.create_task(notification_consumer(AsyncSessionLocal, redis_client))
+    logger.info("🟢 Notification consumer started")    
 
 # -----------------------
 # Exception Handlers

+ 2 - 2
app/models/slot_snapshot.py

@@ -1,4 +1,4 @@
-from sqlalchemy import Column, Integer, String, DateTime, Enum, JSON
+from sqlalchemy import Column, Integer, String, DateTime, Enum, JSON, Text
 from datetime import datetime
 from app.core.database import Base
 
@@ -17,7 +17,7 @@ class VasSlotSnapshot(Base):
         Enum('None','Available','Waitlist'),
         nullable=False
     )
-
+    website = Column(Text, nullable=False)
     earliest_date = Column(DateTime)
     availability = Column(JSON, nullable=False)
 

+ 1 - 0
app/schemas/slot_snapshot.py

@@ -10,6 +10,7 @@ class SlotSnapshotBase(BaseModel):
     routing_key: str
     availability_status: str
     earliest_date: Optional[date] = None
+    website: Optional[str] = None
     availability: Any
     snapshot_source: str
     snapshot_at: datetime

+ 12 - 0
app/schemas/troov.py

@@ -1,7 +1,19 @@
 # app/schemas/troov.py
 from pydantic import BaseModel
+from datetime import datetime
+from typing import Optional
+
 
 class TroovRate(BaseModel):
     time: str
     rate: str
     capacity: int
+
+class TroovCheckForbiddenInput(BaseModel):
+    first_name: str
+    last_name: str
+    birthday: datetime
+    
+class TroovProb(BaseModel):
+    prob_key: datetime
+    prob_val: Optional[float] = None

+ 26 - 3
app/services/slot_snapshot_service.py

@@ -2,21 +2,44 @@
 
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select
+from redis.asyncio import Redis
 from app.models.slot_snapshot import VasSlotSnapshot
-from app.schemas.slot_snapshot import SlotSnapshotCreate
+from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
+from app.services.notification_service import NotificationService
 
 
 class SlotSnapshotService:
 
     @staticmethod
-    async def create(
+    async def report(
         db: AsyncSession,
+        redis_client: Redis,
         data: SlotSnapshotCreate
     ) -> VasSlotSnapshot:
         rec = VasSlotSnapshot(**data.dict())
         db.add(rec)
         await db.commit()
-        await db.refresh(rec)
+        await db.refresh(rec) 
+        # 修复点在这里:手动将 datetime 对象转为 string
+        earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
+        snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
+
+        await NotificationService.post_wechat(
+            redis_client=redis_client,
+            template_id="slot_snapshot",
+            payload={
+                "country": rec.country,
+                "city": rec.city,
+                "visa_type": rec.visa_type,
+                "routing_key": rec.routing_key,
+                "availability_status": rec.availability_status,
+                # 使用转换后的字符串
+                "earliest_date": earliest_date_str,
+                "availability": rec.availability,
+                "snapshot_source": rec.snapshot_source,
+                "snapshot_at": snapshot_at_str
+            }
+        )
         return rec
 
     @staticmethod

+ 113 - 0
app/services/telegram_service.py

@@ -5,6 +5,70 @@ from app.core.biz_exception import BizLogicError
 from app.schemas.telegram import TelegramIn
 
 
+def _parse_slots_summary(availability_data):
+    """
+    availability:
+    [
+      { "date": "2025-01-05", "times": [] },
+      { "date": "2025-01-06", "times": [{"time": "09:30"}] }
+    ]
+    """
+    if not availability_data:
+        return "No specific slots data."
+    
+    if not isinstance(availability_data, list):
+        return str(availability_data)
+        
+    summaries = []
+    # 限制显示天数,防止消息过长
+    display_limit = 4 
+    
+    for day in availability_data[:display_limit]:
+        date_str = day.get("date")
+        
+        times = day.get("times", [])
+        
+        if not times:
+            # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
+            summaries.append(f"{date_str}")
+        else:
+            # 有具体时间的处理
+            time_list = [t.get("time", "") for t in times[:5]]
+            time_text = ", ".join(filter(None, time_list)) # 过滤空时间
+            if len(times) > 5:
+                time_text += f" (+{len(times) - 5})"
+            summaries.append(f"{date_str}: {time_text}")
+            
+    if len(availability_data) > display_limit:
+        summaries.append(f"(+{len(availability_data) - display_limit} more days)")
+        
+    return " | ".join(summaries)
+
+
+def _get_display_meta(slot_snapshot):
+    """
+    Return dynamic header logic based on status.
+    Goal: Put the most important info in the notification preview.
+    """
+    date_str = slot_snapshot.get("earliest_date")
+    
+    if slot_snapshot.get("availability_status") == 'Available':
+        # Notification Preview: "🟢 UK London: 15 Jan 2024"
+        emoji = "🟢"
+        # If available, the DATE is the headline
+        headline = f'{slot_snapshot.get("country")}, {slot_snapshot.get("city")}: {date_str}'
+        color = "info" # Green for WeChat
+    elif slot_snapshot.get("availability_status") == 'Waitlist':
+        emoji = "🟡"
+        headline = f'Waitlist: {slot_snapshot.get("country")}, {slot_snapshot.get("city")}'
+        color = "warning" # Orange for WeChat
+    else:
+        emoji = "🔴"
+        headline = f'No Slots: {slot_snapshot.get("country")}, {slot_snapshot.get("city")}'
+        color = "comment" # Grey for WeChat
+        
+    return emoji, headline, color, date_str
+
 class TelegramService:
 
     @staticmethod
@@ -24,3 +88,52 @@ class TelegramService:
             raise BizLogicError(
                 f"Telegram push failed: {resp.status_code}, {resp.text}"
             )
+            
+    async def push_slot_snapshot(api_token: str, chat_id: str, slot_snapshot) -> str:
+        emoji, headline, _, date_str = _get_display_meta(slot_snapshot)        
+        # 解析详情
+        slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))
+        
+        # 处理可能的 None 值
+        website = slot_snapshot.get("website") or "#"  # 如果没有网址,给一个空锚点
+        # 格式化更新时间
+        checked_at = slot_snapshot.get("snapshot_at", "")
+        if "T" in str(updated_at):
+             checked_at = str(checked_at).replace("T", " ")
+        
+        TEMPLATE = (
+            "{emoji} <b>{headline}</b>\n"
+            "──────────────────\n"
+            "🛂 <b>Visa:</b> {visa_type}\n"
+            "📅 <b>Earliest:</b> <code>{date_str}</code>\n"
+            "📊 <b>Slots:</b> {slots_summary}\n"
+            "──────────────────\n"
+            "🔗 <a href='{website}'><b>Book Now ➜</b></a>\n\n"
+            "🕒 <i>Checked at {checked_at}</i>"
+        )
+
+        content = TEMPLATE.format_map({
+            "emoji": emoji,
+            "headline": headline,
+            "visa_type": slot_snapshot.get("visa_type", "N/A"),
+            "date_str": date_str,
+            "slots_summary": slots_summary,
+            "website": website,
+            "checked_at": checked_at,
+        })
+        
+        url = f"https://api.telegram.org/bot{api_token}/sendMessage"
+
+        body = {
+            "chat_id": chat_id,
+            "text": content,
+            "parse_mode": "HTML",
+        }
+
+        async with httpx.AsyncClient(timeout=10) as client:
+            resp = await client.post(url, json=body)
+
+        if resp.status_code != 200:
+            raise BizLogicError(
+                f"Telegram push failed: {resp.status_code}, {resp.text}"
+            )

+ 296 - 152
app/services/troov_service.py

@@ -3,184 +3,328 @@ import time
 import random
 import asyncio
 import aiohttp
+from datetime import datetime, timedelta
 from typing import List, Optional, Tuple, Dict, Any
 
 from redis.asyncio import Redis
 from starlette.concurrency import run_in_threadpool
-from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
+from app.core.biz_exception import NotFoundError, BizLogicError
 
-from app.schemas.troov import TroovRate
+from app.schemas.troov import TroovRate, TroovProb, TroovCheckForbiddenInput
 from app.utils.france_slot_api import troov_create_session_old
 from app.utils.proxy_utils import load_proxies_from_json
 from app.core.logger import logger
 
+class TroovService:
+    """
+    Troov 业务逻辑工具类 (Pure Static)
+    """
+    
+    # Redis Lua Script for atomic token retrieval
+    POP_TOKEN_LUA = """
+    local cursor = "0"
+    local max_ttl = -1
+    local max_key = nil
+    repeat
+        local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
+        cursor = result[1]
+        local keys = result[2]
+        for _, key in ipairs(keys) do
+            local ttl = redis.call('TTL', key)
+            if ttl > max_ttl then max_ttl = ttl; max_key = key end
+        end
+    until cursor == "0"
+    if max_key then
+        local value = redis.call('GET', max_key)
+        redis.call('DEL', max_key)
+        return {max_key, value, max_ttl}
+    end
+    return nil
+    """
+
+    # Static configuration data for payload (Moved from function to class constant)
+    _ZONE_DATA = {
+        "name": 'Visas',
+        "name_traduction": {"fr": 'Visas', "en": "", "zh": "", "ar": "", "ru": "", "it": "", "es": "", "de": "", "pt": ""},
+        "enable_external_url": False,
+        "external_url": "",
+        "has_paid_reservation": False,
+        "openings": [
+            {"day": 1, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf357"},
+            {"day": 2, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf358"},
+            {"day": 3, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf359"},
+            {"day": 4, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf35a"},
+            {"day": 5, "begin_h": 9, "begin_m": 0, "end_h": 13, "end_m": 0, "_id": "65b969289175b1f087bdf35b"}
+        ],
+        "custom_openings": [],
+        "breaktimes": [
+            [
+                {"day": 1, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af678"},
+                {"day": 2, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af679"},
+                {"day": 3, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af67a"},
+                {"day": 4, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af67b"},
+                {"day": 5, "begin_h": 12, "begin_m": 0, "end_h": 14, "end_m": 0, "_id": "65e1bb30ec8f214f6a5af67c"}
+            ],
+            []
+        ],
+        "session_duration": 15, "session_type": "people", "session_reservation_max": 1000,
+        "session_people_max": 1, "reservation_people_max": 1, "is_priority": True,
+        "reservation_delay_hours": 0, "start_opening": "2022-03-31", "end_opening": "2025-12-31",
+        "is_open": True, "is_open_internal": True, "stand_alone_calendar": False,
+        "note": {"ar": "", "de": "", "en": "", "es": "", "fr": "", "it": "", "nl": "", "pt": "", "ru": "", "zh": ""},
+        "dynamic_calendar_enabled": True,
+        "dynamic_calendar_ending": {"hour": "default", "minute": "default"},
+        "external_link_for_documents": "https://france-visas.gouv.fr/en/web/france-visas/online-application",
+        "dynamic_calendar": {"begin": {"type": "days"}, "end": {"type": "days", "value": 7}},
+        "closed_days": [
+            "2024-01-01", "2024-02-05", "2024-03-18", "2024-03-22", "2024-03-23", "2024-03-24", "2024-03-25",
+            "2024-03-26", "2024-03-27", "2024-03-28", "2024-03-29", "2024-04-01", "2024-05-01", "2024-05-06",
+            "2024-05-09", "2024-06-03", "2024-08-05", "2024-10-28", "2024-12-25", "2024-12-26", "2025-01-01",
+            "2025-02-03", "2025-03-17", "2025-04-21", "2025-05-05", "2025-06-02", "2025-08-04", "2025-08-15",
+            "2025-10-27", "2025-12-25", "2025-12-26"
+        ],
+        "custom_fields": [], "service_color": "#e91e63", "enable_repeat_form": False,
+        "enable_fullday_slots": False, "session_price": 0, "cancel_limit": {"value": 0, "type": "days"},
+        "activate_waiting_list": False, "deactivate_reservation_cancelation": True,
+        "_id": '624317926863643fe83c8548'
+    }
+    
+    _probability_model = 'probability_model'
 
-POP_TOKEN_LUA = """
-local cursor = "0"
-local max_ttl = -1
-local max_key = nil
+    _time_slots_am = [
+        "09:30", "09:45", "10:00", "10:15", "10:30",
+        "10:45", "11:00", "11:15", "11:30", "11:45"
+    ]
+    _time_slots_pm = ["14:00", "14:15", "14:30", "14:45", "15:00"]
+    # =========================================================
+    # Public Methods
+    # =========================================================
 
-repeat
-    local result = redis.call('SCAN', cursor, 'MATCH', 'token:*', 'COUNT', 50)
-    cursor = result[1]
-    local keys = result[2]
+    @staticmethod
+    async def check_for_forbiddenusers(redis_client: Redis, payload: TroovCheckForbiddenInput) -> Dict[str, Any]:
+        """检测用户是否被禁止"""
+        current_proxy, session_dic = await TroovService._prepare_session_context(redis_client)
+        booking_token = await TroovService._get_valid_token(redis_client)
+        if not booking_token:
+            raise NotFoundError(message="Failed to retrieve second captcha token for booking")
 
-    for _, key in ipairs(keys) do
-        local ttl = redis.call('TTL', key)
-        if ttl > max_ttl then
-            max_ttl = ttl
-            max_key = key
-        end
-    end
-until cursor == "0"
+        # Inline logic: Calculate next Monday
+        today = datetime.today()
+        days_ahead = 7 - today.weekday()
+        if days_ahead == 0: days_ahead = 7
+        date = (today + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
 
-if max_key then
-    local value = redis.call('GET', max_key)
-    redis.call('DEL', max_key)
-    return {max_key, value, max_ttl}
-end
+        slot = {'time': '09:30', 'rate': '0.00', 'capacity': 1}
+        book_uinfo = {
+            "id": 0,
+            "birth_date": payload.birthday.strftime("%m/%d/%Y"),
+            "email": 'arket_zz@163.com',
+            "phone": '+3530829394212',
+            "first_name": payload.first_name,
+            "last_name": payload.last_name,
+        }
 
-return nil
-"""
+        # Build Payload inline
+        book_body = TroovService._build_book_payload(session_dic['session_id'], date, slot, book_uinfo, booking_token)
 
-async def get_valid_token_from_redis(redis_client: Redis, timeout: int = 30) -> Optional[str]:
-    """
-    尝试从 Redis 获取有效的验证码 Token。
-    包含重试机制。
-    """
-    start_time = time.time()
-    
-    while time.time() - start_time < timeout:
-        # 执行 Lua 脚本原子获取
-        result = await redis_client.eval(POP_TOKEN_LUA, 0)
+        # Exec Request
+        url = f"https://51.254.177.49/api/team/{session_dic['embassy']['teamId']}/reservations/family"
+        headers = {
+            'accept': 'application/json, text/plain, */*',
+            'content-type': 'application/json',
+            'origin': 'https://consulat.gouv.fr',
+            'referer': session_dic['embassy']['website'],
+            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
+            'x-csrf-token': session_dic['x-csrf-token'],
+            'x-gouv-app-id': session_dic['x_gouv_app_id'],
+            'x-gouv-web': 'fr.gouv.consulat',
+        }
         
-        if result:
-            try:
-                # result 结构: [key, value_str, ttl]
-                body_str = result[1]
-                body = json.loads(body_str)
-                token = body.get("token")
-                if token:
-                    return token
-            except (json.JSONDecodeError, IndexError, AttributeError):
-                logger.warning("Redis retrieved invalid token format")
-        
-        # 没拿到或格式不对,稍作等待
-        await asyncio.sleep(1)
-    
-    return None
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), connector=aiohttp.TCPConnector(ssl=False)) as session:
+            async with session.post(url, headers=headers, data=json.dumps(book_body), proxy=current_proxy) as resp:
+                return json.loads(await resp.text())
 
+    @staticmethod
+    async def get_rate_by_date(redis_client: Redis, date: str) -> List[TroovRate]:
+        """根据日期获取预约可用性"""
+        current_proxy, session_dic = await TroovService._prepare_session_context(redis_client)
 
-# =========================================================
-# 2. 网络请求模块
-# =========================================================
+        url = "https://51.254.177.49/api/team/621540d353069dec25bd0045/reservations/availability"
+        params = {
+            "name": "Visas", "date": date, "places": "-5", "matching": "", 
+            "maxCapacity": "-5", "sessionId": session_dic.get("session_id")
+        }
+        headers = {
+            "accept": "application/json, text/plain, */*",
+            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
+            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
+            "x-gouv-app-id": session_dic.get("x_gouv_app_id"),
+            "x-gouv-web": "fr.gouv.consulat",
+        }
 
-async def fetch_troov_availability(
-    session_data: Dict[str, Any], 
-    date: str, 
-    proxy_url: str
-) -> str:
-    """
-    请求 Troov 预约可用性接口。
-    强制使用指定的代理。
-    """
-    url = (
-        "https://51.254.177.49/api/team/"
-        "621540d353069dec25bd0045/reservations/availability"
-    )
-    
-    # URL 参数
-    params = {
-        "name": "Visas",
-        "date": date,
-        "places": "-5",
-        "matching": "",
-        "maxCapacity": "-5",
-        "sessionId": session_data.get("session_id")
-    }
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), connector=aiohttp.TCPConnector(ssl=False)) as session:
+            async with session.get(url, params=params, headers=headers, proxy=current_proxy) as resp:
+                resp.raise_for_status()
+                return json.loads(await resp.text())
 
-    headers = {
-        "accept": "application/json, text/plain, */*",
-        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
-        # "origin": "https://consulat.gouv.fr",
-        # "referer": "https://consulat.gouv.fr/en/ambassade-de-france-en-irlande/appointment?name=Visas",
-        "user-agent": (
-            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
-            "AppleWebKit/537.36 (KHTML, like Gecko) "
-            "Chrome/141.0.0.0 Safari/537.36"
-        ),
-        "x-gouv-app-id": session_data.get("x_gouv_app_id"),
-        "x-gouv-web": "fr.gouv.consulat",
-    }
+    # =========================================================
+    # Internal Logic
+    # =========================================================
 
-    timeout = aiohttp.ClientTimeout(total=15)
-    
-    connector = aiohttp.TCPConnector(ssl=False)
-
-    # 显式使用传入的 proxy_url
-    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
-        async with session.get(
-            url, 
-            params=params, 
-            headers=headers, 
-            proxy=proxy_url
-        ) as resp:
-            resp.raise_for_status() # 如果状态码不是 200,抛出异常
-            return await resp.text()
-
-
-# =========================================================
-# 3. 核心业务流程
-# =========================================================
-
-def _get_proxy_pool() -> List[str]:
-    """加载代理池配置"""
-    proxies = []
-    # 可以在此处扩展更多 pool 类型
-    for pool in ("oxylabs",):
-        proxies.extend(load_proxies_from_json("data/proxy_pool_config.json", pool))
-    return proxies
-
-
-async def get_rate_by_date(
-    redis_client: Redis,
-    date: str
-) -> Optional[List[TroovRate]]:
-    """
-    主入口:根据日期获取 Troov 预约信息
-    流程:获取代理 -> 获取 Token -> 创建会话(Sync) -> 获取数据(Async)
-    """
+    @staticmethod
+    async def _prepare_session_context(redis_client: Redis) -> Tuple[str, Dict[str, Any]]:
+        """获取代理 -> 获取Token -> 创建Session"""
+        # Inline proxy loading
+        proxies = []
+        for pool in ("oxylabs",):
+            proxies.extend(load_proxies_from_json("data/proxy_pool_config.json", pool))
+        
+        if not proxies:
+            raise NotFoundError(message="Proxy pool is empty")
+        current_proxy = random.choice(proxies)
+
+        captcha_token = await TroovService._get_valid_token(redis_client)
+        if not captcha_token:
+            raise NotFoundError(message="Failed to retrieve captcha token")
 
-    # 1. 准备代理
-    proxies = _get_proxy_pool()
-    if not proxies:
-        raise NotFoundError(message="Proxy pool is empty")
+        logger.info(f"Creating session with proxy: {current_proxy}...")
+        session_dic = await run_in_threadpool(troov_create_session_old, current_proxy, captcha_token)
+
+        if not session_dic:
+            raise BizLogicError(message="Failed to create Troov session")
+        
+        logger.info(f"Troov session created: {session_dic.get('session_id')}")
+        return current_proxy, session_dic
     
-    # 随机选择一个代理,并在整个流程中保持一致
-    current_proxy = random.choice(proxies)
+    @staticmethod
+    async def get_all_probs(redis_client: Redis) -> List[TroovProb]:
+        prob_map = await redis_client.hgetall(TroovService._probability_model)
+        res = []
+        for k, v in prob_map.items():
+            # redis 返回 bytes
+            key_str = k.decode() if isinstance(k, (bytes, bytearray)) else k
+            val_str = v.decode() if isinstance(v, (bytes, bytearray)) else v
+            res.append(
+                TroovProb(
+                    prob_key=datetime.strptime(key_str, "%Y-%m-%d.%H:%M"),
+                    prob_val=float(val_str) if val_str is not None else None,
+                )
+            )
+        # 按时间排序(强烈建议)
+        res.sort(key=lambda x: x.prob_key)
+        return res
 
-    # 2. 获取验证码 Token
-    captcha_token = await get_valid_token_from_redis(redis_client)
-    if not captcha_token:
-        raise NotFoundError(message="Failed to retrieve captcha token within timeout")
+    @staticmethod
+    async def set_prob(redis_client: Redis, payload: TroovProb):
+        """更新 / 新增概率"""
+        slot_key = payload.prob_key.strftime("%Y-%m-%d.%H:%M")
+        prob_val = payload.prob_val
+        if prob_val is not None:
+            prob_val = max(0.0, min(1.0, prob_val))
+        await redis_client.hset(
+            TroovService._probability_model,
+            slot_key,
+            prob_val,
+        )
+        return await TroovService.get_all_probs(redis_client)
 
+    @staticmethod
+    async def del_prob(redis_client: Redis, payload: TroovProb):
+        """移除单个 slot 概率"""
+        slot_key = payload.prob_key.strftime("%Y-%m-%d.%H:%M")
+        await redis_client.hdel(
+            TroovService._probability_model,
+            slot_key,
+        )
+        return await TroovService.get_all_probs(redis_client)
 
-    logger.info(f"Creating session with proxy: {current_proxy}...")
-    session_dic = await run_in_threadpool(
-        troov_create_session_old, 
-        current_proxy, 
-        captcha_token
-    )
+    @staticmethod
+    async def reset_probs(redis_client: Redis, date: str):
+        """
+        重置某一天的所有时间槽概率
+        周六周日不生成
+        周五只有上午
+        """
+        await redis_client.delete(TroovService._probability_model)
+        date_dt = datetime.strptime(date, "%Y-%m-%d")
+        # 周六、周日
+        if date_dt.weekday() in (5, 6):
+            return []
+        if date_dt.weekday() == 4:  # 周五
+            timeslots = TroovService._time_slots_am
+        else:
+            timeslots = TroovService._time_slots_am + TroovService._time_slots_pm
+        mapping = {
+            f"{date}.{time_slot}": 0.5
+            for time_slot in timeslots
+        }
+        if mapping:
+            await redis_client.hset(
+                TroovService._probability_model,
+                mapping=mapping,
+            )
+        return await TroovService.get_all_probs(redis_client)
     
-    if not session_dic:
-        raise BizLogicError(message="Failed to create Troov session (session_dic is empty)")
+
+    @staticmethod
+    async def _get_valid_token(redis_client: Redis, timeout: int = 30) -> Optional[str]:
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            result = await redis_client.eval(TroovService.POP_TOKEN_LUA, 0)
+            if result:
+                try:
+                    return json.loads(result[1]).get("token")
+                except:
+                    logger.warning("Invalid token format in Redis")
+            await asyncio.sleep(1)
+        return None
+
+    @staticmethod
+    def _build_book_payload(sid, date, slot, uinfo, token):
+        """构造预约 Payload (原 troov_dublin_visas_book_data_builder,逻辑已内联)"""
+        # Parse Dates Inline
+        birth_parts = uinfo['birth_date'].split('/')
+        dt_obj = datetime.strptime(date, "%Y-%m-%d")
+        slot_h, slot_m = slot['time'].split(":")
         
-    logger.info(f"Troov session created successfully: {session_dic.get('session_id')}")
+        # Formatter logic inline
+        fmt_date_full = dt_obj.strftime("%Y-%m-%dT%H:%M:%S.000Z")
+        fmt_tz = datetime.strptime(slot['time'], "%H:%M").strftime("-%H:%M")
+        formatted_slot_datetime = f"{fmt_date_full}{fmt_tz}"
+        slot_value_str = f"slot-visas-{formatted_slot_datetime}"
+        
+        # Build Slot Object Inline
+        slot_obj = {
+            'time': slot['time'], 'rate': slot['rate'], 'capacity': slot['capacity'],
+            'numberOfApplicants': 1,
+            "date": f"{date}T{slot['time']}:00",
+            "localDateString": dt_obj.strftime("%m/%d/%Y"),
+            "dateObject": {
+                "year": dt_obj.year, "month": dt_obj.month - 1, "day": dt_obj.day,
+                "hour": slot_h, "minute": slot_m,
+            },
+            "id": 0, "slotValue": slot_value_str
+        }
 
-    # 确保这里传入了 current_proxy
-    response_text = await fetch_troov_availability(session_dic, date, current_proxy)
-    
-    # 解析数据
-    data = json.loads(response_text)
-    # 这里可以加一步数据校验,确保 data 是 List[TroovRate] 格式
-    return data
+        return {
+            "reservations": {
+                "mainUser": {
+                    "lastname": uinfo['last_name'], "firstname": uinfo['first_name'],
+                    "email": uinfo['email'], "mobile": uinfo['phone'],
+                    "birthdate": {"month": int(birth_parts[0]) - 1, "day": int(birth_parts[1]), "year": int(birth_parts[2])},
+                    "slots": {},
+                    "services": [
+                        {
+                            "zone": TroovService._ZONE_DATA,
+                            "zone_id": '624317926863643fe83c8548',
+                            "external_link_for_documents": "https://france-visas.gouv.fr/en/web/france-visas/online-application",
+                            "label": 'Visas', "name": 'Visas', "numberOfSlots": 1, "maxSlots": 5,
+                            "checkboxesSlots": [slot_value_str], "customFields": [], "customFieldsAreValid": True,
+                            "slots": [slot_obj], "slotsToKeep": [slot_obj]
+                        }
+                    ]
+                },
+                "secondaryUsers": [], "sessionId": sid, "team": "621540d353069dec25bd0045"
+            },
+            "language": "en", "captcha": token, "sessionId": sid
+        }

+ 0 - 1
app/services/webhook_service.py

@@ -35,7 +35,6 @@ class WebhookService:
         )
         result = await db.execute(stmt)
         routings = result.scalars().all()
-        print(f'routings = {routings}')
         if not routings:
             return []
 

+ 103 - 47
app/services/wechat_service.py

@@ -1,21 +1,86 @@
 import httpx
-from typing import Dict, Any
+from typing import Dict, Any, List, Union
+from datetime import datetime
 from app.core.biz_exception import BizLogicError
 from app.schemas.wechat import WechatIn
 
+
+def _parse_slots_summary(availability_data):
+    """
+    availability:
+    [
+      { "date": "2025-01-05", "times": [] },
+      { "date": "2025-01-06", "times": [{"time": "09:30"}] }
+    ]
+    """
+    if not availability_data:
+        return "No specific slots data."
+    
+    if not isinstance(availability_data, list):
+        return str(availability_data)
+        
+    summaries = []
+    # 限制显示天数,防止消息过长
+    display_limit = 4 
+    
+    for day in availability_data[:display_limit]:
+        date_str = day.get("date")
+        
+        times = day.get("times", [])
+        
+        if not times:
+            # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
+            summaries.append(f"{date_str}")
+        else:
+            # 有具体时间的处理
+            time_list = [t.get("time", "") for t in times[:5]]
+            time_text = ", ".join(filter(None, time_list)) # 过滤空时间
+            if len(times) > 5:
+                time_text += f" (+{len(times) - 5})"
+            summaries.append(f"{date_str}: {time_text}")
+            
+    if len(availability_data) > display_limit:
+        summaries.append(f"(+{len(availability_data) - display_limit} more days)")
+        
+    return " | ".join(summaries)
+
+
+def _get_display_meta(slot_snapshot):
+    """
+    Return dynamic header logic based on status.
+    """
+    # 修复:确保这里也调用增强后的日期格式化
+    date_str = slot_snapshot.get("earliest_date")
+    
+    status = slot_snapshot.get("availability_status")
+    country = slot_snapshot.get("country", "Unknown")
+    city = slot_snapshot.get("city", "Unknown")
+
+    if status == 'Available':
+        emoji = "🟢"
+        headline = f'{country}, {city}: {date_str}'
+        color = "info" # Green
+    elif status == 'Waitlist':
+        emoji = "🟡"
+        headline = f'Waitlist: {country}, {city}'
+        color = "warning" # Orange
+    else:
+        emoji = "🔴"
+        headline = f'No Slots: {country}, {city}'
+        color = "comment" # Grey
+        
+    return emoji, headline, color, date_str
+
 class WechatService:
     
     @staticmethod
     async def _send_webhook(api_token: str, payload: Dict[str, Any]):
-        """
-        内部私有方法:发送 HTTP 请求到企业微信 Webhook
-        """
+        """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
         url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
 
         try:
             async with httpx.AsyncClient(timeout=10) as client:
                 response = await client.post(url, json=payload)
-
         except httpx.RequestError as e:
             raise BizLogicError(f"Wechat push request error: {e}")
 
@@ -29,25 +94,9 @@ class WechatService:
             )
         return True
 
-    @staticmethod
-    async def push_to_wechat(payload: WechatIn):
-        """
-        [保留原函数兼容] 发送纯文本消息
-        """
-        body = {
-            "msgtype": "text",
-            "text": {
-                "content": payload.message
-                # "mentioned_mobile_list":["13800006789","@all"] # 可选:如需艾特所有人
-            }
-        }
-        return await WechatService._send_webhook(payload.api_token, body)
-
     @staticmethod
     async def push_markdown(api_token: str, content: str):
-        """
-        发送 Markdown 消息 (支持标题、加粗、链接、颜色)
-        """
+        """发送 Markdown 消息"""
         body = {
             "msgtype": "markdown",
             "markdown": {
@@ -57,33 +106,40 @@ class WechatService:
         return await WechatService._send_webhook(api_token, body)
 
     @staticmethod
-    async def push_payment_template(api_token: str, data: Dict[str, Any]):
-        """
-        专门用于发送【支付确认】的模板消息
-        适配之前的业务逻辑,将其转换为 Webhook 支持的 Markdown 格式
+    async def push_slot_snapshot(api_token: str, slot_snapshot: Dict[str, Any]):
+        # 获取元数据
+        emoji, headline, color, date_str = _get_display_meta(slot_snapshot)
         
-        Args:
-            api_token: Webhook Key
-            data: 包含 order_id, amount, currency, user_email, confirm_url 等字段的字典
-        """
+        # 解析详情
+        slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))
         
-        # 1. 格式化金额
-        amount_fen = data.get('amount', 0)
-        amount_str = f"{amount_fen / 100:,.2f}"
-        currency = data.get('currency', 'CNY')
-        
-        # 2. 构造 Markdown 内容
-        # <font color="warning"> 红色/橙色
-        # <font color="info"> 绿色
-        # <font color="comment"> 灰色
-        markdown_content = f"""**💰 新增待确认支付**
-> 订单号:<font color="comment">{data.get('order_id', 'N/A')}</font>
-> 用户:<font color="comment">{data.get('user_email', 'Unknown')}</font>
-> 金额:<font color="warning">{amount_str} {currency}</font>
-> 渠道:{data.get('provider', 'Manual')}
-> 时间:{data.get('time_str', '')}
+        # 处理可能的 None 值
+        website = slot_snapshot.get("website") or "#"  # 如果没有网址,给一个空锚点
+        # 格式化更新时间
+        updated_at = slot_snapshot.get("snapshot_at", "")
+        if "T" in str(updated_at):
+             updated_at = str(updated_at).replace("T", " ")
 
-请核实资金到账情况。
-👉 [点击此处进行系统确认]({data.get('confirm_url')})"""
+        TEMPLATE = (
+            "# {emoji} {headline}\n"
+            "> **Visa Type**: <font color=\"comment\">{visa_type}</font>\n"
+            "> **Earliest**: <font color=\"{color}\">{date_str}</font>\n"
+            "> **Details**: <font color=\"comment\">{slots_summary}</font>\n"
+            "\n"
+            "👉 [Tap to Book Appointment]({website})\n"
+            "\n"
+            "<font color=\"comment\">Updated: {updated_at}</font>"
+        )
 
+        markdown_content = TEMPLATE.format_map({
+            "emoji": emoji,
+            "headline": headline,
+            "visa_type": slot_snapshot.get("visa_type", "N/A"),
+            "color": color,
+            "date_str": date_str,
+            "slots_summary": slots_summary,
+            "website": website,
+            "updated_at": updated_at,
+        })
+        
         return await WechatService.push_markdown(api_token, markdown_content)

+ 3 - 96
app/tasks/notification_task.py

@@ -59,6 +59,8 @@ async def notification_consumer(session_factory, redis_client: Redis):
                 if "payment_user_confirmed" == template_id:
                     status = await WechatService.push_payment_template(api_token, payload)
                     print(f"Wechat send status: {status}")
+                if "slot_snapshot" == template_id:
+                    status = await WechatService.push_slot_snapshot(api_token, payload)
             print(f"✅ Notification sent: {message.get('notification_id')}")
 
         except Exception as e:
@@ -376,99 +378,4 @@ def template_ticket_open(payload):
                            .replace('{{ticket_url}}', str(payload.get('ticket_url', '#'))) \
                            .replace('{{app_name}}', str(payload.get('app_name', 'Visafly')))
 
-    return html_content
-
-def _format_date_en(dt_obj):
-    """Format date to '15 Jan 2024' to avoid US/UK format confusion."""
-    if not dt_obj:
-        return "N/A"
-    if isinstance(dt_obj, str):
-        return dt_obj
-    return dt_obj.strftime("%d %b %Y")
-
-def _format_time_en(dt_obj):
-    if not dt_obj:
-        return ""
-    return dt_obj.strftime("%H:%M:%S")
-
-def _parse_slots_summary(availability_data):
-    """Convert JSON data to a readable English summary."""
-    if not availability_data:
-        return "No specific dates data."
-    
-    if isinstance(availability_data, list):
-        # Take first 3 dates only to keep it clean
-        dates = availability_data[:3]
-        text = ", ".join(str(d) for d in dates)
-        if len(availability_data) > 3:
-            text += f" (+{len(availability_data) - 3} more)"
-        return text
-    return str(availability_data)
-
-def _get_display_meta(data):
-    """
-    Return dynamic header logic based on status.
-    Goal: Put the most important info in the notification preview.
-    """
-    date_str = _format_date_en(data.earliest_date)
-    
-    if data.availability_status == 'Available':
-        # Notification Preview: "🟢 UK London: 15 Jan 2024"
-        emoji = "🟢"
-        # If available, the DATE is the headline
-        headline = f"{data.country}, {data.city}: {date_str}" 
-        color = "info" # Green for WeChat
-    elif data.availability_status == 'Waitlist':
-        emoji = "🟡"
-        headline = f"Waitlist: {data.country}, {data.city}"
-        color = "warning" # Orange for WeChat
-    else:
-        emoji = "🔴"
-        headline = f"No Slots: {data.country}, {data.city}"
-        color = "comment" # Grey for WeChat
-        
-    return emoji, headline, color, date_str
-
-
-def generate_wechat_markdown(data) -> dict:
-    emoji, headline, color, date_str = _get_display_meta(data)
-    slots_summary = _parse_slots_summary(data.availability)
-    
-    # WeCom uses Markdown. 
-    # Logic: Keep the header distinct based on availability.
-    
-    markdown_content = (
-        f"# {emoji} {headline}\n"
-        f"> **Visa Type**: <font color=\"comment\">{data.visa_type}</font>\n"
-        f"> **Earliest**: <font color=\"{color}\">{date_str}</font>\n"
-        f"> **Details**: <font color=\"comment\">{slots_summary}</font>\n"
-        f"\n"
-        f"👉 [Tap to Book Appointment]({data.website})\n"
-        f"\n"
-        f"<font color=\"comment\">Updated: {_format_time_en(data.snapshot_at)}</font>"
-    )
-
-    return {
-        "msgtype": "markdown",
-        "markdown": {
-            "content": markdown_content
-        }
-    }
-    
-def generate_telegram_message(data) -> str:
-    emoji, headline, _, date_str = _get_display_meta(data)
-    slots_summary = _parse_slots_summary(data.availability)
-    
-    # HTML formatting
-    # <b> for headers, <code> for copying/highlighting data
-    msg = (
-        f"{emoji} <b>{headline}</b>\n" 
-        f"──────────────────\n"
-        f"🛂 <b>Visa:</b> {data.visa_type}\n"
-        f"📅 <b>Earliest:</b> <code>{date_str}</code>\n"
-        f"📊 <b>Slots:</b> {slots_summary}\n"
-        f"──────────────────\n"
-        f"🔗 <a href='{data.website}'><b>Book Now ➜</b></a>\n\n"
-        f"🕒 <i>Checked at {_format_time_en(data.snapshot_at)}</i>"
-    )
-    return msg
+    return html_content