"""Prenotami (prenotami.esteri.it) appointment plugin driven through a DrissionPage-controlled Chromium browser."""
import base64
import json
import os
import random
import re
import shutil
import socket
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import quote, urlencode, urlparse

# DrissionPage core (browser automation)
from DrissionPage import ChromiumOptions, ChromiumPage

from toolkit.proxy_tunnel import ProxyTunnel
from toolkit.vs_cloud_api import VSCloudApi
from vs_plg import IVSPlg
from vs_types import (
    AvailabilityStatus,
    BizLogicError,
    DateAvailability,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    TimeSlot,
    VSBookResult,
    VSPlgConfig,
    VSQueryResult,
)
class BrowserResponse:
    """Small ``requests``-like view over the dict returned by an in-page ``fetch``.

    The dict is produced by JavaScript executed in the browser. It is expected
    to carry ``status``, ``body``, ``headers`` and ``url`` keys; every missing
    key falls back to a neutral default so callers never hit a ``KeyError``.
    """

    def __init__(self, result_dict):
        """Wrap *result_dict*; ``None`` is treated as an empty result."""
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        # Some in-page scripts report the payload under "text" instead of
        # "body"; accept either so the response text is never silently lost.
        payload = result_dict.get('body')
        if payload is None:
            payload = result_dict.get('text', '')
        self.text = payload
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None  # lazily parsed cache for json()

    def json(self):
        """Return the body parsed as JSON, or ``{}`` when it is empty or invalid."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # non-string body. Anything else is a real bug and propagates.
                self._json = {}
        return self._json


# ==========================================
# 2. ItaPlugin core logic
# ==========================================
class ItaPlugin(IVSPlg):
    """Plugin that logs in to Prenotami and queries/books appointments via a real browser."""

    def __init__(self, group_id: str):
        """Set up per-instance state and an isolated on-disk workspace.

        Args:
            group_id: Identifier of the group this instance belongs to; it is
                used in log lines and in the workspace directory name.
        """
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None
        self.page: Optional[ChromiumPage] = None

        # Prenotami-specific settings
        self._service_id = 0
        self._host = 'https://prenotami.esteri.it'

        # --- Concurrency isolation and resource management ---
        # A short random id keeps concurrent instances of the same group apart.
        self.instance_id = uuid.uuid4().hex[:8]
        self.root_workspace = os.path.abspath(
            os.path.join("temp_browser_data", f"{self.group_id}_{self.instance_id}")
        )
        # Browser user-data directory (created later by the code that uses it).
        self.user_data_path = os.path.join(self.root_workspace, "user_data")

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.root_workspace, exist_ok=True)

        # Local proxy tunnel, started on demand by create_session().
        self.tunnel = None

        # time.time() of the last successful login; 0 means "never".
        self.session_create_time: float = 0

- def get_group_id(self) -> str:
- return self.group_id
-
- def set_log(self, logger: Callable[[str], None]) -> None:
- self.logger = logger
-
- def _log(self, message):
- if self.logger:
- self.logger(f'[ItaPlugin] [{self.group_id}] {message}')
- else:
- print(f'[ItaPlugin] [{self.group_id}] {message}')
-
- def set_config(self, config: VSPlgConfig):
- self.config = config
- self.free_config = config.free_config or {}
- # Service ID (e.g., 1321 for Ireland, 5059 for Guangzhou)
- self._service_id = self.free_config.get('service_id', 0)
- def health_check(self) -> bool:
- if not self.is_healthy or not self.page:
- return False
- try:
- if not self.page.run_js("return 1;"):
- return False
- except:
- return False
- if self.config.session_max_life > 0:
- if time.time() - self.session_create_time > self.config.session_max_life * 60:
- self._log("Session expired.")
- return False
- return True
    # -------------------------------------------------------------
    # 1. Create Session (Login)
    # -------------------------------------------------------------
- def create_session(self):
- """
- 全浏览器登录流程:
- 1. 启动浏览器
- 2. 解决 ReCaptcha
- 3. 登录并维持 Session
- """
- self._log(f"Initializing Session (ID: {self.instance_id})...")
- co = ChromiumOptions()
- # -------------------------------------------------------------
- # [核心修复] 解决 'not enough values to unpack'
- # -------------------------------------------------------------
- # 1. 不要用 co.auto_port(),因为它依赖解析 stdout,会被 DBus 报错干扰
- # 2. 我们手动随机生成一个端口
- import random
- import socket
-
- def get_free_port():
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.bind(('', 0))
- return s.getsockname()[1]
-
- debug_port = get_free_port()
- self._log(f"Assigned Debug Port: {debug_port}")
-
- # --- [关键配置] 设置独立的用户数据目录 ---
- # 这样每个实例的 Cache, Cookies, LocalStorage 都是完全隔离的
- # 同时也防止了多进程争抢同一个 Default 文件夹导致的崩溃
- co.set_user_data_path(self.user_data_path)
-
- # --- 1. 指定浏览器路径 (适配 Docker) ---
- chrome_path = os.getenv("CHROME_BIN")
- if chrome_path and os.path.exists(chrome_path):
- co.set_paths(browser_path=chrome_path)
-
- # --- [核心修改] 代理配置 ---
- if self.config.proxy and self.config.proxy.ip:
- p = self.config.proxy
-
- if p.username and p.password:
- self._log(f"Starting Proxy Tunnel for {p.ip}...")
-
- # 1. 启动本地隧道
- self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
- local_proxy = self.tunnel.start()
-
- self._log(f"Tunnel started at {local_proxy}")
-
- # 2. Chrome 连接本地免密端口
- # 必须使用 --proxy-server 强制指定,绝对稳健
- co.set_argument(f'--proxy-server={local_proxy}')
-
- else:
- # 无密码代理,直接用
- proxy_str = f"{p.scheme}://{p.ip}:{p.port}"
- co.set_argument(f'--proxy-server={proxy_str}')
- else:
- self._log("[WARN] No proxy configured!")
- co.headless(False)
- co.set_argument('--no-sandbox')
- co.set_argument('--disable-gpu')
- co.set_argument('--disable-dev-shm-usage')
- co.set_argument('--window-size=1920,1080')
- co.set_argument('--disable-blink-features=AutomationControlled')
- try:
- self.page = ChromiumPage(co)
-
- login_url = f"{self._host}/Home"
- self._log(f"Navigating to {login_url}")
- self.page.get(login_url)
-
- # 等待登录框
- if not self.page.wait.ele_displayed('#login-email', timeout=20):
- raise BizLogicError("Login page not loaded")
- # 填充用户名密码
- self.page.ele('#login-email').input(self.config.account.username)
- self.page.ele('#login-password').input(self.config.account.password)
-
- # 4. [核心修改] 解决 ReCaptcha V3 Enterprise 并注入
- # Prenotami 使用的是 Enterprise V3, Action = 'LOGIN'
- self._solve_and_inject_prenotami_captcha()
-
- # 5. [核心修改] 提交登录
- # 不要点击 #captcha-trigger,因为它会触发网页自带的 Google 验证逻辑
- # 我们直接触发表单提交,因为 Token 已经由我们注入了
- self._log("Submitting login form via JS...")
- self.page.run_js("document.getElementById('login-form').submit()")
-
- # 等待 URL 变化或特定元素出现
- # 成功通常跳转到 /UserArea, 失败则留在 /Home
- end_time = time.time() + 45
- login_success = False
-
- while time.time() < end_time:
- time.sleep(1)
- curr_url = self.page.url
-
- # 成功特征
- if "/UserArea" in curr_url or "/Services" in curr_url:
- login_success = True
- break
-
- # 失败特征
- if self.page.ele('.validation-summary-errors') or self.page.ele('.field-validation-error'):
- err_text = self.page.ele('.validation-summary-errors').text if self.page.ele('.validation-summary-errors') else "Unknown validation error"
- raise PermissionDeniedError(f"Login Failed: {err_text}")
-
- # 检查是否有弹窗错误
- if "Home" in curr_url and self.page.ele('#logoutForm'):
- # 有时候虽然在 Home 但出现了 Logout 按钮,也算成功
- login_success = True
- break
- if not login_success:
- # 截图保留现场
- # self.page.get_screenshot(path="login_fail.jpg")
- raise BizLogicError("Login Failed: Timeout waiting for redirect (Captcha score too low?)")
- self._log("Login Successful.")
-
- # 访问服务页保活
- self.page.get(f"{self._host}/Services")
-
- self.session_create_time = time.time()
- except Exception as e:
- self._log(f"Create Session Failed: {e}")
- self.cleanup()
- raise e
    # -------------------------------------------------------------
    # 2. Query Availability
    # -------------------------------------------------------------
- def query(self) -> VSQueryResult:
- res = VSQueryResult()
- res.success = False
- res.availability_status = AvailabilityStatus.NoneAvailable
-
- if not self._service_id:
- raise BizLogicError("Service ID not configured")
- # 1. 检查 Slot 是否可用 (Check Availability Endpoint)
- check_url = f"{self._host}/Services/Booking/{self._service_id}"
-
- # 使用 Fetch 发起检查请求
- resp = self._perform_request("GET", check_url, headers={
- "Referer": f"{self._host}/Services",
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
- })
-
- # 302 跳转处理逻辑
- if resp.status_code == 200:
- # 200 表示进入了预约页,有号
- self._log("Slot Check: 200 OK (Availability Detected)")
- pass
- elif "BookingCalendar" in resp.url: # 或者是被重定向到了 Calendar
- self._log("Slot Check: Redirected to Calendar (Availability Detected)")
- pass
- else:
- # 被重定向回 Home 或 Service,说明没号或 Session 过期
- if "Home" in resp.url or "Login" in resp.url:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError("Session expired during query")
- self._log("Slot Check: No availability (Redirected back)")
- return res
- # 2. 查询月份 (Query Month)
- # 默认查询当月,或者配置的月份
- tar_dates = self.free_config.get("target_dates", [])
- if not tar_dates:
- # 默认查下个月
- next_month = datetime.now().replace(day=28) + datetime.timedelta(days=4)
- tar_dates = [next_month.strftime("%Y-%m-%d")]
- all_slots = []
-
- # Prenotami 需要先 retrieve server info
- self._perform_request("GET", f"{self._host}/BookingCalendar/RetrieveServerInfo")
- for date_str in tar_dates:
- # 构造月份格式 2026-01-05 -> 2026-01-01 (API 需要)
- try:
- dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
- except:
- try:
- dt = datetime.strptime(date_str, "%Y-%m-%d")
- except:
- dt = datetime.now()
-
- # API 需要格式: 2025-11-05T... 格式的字符串作为 selectedDay
- # 实际上 RetrieveCalendarAvailability 只需要由前端日历控件触发的格式
-
- # 查询日历 API
- cal_url = f"{self._host}/BookingCalendar/RetrieveCalendarAvailability"
- cal_payload = {
- "_Servizio": str(self._service_id),
- "selectedDay": date_str # 原样传配置里的 ISO 串
- }
-
- resp_cal = self._perform_request("POST", cal_url, json_data=cal_payload)
-
- if resp_cal.status_code != 200: continue
-
- # 解析有效日期
- valid_days = self._parse_valid_days(resp_cal.text)
- self._log(f"Valid days for {date_str}: {valid_days}")
-
- for day in valid_days:
- # 查询具体 Slot
- slot_url = f"{self._host}/BookingCalendar/RetrieveTimeSlots"
- slot_payload = {
- "selectedDay": day, # YYYY-MM-DD
- "idService": str(self._service_id)
- }
- resp_slot = self._perform_request("POST", slot_url, json_data=slot_payload)
-
- time_slots = self._parse_time_slots(resp_slot.text)
- if time_slots:
- res.success = True
- res.availability_status = AvailabilityStatus.Available
- res.earliest_date = day
-
- # 转换结构
- ts_list = []
- for ts in time_slots:
- # ts: {'id': 123, 'start': '10:00', 'end': '10:30', 'remain': 1}
- ts_list.append(TimeSlot(
- time=f"{ts['start']} - {ts['end']}",
- label=str(ts['id']) # 将 ID 存入 label 以便 book 使用
- ))
-
- res.availability.append(DateAvailability(date=day, times=ts_list))
-
- return res
    # -------------------------------------------------------------
    # 3. Book
    # -------------------------------------------------------------
- def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
- res = VSBookResult()
- res.success = False
-
- if not slot_info.availability:
- raise NotFoundError("No slots to book")
-
- target_date = slot_info.availability[0].date
- # 取第一个时间段
- target_slot = slot_info.availability[0].times[0]
- slot_id = target_slot.label # 我们在 query 里把 ID 存在了 label
- slot_text = target_slot.time # "10:00 - 10:30"
-
- # 1. 获取 OTP (GenerateOTP)
- self._log("Requesting OTP...")
- otp_url = f"{self._host}/BookingCalendar/GenerateOTP?ServiceID={self._service_id}"
- self._perform_request("POST", otp_url)
-
- # 2. 等待并读取邮件
- self._log("Waiting for email code...")
- time.sleep(10) # 稍微等一下发信
- email_account = self.config.account.email
- # 使用 CloudAPI 读取 (假设已配置)
- otp_code = VSCloudApi.Instance().get_email_verify_code(email_account)
-
- if not otp_code:
- raise BizLogicError("Failed to retrieve OTP code")
- self._log(f"Got OTP: {otp_code}")
- # 3. 提交详细信息 (Fill User Info)
- # 这是最复杂的一步,涉及文件上传 (Multipart)
- self._log("Submitting User Details & Files...")
-
- # 准备文件 (转 Base64 传给 JS)
- passport_pdf_path = user_inputs.get('passport_pdf_path')
- irp_pdf_path = user_inputs.get('irp_pdf_path')
-
- def file_to_b64(path):
- if not path or not os.path.exists(path): return ""
- with open(path, "rb") as f:
- return base64.b64encode(f.read()).decode('utf-8')
- ppt_b64 = file_to_b64(passport_pdf_path)
- irp_b64 = file_to_b64(irp_pdf_path)
-
- # 构造 JS FormData 提交脚本
- # 注意:这里需要根据 Service ID (Dublin/Canton) 动态调整字段 ID
- # 下面以 Dublin (1321) 的字段为例,如果是 Canton 需要修改 _Id 和 _TipoDatoAddizionale
- # 为了通用性,这里演示 Dublin 的结构,请根据实际 Service ID 调整 mapping
-
- # 假设是 Dublin (根据提供的源码分析)
- boundary = '----WebKitFormBoundaryRandomString'
- submit_url = f"{self._host}/Services/Booking/{self._service_id}"
-
- # 注入 JS 执行
- js_submit = f"""
- const url = "{submit_url}";
- const fd = new FormData();
-
- // 基础字段
- fd.append('ServizioDescrizione', 'D Visa Application');
- fd.append('MessaggioRassicuranteWaitingList', 'True');
- fd.append('isWaitingListEnabled', 'False');
- fd.append('IDServizioConsolare', '35');
- fd.append('IDServizioErogato', '{self._service_id}');
- fd.append('IdTipoPrenotazione', '1'); // Single
- fd.append('NumMaxAccompagnatori', '3');
- fd.append('NumAccompagnatoriSelected', '0');
-
- // 动态字段 (Dublin 示例)
- // [0] Other citizenship -> User Input
- fd.append('DatiAddizionaliPrenotante[0]._Descrizione', 'Other citizenship/s');
- fd.append('DatiAddizionaliPrenotante[0]._testo', '{user_inputs.get("citizen", "China")}');
- fd.append('DatiAddizionaliPrenotante[0]._Obbligatorio', 'False');
- fd.append('DatiAddizionaliPrenotante[0]._Id', '61738');
- fd.append('DatiAddizionaliPrenotante[0]._TipoDatoAddizionale.IDTipoDatoAddizionale', '26');
- fd.append('DatiAddizionaliPrenotante[0]._TipoDatoAddizionale.IDTipoControllo', '2');
-
- // [1] Full address -> User Input
- fd.append('DatiAddizionaliPrenotante[1]._Descrizione', 'Full residence address');
- fd.append('DatiAddizionaliPrenotante[1]._testo', '{user_inputs.get("address", "")}');
- fd.append('DatiAddizionaliPrenotante[1]._Obbligatorio', 'True');
- fd.append('DatiAddizionaliPrenotante[1]._Id', '61739');
- fd.append('DatiAddizionaliPrenotante[1]._TipoDatoAddizionale.IDTipoDatoAddizionale', '25');
- fd.append('DatiAddizionaliPrenotante[1]._TipoDatoAddizionale.IDTipoControllo', '2');
- // [2] Passport Num
- fd.append('DatiAddizionaliPrenotante[2]._Descrizione', 'Passport number');
- fd.append('DatiAddizionaliPrenotante[2]._testo', '{user_inputs.get("passport", "")}');
- fd.append('DatiAddizionaliPrenotante[2]._Obbligatorio', 'True');
- fd.append('DatiAddizionaliPrenotante[2]._Id', '61740');
- fd.append('DatiAddizionaliPrenotante[2]._TipoDatoAddizionale.IDTipoDatoAddizionale', '2');
- fd.append('DatiAddizionaliPrenotante[2]._TipoDatoAddizionale.IDTipoControllo', '2');
- // [3] Reason (Select)
- fd.append('DatiAddizionaliPrenotante[3]._Descrizione', 'Reason for visit');
- fd.append('DatiAddizionaliPrenotante[3]._Obbligatorio', 'True');
- fd.append('DatiAddizionaliPrenotante[3]._Id', '61741');
- fd.append('DatiAddizionaliPrenotante[3]._TipoDatoAddizionale.IDTipoDatoAddizionale', '34');
- fd.append('DatiAddizionaliPrenotante[3]._TipoDatoAddizionale.IDTipoControllo', '3');
- fd.append('DatiAddizionaliPrenotante[3]._idSelezionato', '42'); // 42 = Tourism? Need verify
- // OTP
- fd.append('otp-input', '{otp_code}');
- fd.append('PrivacyCheck', 'true');
-
- // 文件处理 (Base64 -> Blob -> FormData)
- // 注意:这里假设页面上有文件上传的对应 ID,或者我们直接硬编码 FormData
- // 原始抓包并未显示文件字段名,通常是 File_0, File_1
- // 我们需要将 base64 转 blob
-
- async function addFile(b64, name, filename) {{
- if(!b64) return;
- const res = await fetch(`data:application/pdf;base64,${{b64}}`);
- const blob = await res.blob();
- fd.append(name, blob, filename);
- }}
-
- // 并行处理文件
- await Promise.all([
- addFile('{ppt_b64}', 'File_0', 'passport.pdf'), // 假设 File_0 是护照
- addFile('{irp_b64}', 'File_1', 'irp.pdf') // 假设 File_1 是 IRP
- ]);
- // 发送 POST
- return fetch(url, {{
- method: 'POST',
- body: fd
- }}).then(async r => {{
- return {{ status: r.status, url: r.url, text: await r.text() }};
- }}).catch(e => {{ return {{ status: 0, text: e.toString() }}; }});
- """
-
- result_dict = self.page.run_js(js_submit)
- resp = BrowserResponse(result_dict)
-
- if resp.status_code == 302 or "BookingCalendar" in resp.url:
- self._log("User Info Submitted Successfully.")
- else:
- self._log(f"User Info Submit Failed: {resp.text[:100]}")
- # 如果 OTP 错误,页面会返回特定错误信息
- if "Codice errato" in resp.text:
- raise BizLogicError("Invalid OTP Code")
- return res # Fail
- # 4. 最终确认预约 (InsertNewBooking)
- self._log("Finalizing Booking...")
- final_url = f"{self._host}/BookingCalendar/InsertNewBooking"
- final_payload = {
- "idCalendarioGiornaliero": slot_id,
- "selectedDay": target_date,
- "selectedHour": slot_text # "10:00 - 10:30(2)"
- }
- # 这里用 Form-UrlEncoded
- resp_final = self._perform_request("POST", final_url, data=final_payload)
-
- if resp_final.status_code == 200:
- self._log("Booking Confirmed!")
- res.success = True
- res.book_date = target_date
- res.book_time = slot_text
- else:
- self._log(f"Final Booking Failed: {resp_final.status_code}")
-
- return res
    # -------------------------------------------------------------
    # 4. Helpers
    # -------------------------------------------------------------

- def _get_proxy_url(self):
- # 构造代理
- proxy_url = ""
- if self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
- return proxy_url
- def _solve_and_inject_prenotami_captcha(self):
- """
- 专门处理 Prenotami 的 ReCaptcha Enterprise
- """
- self._log("Solving ReCaptcha Enterprise (Action: LOGIN)...")
-
- api_token = self.free_config.get("capsolver_key", "")
- if not api_token:
- raise BizLogicError("Capsolver Key is required for Prenotami")
- # 从 HTML 源码中提取的信息
- site_key = "6LdkwrIqAAAAAC4NX-g_j7lEx9vh1rg94ZL2cFfY"
- page_url = self.page.url
-
- # 注意:Prenotami 的这个 Key 其实是混合模式,
- # 虽然它是 V3 (Enterprise),但很多打码平台用 V2 接口也能解,或者必须用 V3 Enterprise 接口
- # 建议先尝试 ReCaptchaV3EnterpriseTaskProxyLess
-
- # 修正为最标准的 V3 Enterprise 配置
- rc_params = {
- "type": "ReCaptchaV3EnterpriseTaskProxyless",
- "page": page_url,
- "siteKey": site_key,
- "action": "LOGIN", # 关键参数
- "minScore": 0.7, # 要求高分
- "apiToken": api_token,
- # "proxy": self._get_proxy_url()
- }
-
- g_token = self._solve_recaptcha(rc_params)
- self._log(f"Captcha Solved. Token length: {len(g_token)}")
-
- # 注入 Token 到表单
- # 页面逻辑是:$('#login-form').append('<input type="hidden" name="g-recaptcha-response" value="' + token + '" />');
- js_inject = f"""
- var form = document.getElementById('login-form');
- // 移除旧的 input 防止重复
- var old = document.getElementsByName('g-recaptcha-response');
- if(old.length > 0) old[0].remove();
-
- var input = document.createElement('input');
- input.type = 'hidden';
- input.name = 'g-recaptcha-response';
- input.value = "{g_token}";
- form.appendChild(input);
- """
- self.page.run_js(js_inject)
-
- def _perform_request(self, method, url, headers=None, data=None, json_data=None):
- """JS Fetch Wrapper"""
- if not self.page: raise BizLogicError("Browser not init")
-
- fetch_opts = { "method": method.upper(), "headers": headers or {}, "credentials": "include" }
- if json_data:
- fetch_opts['body'] = json.dumps(json_data)
- fetch_opts['headers']['Content-Type'] = 'application/json; charset=UTF-8'
- elif data:
- if isinstance(data, dict):
- from urllib.parse import urlencode
- fetch_opts['body'] = urlencode(data)
- fetch_opts['headers']['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
- else:
- fetch_opts['body'] = data
- js = f"""
- return fetch("{url}", {json.dumps(fetch_opts)})
- .then(async r => {{
- const h = {{}}; r.headers.forEach((v, k) => h[k] = v);
- return {{ status: r.status, body: await r.text(), headers: h, url: r.url }};
- }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
- """
- return BrowserResponse(self.page.run_js(js, timeout=60)) # 文件上传可能较慢,给60s
- def _solve_recaptcha(self, params) -> str:
- """
- 调用 YesCaptcha API 识别
- """
- client_key = params.get("apiToken")
-
- # 1. 选择任务类型
- # 根据文档:RecaptchaV3TaskProxylessM1S7 强制 0.7 分,适合登录
- task_type = "RecaptchaV3TaskProxyless" # 默认
- if params.get("minScore") == 0.7:
- task_type = "RecaptchaV3TaskProxylessM1S7"
- elif params.get("minScore") == 0.9:
- task_type = "RecaptchaV3TaskProxylessM1S9"
-
- # 2. 构造创建任务请求
- create_url = "https://api.yescaptcha.com/createTask"
- create_data = {
- "clientKey": client_key,
- "task": {
- "type": task_type,
- "websiteURL": params.get("page"),
- "websiteKey": params.get("siteKey"),
- "pageAction": params.get("action") # YesCaptcha 要求的字段名是 pageAction
- }
- }
-
- import requests as req
- try:
- # 发送创建任务请求
- r = req.post(create_url, json=create_data, timeout=20)
- if r.status_code != 200:
- raise BizLogicError(f"YesCaptcha Create Failed: {r.text}")
-
- res_json = r.json()
- if res_json.get("errorId") != 0:
- raise BizLogicError(f"YesCaptcha Error: {res_json.get('errorDescription')}")
-
- task_id = res_json.get("taskId")
- if not task_id:
- raise BizLogicError("YesCaptcha returned no taskId")
-
- # 3. 轮询获取结果
- result_url = "https://api.yescaptcha.com/getTaskResult"
- for _ in range(30): # 最多等 60-90秒
- time.sleep(3)
-
- r = req.post(result_url, json={"clientKey": client_key, "taskId": task_id}, timeout=20)
- d = r.json()
-
- # 识别中
- if d.get("status") == "processing":
- continue
-
- # 识别成功
- if d.get("status") == "ready":
- solution = d.get("solution", {})
- token = solution.get("gRecaptchaResponse")
- if token:
- return token
- else:
- raise BizLogicError("YesCaptcha ready but no token found")
-
- # 识别失败
- if d.get("errorId") != 0:
- raise BizLogicError(f"YesCaptcha Task Failed: {d.get('errorDescription')}")
-
- except Exception as e:
- raise BizLogicError(f"Captcha Solver Exception: {e}")
-
- raise BizLogicError("YesCaptcha timeout")
- def _parse_valid_days(self, text):
- # 提取 DateLibere (YYYY-MM-DD)
- # 格式: {"DateLibere":"22/10/2024 00:00:00","SlotLiberi":1,"SlotRimanenti":1}
- # 原始正则: r'{"DateLibere":"(.*?)","SlotLiberi":\d+,"SlotRimanenti":(-?\d+)}'
- days = []
- try:
- matches = re.findall(r'{"DateLibere":"(.*?)".*?"SlotRimanenti":(-?\d+)}', text)
- for d_str, rem in matches:
- if int(rem) != -1:
- # 22/10/2024 -> 2024-10-22
- dt = datetime.strptime(d_str[:10], "%d/%m/%Y")
- days.append(dt.strftime("%Y-%m-%d"))
- except: pass
- return days
- def _parse_time_slots(self, text):
- # 提取 IDCalendarioServizioGiornaliero, StartTime, EndTime, Remain
- slots = []
- try:
- # 原始逻辑比较复杂,这里简化正则
- # 查找 SlotRimanenti > 0 的记录
- # 关键是 IDCalendarioServizioGiornaliero
- raw_list = json.loads(text)
- # Prenotami 返回的是一个 JSON 列表字符串
- for item in raw_list:
- remain = item.get('SlotRimanenti', -1)
- if remain > 0:
- start = item['OrarioInizioFascia']
- end = item['OrarioFineFascia']
- s_time = f"{start['Hours']:02d}:{start['Minutes']:02d}"
- e_time = f"{end['Hours']:02d}:{end['Minutes']:02d}"
- slots.append({
- 'id': item['IDCalendarioServizioGiornaliero'],
- 'start': s_time,
- 'end': e_time,
- 'remain': remain
- })
- except: pass
- return slots

    # --- Resource cleanup ---
- def cleanup(self):
- """
- 销毁浏览器并彻底删除临时文件
- """
- # 1. 关闭浏览器
- if self.page:
- try:
- self.page.quit() # 这会关闭 Chrome 进程
- except Exception:
- pass # 忽略已关闭的错误
- self.page = None
-
- # 2. 删除文件
- # 注意:Chrome 关闭后可能需要几百毫秒释放文件锁,稍微等待
- if os.path.exists(self.root_workspace):
- for _ in range(3):
- try:
- time.sleep(0.2)
- shutil.rmtree(self.root_workspace, ignore_errors=True)
- break
- except Exception as e:
- # 如果删除失败(通常是Windows文件占用),重试
- self._log(f"Cleanup retry: {e}")
- time.sleep(0.5)
-
- # 如果依然存在,打印警告(虽然 ignore_errors=True 会掩盖报错,但可以 check exists)
- if os.path.exists(self.root_workspace):
- self._log(f"[WARN] Failed to fully remove workspace: {self.root_workspace}")
- # 3. [新增] 关闭代理隧道
- if self.tunnel:
- try: self.tunnel.stop()
- except: pass
- self.tunnel = None
-
- def __del__(self):
- """
- 析构函数:当对象被垃圾回收时自动调用
- """
- self.cleanup()
|