import base64
import json
import os
import random
import re
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urlencode, urlparse

# DrissionPage core
from DrissionPage import ChromiumPage, ChromiumOptions

from vs_plg import IVSPlg
from vs_types import (
    VSPlgConfig,
    VSQueryResult,
    VSBookResult,
    AvailabilityStatus,
    TimeSlot,
    DateAvailability,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    BizLogicError,
)
from toolkit.vs_cloud_api import VSCloudApi


# ==========================================
# 1. Helpers: proxy extension & response wrapper
# ==========================================

def create_proxy_auth_extension(ip, port, username, password,
                                plugin_path="./chrome_proxy_auth_plugin"):
    """Write an unpacked Chrome extension that configures an authenticated proxy.

    Two files are (re)written inside ``plugin_path``: ``manifest.json`` and
    ``background.js``. The background script sets a fixed HTTP proxy and
    answers proxy authentication challenges with the given credentials.

    Args:
        ip: Proxy host.
        port: Proxy port (number or numeric string).
        username: Proxy user name.
        password: Proxy password.
        plugin_path: Directory to write the extension into; created if missing.

    Returns:
        The absolute path of ``plugin_path``.
    """
    os.makedirs(plugin_path, exist_ok=True)

    manifest = {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Chrome Proxy Auth",
        # "<all_urls>" is the host permission the webRequest listener needs;
        # an empty string is not a valid match pattern.
        "permissions": [
            "proxy", "tabs", "unlimitedStorage", "storage",
            "<all_urls>", "webRequest", "webRequestBlocking",
        ],
        "background": {"scripts": ["background.js"]},
        "minimum_chrome_version": "22.0.0",
    }

    # JSON-encode every interpolated value so quotes or backslashes in the
    # credentials cannot break (or inject into) the generated script.
    host_js = json.dumps(str(ip))
    port_js = json.dumps(str(port))
    user_js = json.dumps(str(username))
    pass_js = json.dumps(str(password))
    background_js = "\n".join([
        'var config = {',
        '    mode: "fixed_servers",',
        '    rules: {',
        '        singleProxy: {scheme: "http", host: ' + host_js
        + ', port: parseInt(' + port_js + ', 10)},',
        '        bypassList: ["localhost"]',
        '    }',
        '};',
        'chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});',
        'chrome.webRequest.onAuthRequired.addListener(function(details) {',
        '    return {authCredentials: {username: ' + user_js
        + ', password: ' + pass_js + '}};',
        '}, {urls: ["<all_urls>"]}, ["blocking"]);',
        '',
    ])

    with open(os.path.join(plugin_path, "manifest.json"), "w", encoding="utf-8") as f:
        f.write(json.dumps(manifest, indent=4))
    with open(os.path.join(plugin_path, "background.js"), "w", encoding="utf-8") as f:
        f.write(background_js)
    return os.path.abspath(plugin_path)


def _file_to_b64(path):
    """Return the Base64 text of the file at ``path``, or "" if it is missing."""
    if not path or not os.path.exists(path):
        return ""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode('utf-8')


class BrowserResponse:
    """Small requests-like view over the dict returned by an in-page fetch.

    The dict is expected to carry ``status``, ``body``, ``headers`` and ``url``
    keys; any that are missing fall back to 0 / '' / {} / ''.
    """

    def __init__(self, result_dict):
        # run_js may hand back None (or something that is not a dict) when the
        # script fails; treat that as an empty response instead of crashing.
        if not isinstance(result_dict, dict):
            result_dict = {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or {} when empty or unparsable."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (ValueError, TypeError):
                self._json = {}
        return self._json


# ==========================================
# 2. ItaPlugin core logic
# ==========================================

class ItaPlugin(IVSPlg):
    """Browser-driven plugin for the Prenotami booking site."""

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None
        self.page: Optional[ChromiumPage] = None
        self.session_create_time: float = 0
        # Prenotami-specific settings
        self._service_id = 0
        self._host = 'https://prenotami.esteri.it'

    def get_group_id(self) -> str:
        """Return the group id this plugin instance was created with."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]) -> None:
        """Install the callable used by ``_log``; without one, ``print`` is used."""
        self.logger = logger

    def _log(self, message):
        """Emit ``message`` prefixed with the plugin name and group id."""
        line = f'[ItaPlugin] [{self.group_id}] {message}'
        if self.logger:
            self.logger(line)
        else:
            print(line)

    def set_config(self, config: VSPlgConfig):
        """Store ``config`` and read ``service_id`` from its free-form section."""
        self.config = config
        self.free_config = config.free_config or {}
        # Service ID (e.g., 1321 for Ireland, 5059 for Guangzhou)
        self._service_id = self.free_config.get('service_id', 0)

    def health_check(self) -> bool:
        """Return True when the browser answers and the session is within its max life."""
        if not self.is_healthy or not self.page:
            return False
        try:
            if not self.page.run_js("return 1;"):
                return False
        except Exception:
            # Any failure talking to the browser means the session is unusable.
            return False
        if self.config and self.config.session_max_life > 0:
            # session_max_life is compared in minutes against elapsed seconds.
            if time.time() - self.session_create_time > self.config.session_max_life * 60:
                self._log("Session expired.")
                return False
        return True

    # -------------------------------------------------------------
    # 1. Create Session (Login)
    # -------------------------------------------------------------
    def create_session(self):
        """Start a browser, log in, and keep the page as the live session.

        Steps: launch the browser (with proxy if configured), fill the login
        form, handle the ReCaptcha, submit, and open the services page.

        Raises:
            PermissionDeniedError: The page showed an ``.alert-danger`` message.
            BizLogicError: The login page did not load or login failed without
                a visible error.
        """
        self._log("Initializing Browser Session...")
        co = ChromiumOptions()
        co.auto_port()
        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy
            if p.username and p.password:
                self._log(f"Configuring authenticated proxy: {p.ip}:{p.port}")
                co.add_extension(
                    create_proxy_auth_extension(p.ip, p.port, p.username, p.password))
            else:
                co.set_proxy(f"{p.scheme}://{p.ip}:{p.port}")
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        try:
            self.page = ChromiumPage(co)
            login_url = f"{self._host}/Home"
            self._log(f"Navigating to {login_url}")
            self.page.get(login_url)

            # Wait for the login form.
            if not self.page.wait.ele_displayed('#login-email', timeout=20):
                raise BizLogicError("Login page not loaded")

            # Fill in the credentials.
            self.page.ele('#login-email').input(self.config.account.username)
            self.page.ele('#login-password').input(self.config.account.password)

            # Solve ReCaptcha V2.
            self._handle_login_captcha()

            # Submit the form.
            self._log("Submitting login...")
            self.page.ele('xpath://*[@id="login-form"]/button').click()

            # Give the post-login navigation time to happen.
            time.sleep(3)
            if "Home" in self.page.url and not self.page.ele('#logoutForm'):
                # Surface the page's own error message when there is one.
                alert = self.page.ele('.alert-danger')
                if alert:
                    raise PermissionDeniedError(f"Login Failed: {alert.text}")
                raise BizLogicError("Login Failed: Unknown reason")

            self._log("Login Successful.")
            # Visit the services list to keep the session warm.
            self.page.get(f"{self._host}/Services")
            self.session_create_time = time.time()
            # query() flips this to False on session expiry; a fresh login
            # makes the plugin usable again.
            self.is_healthy = True
        except Exception as e:
            self._log(f"Create Session Failed: {e}")
            if self.page:
                try:
                    self.page.quit()
                except Exception as quit_err:
                    # Do not let a cleanup failure hide the original error.
                    self._log(f"Browser quit failed: {quit_err}")
                self.page = None
            raise

    def _handle_login_captcha(self):
        """Solve the login page's ReCaptcha, if one is present, and inject the token.

        Without a ``capsolver_key`` in the free config, this only pauses for
        five seconds so the captcha can be solved by hand.
        """
        has_captcha = (
            self.page.ele('#recaptcha-anchor')
            or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]')
        )
        if not has_captcha:
            return

        self._log("Solving ReCaptcha...")
        api_token = self.free_config.get("capsolver_key", "")
        if not api_token:
            self._log("WARN: No capsolver_key, manual solve required.")
            time.sleep(5)
            return

        site_key = "6LdkwrIqAAAAAC4NX-g_j7lEx9vh1rg94ZL2cFfY"  # Prenotami site key
        rc_params = {
            "type": "ReCaptchaV2TaskProxyLess",
            "page": self.page.url,
            "siteKey": site_key,
            "apiToken": api_token,
        }
        g_token = self._solve_recaptcha(rc_params)

        # Inject the token; JSON-encode it so it is always a valid JS string.
        js = (
            "var el = document.getElementById('g-recaptcha-response');\n"
            "if (el) { el.value = " + json.dumps(g_token) + "; }\n"
        )
        self.page.run_js(js)
        self._log("Captcha solved & injected.")

    # -------------------------------------------------------------
    # 2. Query Availability
    # -------------------------------------------------------------
    def query(self) -> VSQueryResult:
        """Check the configured service for bookable days and time slots.

        Returns:
            A ``VSQueryResult``. ``success`` is True and ``availability`` is
            populated when at least one time slot was found; ``earliest_date``
            is the smallest such day.

        Raises:
            BizLogicError: No service id is configured.
            SessionExpiredOrInvalidError: The check landed on a Home/Login URL.
        """
        res = VSQueryResult()
        res.success = False
        res.availability_status = AvailabilityStatus.NoneAvailable

        if not self._service_id:
            raise BizLogicError("Service ID not configured")

        # 1. Availability check endpoint.
        check_url = f"{self._host}/Services/Booking/{self._service_id}"
        resp = self._perform_request("GET", check_url, headers={
            "Referer": f"{self._host}/Services",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
        })

        # NOTE(review): fetch() follows redirects by default, so a redirect
        # back to /Services may also surface here as status 200 — confirm
        # whether the final URL should be checked before trusting the status.
        if resp.status_code == 200:
            # 200 means the booking page was reached.
            self._log("Slot Check: 200 OK (Availability Detected)")
        elif "BookingCalendar" in resp.url:
            self._log("Slot Check: Redirected to Calendar (Availability Detected)")
        else:
            # Sent back to Home/Login: the session is gone.
            if "Home" in resp.url or "Login" in resp.url:
                self.is_healthy = False
                raise SessionExpiredOrInvalidError("Session expired during query")
            self._log("Slot Check: No availability (Redirected back)")
            return res

        # 2. Months to query: configured target dates, else a day in next month.
        tar_dates = self.free_config.get("target_dates", [])
        if not tar_dates:
            # Day 28 + 4 days always lands in the following month.
            next_month = datetime.now().replace(day=28) + timedelta(days=4)
            tar_dates = [next_month.strftime("%Y-%m-%d")]

        # Prenotami needs the server info retrieved first.
        self._perform_request("GET", f"{self._host}/BookingCalendar/RetrieveServerInfo")

        cal_url = f"{self._host}/BookingCalendar/RetrieveCalendarAvailability"
        slot_url = f"{self._host}/BookingCalendar/RetrieveTimeSlots"
        earliest_day = None

        for date_str in tar_dates:
            cal_payload = {
                "_Servizio": str(self._service_id),
                "selectedDay": date_str,  # configured string passed through unchanged
            }
            resp_cal = self._perform_request("POST", cal_url, json_data=cal_payload)
            if resp_cal.status_code != 200:
                continue

            valid_days = self._parse_valid_days(resp_cal.text)
            self._log(f"Valid days for {date_str}: {valid_days}")

            for day in valid_days:
                slot_payload = {
                    "selectedDay": day,  # YYYY-MM-DD
                    "idService": str(self._service_id),
                }
                resp_slot = self._perform_request("POST", slot_url, json_data=slot_payload)
                time_slots = self._parse_time_slots(resp_slot.text)
                if not time_slots:
                    continue

                res.success = True
                res.availability_status = AvailabilityStatus.Available
                # Days are YYYY-MM-DD, so string order equals date order.
                if earliest_day is None or day < earliest_day:
                    earliest_day = day
                    res.earliest_date = day

                # ts: {'id': 123, 'start': '10:00', 'end': '10:30', 'remain': 1}
                # The slot id travels in ``label`` so book() can use it.
                ts_list = [
                    TimeSlot(time=f"{ts['start']} - {ts['end']}", label=str(ts['id']))
                    for ts in time_slots
                ]
                res.availability.append(DateAvailability(date=day, times=ts_list))

        return res

    # -------------------------------------------------------------
    # 3. Book
    # -------------------------------------------------------------
    def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
        """Book the first time slot of the first available day in ``slot_info``.

        Flow: request an OTP, read it from e-mail, submit the applicant form
        (with optional PDF uploads) from inside the page, then confirm the slot.

        Args:
            slot_info: Result of ``query``; the slot id is read from ``label``.
            user_inputs: Optional dict with ``citizen``, ``address``,
                ``passport``, ``passport_pdf_path`` and ``irp_pdf_path``.

        Returns:
            A ``VSBookResult`` whose ``success`` is True only when the final
            confirmation request returned 200.

        Raises:
            NotFoundError: ``slot_info`` carries no bookable slot.
            BizLogicError: No OTP could be read, or the site rejected the OTP.
        """
        res = VSBookResult()
        res.success = False
        user_inputs = user_inputs or {}

        if not slot_info.availability:
            raise NotFoundError("No slots to book")
        first_day = slot_info.availability[0]
        if not first_day.times:
            raise NotFoundError("No slots to book")

        target_date = first_day.date
        target_slot = first_day.times[0]
        slot_id = target_slot.label   # query() stored the slot id in label
        slot_text = target_slot.time  # "10:00 - 10:30"

        # 1. Ask the site to send an OTP.
        self._log("Requesting OTP...")
        otp_url = f"{self._host}/BookingCalendar/GenerateOTP?ServiceID={self._service_id}"
        self._perform_request("POST", otp_url)

        # 2. Wait for the mail and read the code.
        self._log("Waiting for email code...")
        time.sleep(10)  # give the mail a moment to arrive
        email_account = self.config.account.email
        otp_code = VSCloudApi.Instance().get_email_verify_code(email_account)
        if not otp_code:
            raise BizLogicError("Failed to retrieve OTP code")
        self._log(f"Got OTP: {otp_code}")

        # 3. Submit the applicant details (multipart form built in the page).
        self._log("Submitting User Details & Files...")
        ppt_b64 = _file_to_b64(user_inputs.get('passport_pdf_path'))
        irp_b64 = _file_to_b64(user_inputs.get('irp_pdf_path'))

        # NOTE: field ids below are the Dublin (1321) layout; other services
        # (e.g. Canton) need different _Id / _TipoDatoAddizionale values.
        prefix = 'DatiAddizionaliPrenotante'
        form_fields = [
            # Base fields
            ['ServizioDescrizione', 'D Visa Application'],
            ['MessaggioRassicuranteWaitingList', 'True'],
            ['isWaitingListEnabled', 'False'],
            ['IDServizioConsolare', '35'],
            ['IDServizioErogato', str(self._service_id)],
            ['IdTipoPrenotazione', '1'],  # Single
            ['NumMaxAccompagnatori', '3'],
            ['NumAccompagnatoriSelected', '0'],
            # [0] Other citizenship
            [f'{prefix}[0]._Descrizione', 'Other citizenship/s'],
            [f'{prefix}[0]._testo', str(user_inputs.get("citizen", "China"))],
            [f'{prefix}[0]._Obbligatorio', 'False'],
            [f'{prefix}[0]._Id', '61738'],
            [f'{prefix}[0]._TipoDatoAddizionale.IDTipoDatoAddizionale', '26'],
            [f'{prefix}[0]._TipoDatoAddizionale.IDTipoControllo', '2'],
            # [1] Full address
            [f'{prefix}[1]._Descrizione', 'Full residence address'],
            [f'{prefix}[1]._testo', str(user_inputs.get("address", ""))],
            [f'{prefix}[1]._Obbligatorio', 'True'],
            [f'{prefix}[1]._Id', '61739'],
            [f'{prefix}[1]._TipoDatoAddizionale.IDTipoDatoAddizionale', '25'],
            [f'{prefix}[1]._TipoDatoAddizionale.IDTipoControllo', '2'],
            # [2] Passport number
            [f'{prefix}[2]._Descrizione', 'Passport number'],
            [f'{prefix}[2]._testo', str(user_inputs.get("passport", ""))],
            [f'{prefix}[2]._Obbligatorio', 'True'],
            [f'{prefix}[2]._Id', '61740'],
            [f'{prefix}[2]._TipoDatoAddizionale.IDTipoDatoAddizionale', '2'],
            [f'{prefix}[2]._TipoDatoAddizionale.IDTipoControllo', '2'],
            # [3] Reason (select); 42 = Tourism? needs verification
            [f'{prefix}[3]._Descrizione', 'Reason for visit'],
            [f'{prefix}[3]._Obbligatorio', 'True'],
            [f'{prefix}[3]._Id', '61741'],
            [f'{prefix}[3]._TipoDatoAddizionale.IDTipoDatoAddizionale', '34'],
            [f'{prefix}[3]._TipoDatoAddizionale.IDTipoControllo', '3'],
            [f'{prefix}[3]._idSelezionato', '42'],
            # OTP
            ['otp-input', str(otp_code)],
            ['PrivacyCheck', 'true'],
        ]
        # File field names are assumed (File_0 = passport, File_1 = IRP); the
        # original capture did not show them.
        form_files = [
            [ppt_b64, 'File_0', 'passport.pdf'],
            [irp_b64, 'File_1', 'irp.pdf'],
        ]
        submit_url = f"{self._host}/Services/Booking/{self._service_id}"

        # All data is embedded as JSON so quotes in user input cannot break
        # the script. The async wrapper is required because the body awaits.
        # The result uses the key ``body`` because that is what
        # BrowserResponse reads.
        js_submit = "\n".join([
            "return (async function () {",
            "  try {",
            "    const fd = new FormData();",
            "    const fields = " + json.dumps(form_fields) + ";",
            "    for (const pair of fields) { fd.append(pair[0], pair[1]); }",
            "    const files = " + json.dumps(form_files) + ";",
            "    for (const f of files) {",
            "      if (!f[0]) { continue; }",
            "      const res = await fetch('data:application/pdf;base64,' + f[0]);",
            "      fd.append(f[1], await res.blob(), f[2]);",
            "    }",
            "    const r = await fetch(" + json.dumps(submit_url)
            + ", { method: 'POST', body: fd });",
            "    return { status: r.status, url: r.url, body: await r.text() };",
            "  } catch (e) {",
            "    return { status: 0, body: e.toString() };",
            "  }",
            "})();",
        ])
        # Uploads can be slow; use the same 60s budget as _perform_request.
        resp = BrowserResponse(self.page.run_js(js_submit, timeout=60))

        # NOTE(review): fetch() follows redirects, so 302 is unlikely to be
        # observed here; the final-URL check is the effective condition.
        if resp.status_code == 302 or "BookingCalendar" in resp.url:
            self._log("User Info Submitted Successfully.")
        else:
            self._log(f"User Info Submit Failed: {resp.text[:100]}")
            # The page reports a wrong OTP with this message.
            if "Codice errato" in resp.text:
                raise BizLogicError("Invalid OTP Code")
            return res  # Fail

        # 4. Final confirmation (InsertNewBooking), sent form-urlencoded.
        self._log("Finalizing Booking...")
        final_url = f"{self._host}/BookingCalendar/InsertNewBooking"
        final_payload = {
            "idCalendarioGiornaliero": slot_id,
            "selectedDay": target_date,
            "selectedHour": slot_text,  # "10:00 - 10:30(2)"
        }
        resp_final = self._perform_request("POST", final_url, data=final_payload)
        if resp_final.status_code == 200:
            self._log("Booking Confirmed!")
            res.success = True
            res.book_date = target_date
            res.book_time = slot_text
        else:
            self._log(f"Final Booking Failed: {resp_final.status_code}")
        return res

    # -------------------------------------------------------------
    # 4. Helpers
    # -------------------------------------------------------------
    def _perform_request(self, method, url, headers=None, data=None, json_data=None):
        """Run a ``fetch`` inside the logged-in page and wrap the result.

        Args:
            method: HTTP method name.
            url: Target URL.
            headers: Optional request headers (not modified).
            data: Dict (sent form-urlencoded) or a raw body.
            json_data: Object sent as a JSON body; takes precedence over ``data``.

        Returns:
            A ``BrowserResponse``; ``status_code`` is 0 when the fetch threw.

        Raises:
            BizLogicError: The browser page has not been created.
        """
        if not self.page:
            raise BizLogicError("Browser not init")

        # Copy so that adding Content-Type never mutates the caller's dict.
        req_headers = dict(headers or {})
        fetch_opts = {
            "method": method.upper(),
            "headers": req_headers,
            "credentials": "include",
        }
        if json_data:
            fetch_opts['body'] = json.dumps(json_data)
            req_headers['Content-Type'] = 'application/json; charset=UTF-8'
        elif data:
            if isinstance(data, dict):
                fetch_opts['body'] = urlencode(data)
                req_headers['Content-Type'] = (
                    'application/x-www-form-urlencoded; charset=UTF-8')
            else:
                fetch_opts['body'] = data

        js = "\n".join([
            "return (async function () {",
            "  try {",
            "    const r = await fetch(" + json.dumps(url) + ", "
            + json.dumps(fetch_opts) + ");",
            "    const h = {};",
            "    r.headers.forEach(function (v, k) { h[k] = v; });",
            "    return { status: r.status, body: await r.text(), headers: h, url: r.url };",
            "  } catch (e) {",
            "    return { status: 0, body: e.toString() };",
            "  }",
            "})();",
        ])
        return BrowserResponse(self.page.run_js(js, timeout=60))

    def _solve_recaptcha(self, params) -> str:
        """Solve a ReCaptcha through Capsolver and return the response token.

        Args:
            params: Dict with ``apiToken``, ``type``, ``page`` and ``siteKey``.

        Raises:
            BizLogicError: Task creation failed, the task failed, or no
                solution was ready after 20 polls (3 seconds apart).
        """
        import requests as req

        key = params.get("apiToken")
        task = {
            "type": params.get("type"),
            "websiteURL": params.get("page"),
            "websiteKey": params.get("siteKey"),
        }
        r = req.post("https://api.capsolver.com/createTask",
                     json={"clientKey": key, "task": task}, timeout=20)
        if r.status_code != 200:
            raise BizLogicError("Capsolver submit failed")
        tid = r.json().get("taskId")
        if not tid:
            # Without a task id there is nothing to poll for.
            raise BizLogicError("Capsolver submit failed")

        for _ in range(20):
            r = req.post("https://api.capsolver.com/getTaskResult",
                         json={"clientKey": key, "taskId": tid}, timeout=20)
            if r.status_code == 200:
                payload = r.json()
                status = payload.get("status")
                if status == "ready":
                    return payload["solution"]["gRecaptchaResponse"]
                if status == "failed":
                    # Stop polling a task that will never become ready.
                    raise BizLogicError("Capsolver task failed")
            time.sleep(3)
        raise BizLogicError("Capsolver timeout")

    def _parse_valid_days(self, text):
        """Extract bookable days from a calendar response as YYYY-MM-DD strings.

        Entries look like
        ``{"DateLibere":"22/10/2024 00:00:00","SlotLiberi":1,"SlotRimanenti":1}``;
        those whose ``SlotRimanenti`` is -1 are skipped. Parsing stops at the
        first malformed entry and returns what was collected so far.
        """
        days = []
        try:
            matches = re.findall(
                r'{"DateLibere":"(.*?)".*?"SlotRimanenti":(-?\d+)}', text)
            for d_str, rem in matches:
                if int(rem) != -1:
                    # 22/10/2024 -> 2024-10-22
                    dt = datetime.strptime(d_str[:10], "%d/%m/%Y")
                    days.append(dt.strftime("%Y-%m-%d"))
        except (ValueError, TypeError):
            # Best effort: unparsable input yields the days gathered so far.
            pass
        return days

    def _parse_time_slots(self, text):
        """Extract time slots with remaining capacity from a JSON list response.

        Returns:
            A list of dicts with ``id``, ``start``, ``end`` (HH:MM) and
            ``remain``. Parsing stops at the first malformed item and returns
            what was collected so far.
        """
        slots = []
        try:
            raw_list = json.loads(text)  # the response body is a JSON list
            for item in raw_list:
                remain = item.get('SlotRimanenti', -1)
                if remain > 0:
                    start = item['OrarioInizioFascia']
                    end = item['OrarioFineFascia']
                    s_time = f"{start['Hours']:02d}:{start['Minutes']:02d}"
                    e_time = f"{end['Hours']:02d}:{end['Minutes']:02d}"
                    slots.append({
                        'id': item['IDCalendarioServizioGiornaliero'],
                        'start': s_time,
                        'end': e_time,
                        'remain': remain,
                    })
        except (ValueError, TypeError, KeyError, AttributeError):
            # Best effort: unparsable input yields the slots gathered so far.
            pass
        return slots