| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764 |
- import time
- import json
- import random
- import re
- import os
- import uuid
- import shutil
- from datetime import datetime
- from typing import List, Dict, Optional, Any, Callable
- from urllib.parse import urljoin, urlparse, urlencode
- # DrissionPage 核心
- from DrissionPage import ChromiumPage, ChromiumOptions
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
- from toolkit.proxy_tunnel import ProxyTunnel
class BrowserResponse:
    """Minimal stand-in for ``requests.Response`` built from a JS fetch result.

    The constructor takes the dict produced by the in-page ``fetch`` wrapper
    (keys ``status``, ``body``, ``headers``, ``url``) and exposes it through
    ``status_code``, ``text``, ``headers``, ``url`` and ``json()``.
    """

    def __init__(self, result_dict):
        # A falsy result (None, {}) becomes an empty response with status 0.
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or ``{}`` when empty or unparsable."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (ValueError, TypeError):
                # ValueError covers json.JSONDecodeError; TypeError covers a
                # non-string body. Anything else is a real bug and propagates.
                self._json = {}
        return self._json
- class TlsPlugin2(IVSPlg):
- """
- TLSContact 签证预约插件 (DrissionPage 版)
- """
def __init__(self, group_id: str):
    """Initialise per-instance state and create an isolated workspace directory.

    Args:
        group_id: Identifier of the plugin group; used in log prefixes and as
            part of the workspace directory name.
    """
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}
    self.is_healthy = True
    self.logger = None

    # Browser instance, created by create_session()
    self.page: Optional[ChromiumPage] = None

    self.travel_group: Optional[Dict] = None

    # --- Concurrency isolation and resource management ---
    # A short random id keeps workspaces of instances sharing a group_id apart.
    self.instance_id = uuid.uuid4().hex[:8]
    self.root_workspace = os.path.abspath(
        os.path.join("temp_browser_data", f"{self.group_id}_{self.instance_id}")
    )
    # Browser user-data directory lives under the workspace root
    self.user_data_path = os.path.join(self.root_workspace, "user_data")

    # exist_ok avoids the check-then-create race between concurrent instances.
    # Only the root is created here; sub-directories are created on demand.
    os.makedirs(self.root_workspace, exist_ok=True)

    # Proxy tunnel instance, if one is started by create_session()
    self.tunnel = None

    self.session_create_time: float = 0
def get_group_id(self) -> str:
    """Return the group identifier this plugin instance was created with."""
    current_group = self.group_id
    return current_group
-
def set_log(self, logger: Callable[[str], None]) -> None:
    """Install the callable that will receive this plugin's log lines."""
    sink = logger
    self.logger = sink
-
def _log(self, message):
    """Emit *message* with the plugin/group prefix via the logger, or stdout if none is set."""
    sink = self.logger if self.logger else print
    sink(f'[TlsPlugin] [{self.group_id}] {message}')
def set_config(self, config: VSPlgConfig):
    """Store *config* and cache its ``free_config`` mapping (``{}`` when it is falsy)."""
    self.config = config
    extras = config.free_config
    self.free_config = extras if extras else {}
def health_check(self) -> bool:
    """Report whether the browser session is still usable.

    Returns:
        False when the plugin has been flagged unhealthy, when no browser page
        exists, when the page can no longer execute JavaScript, or when the
        session is older than ``config.session_max_life`` minutes.
        True otherwise. A missing config or a non-positive
        ``session_max_life`` disables the age check.
    """
    if not self.is_healthy or self.page is None:
        return False

    # Liveness probe: a dead or disconnected browser either raises or returns
    # a falsy value here.
    try:
        if not self.page.run_js("return 1;"):
            return False
    except Exception:
        return False

    max_life_minutes = getattr(self.config, "session_max_life", 0) or 0
    if max_life_minutes > 0:
        age_seconds = time.time() - self.session_create_time
        if age_seconds > max_life_minutes * 60:
            self._log("Session expired.")
            return False
    return True
-
def _save_screenshot(self, name_prefix):
    """Save a debugging screenshot of the current viewport under ``data/``.

    The file is named ``<instance_id>_<name_prefix>_<unix_ts>.jpg``. This is
    best-effort: any failure is logged and swallowed.
    """
    try:
        stamp = int(time.time())
        target = os.path.join("data", f"{self.instance_id}_{name_prefix}_{stamp}.jpg")
        os.makedirs("data", exist_ok=True)

        # full_page=False captures only the visible viewport; the original
        # author chose this so an abnormal page structure cannot make the
        # capture fail, which suits debugging "stuck" pages.
        self.page.get_screenshot(path=target, full_page=False)

        self._log(f"Screenshot saved to {target}")
    except Exception as exc:
        self._log(f"Failed to save screenshot: {exc}")
def create_session(self):
    """Launch an isolated browser, pass Cloudflare, log in and pick the travel group.

    Flow: start Chromium on a free debug port with a per-instance user-data
    directory (optionally behind a proxy) -> open the login page -> run the
    Cloudflare bypasser -> fill and submit the login form through injected JS
    -> wait for the redirect -> parse the dashboard and select the travel
    group whose location equals ``apt_config['city']`` (case-insensitive).

    On success ``self.page``, ``self.travel_group`` and
    ``self.session_create_time`` are set.

    Raises:
        NotFoundError: ``apt_config`` is missing or no travel group matches.
        BizLogicError: Cloudflare bypass, login-form loading or login failed.
        Exception: anything raised by the helpers it calls is re-raised.

    On any failure ``cleanup()`` is called before the exception propagates, so
    the browser, the proxy tunnel and the workspace are released.
    """
    import socket  # only needed here, to reserve a free local port

    self._log(f"Initializing Session (ID: {self.instance_id})...")

    def get_free_port():
        # Bind to port 0 so the OS hands out a currently unused port.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    try:
        co = ChromiumOptions()

        # Original author's note: co.auto_port() is avoided because it relies
        # on parsing browser stdout, which DBus error output can corrupt
        # ('not enough values to unpack'). The port is chosen manually and set
        # explicitly so DrissionPage connects to it directly.
        debug_port = get_free_port()
        self._log(f"Assigned Debug Port: {debug_port}")
        co.set_local_port(debug_port)

        # A dedicated user-data directory keeps each instance's cache, cookies
        # and localStorage isolated, and stops concurrent processes from
        # fighting over one shared default profile.
        co.set_user_data_path(self.user_data_path)

        # Browser binary override (e.g. inside Docker)
        chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)

        # --- Proxy configuration ---
        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy

            if p.username and p.password:
                self._log(f"Starting Proxy Tunnel for {p.ip}...")

                # Authenticated proxy: start a local tunnel and point Chrome
                # at its credential-free local endpoint.
                self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
                local_proxy = self.tunnel.start()

                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                # Unauthenticated proxy can be used directly
                proxy_str = f"{p.scheme}://{p.ip}:{p.port}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")

        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        # Docker's default /dev/shm is only 64MB, which Chromium easily exhausts
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        self.page = ChromiumPage(co)

        apt_config = self.free_config.get('apt_config', {})
        if not apt_config:
            raise NotFoundError("apt_config config missing")

        login_url = "https://visas-fr.tlscontact.com/en-us/login"
        params = {
            "issuerId": apt_config["code"],
            "country": apt_config["country"],
            "vac": apt_config["code"],
            "redirect": f"/en-us/country/{apt_config['country']}/vac/{apt_config['code']}"
        }
        full_login_url = f"{login_url}?{urlencode(params)}"

        self._log(f"Navigating: {full_login_url}")
        self.page.get(full_login_url)

        # --- Cloudflare bypass ---
        cf = CloudflareBypasser(self.page, log=self.config.debug)
        if not cf.bypass(max_retry=15):
            raise BizLogicError("Cloudflare bypass timeout")

        # --- Make sure the login form is present ---
        if not self.page.ele('#email-input-field'):
            self._log("Reloading Login Page...")
            self.page.get(full_login_url)
            if not self.page.wait.ele_displayed('#email-input-field', timeout=15):
                self._save_screenshot("login_load_fail")
                raise BizLogicError("Login form not loaded")

        # --- Solve ReCaptcha when the page carries one ---
        g_token = ""
        if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
            self._log("Solving ReCaptcha...")
            rc_params = {
                "type": "ReCaptchaV2TaskProxyLess", "page": self.page.url,
                "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
                "apiToken": self.free_config.get("capsolver_key", "")
            }
            g_token = self._solve_recaptcha(rc_params)

        username = self.config.account.username
        password = self.config.account.password

        # Fill the form and click through JS so the browser itself follows the
        # redirect. Values are embedded with json.dumps so quotes, backslashes
        # or newlines in credentials cannot break out of the JS string literal.
        js_login = f"""
            var u = document.getElementById('email-input-field');
            if(u) {{ u.value = {json.dumps(username)}; u.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var p = document.getElementById('password-input-field');
            if(p) {{ p.value = {json.dumps(password)}; p.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var g = document.getElementById('g-recaptcha-response');
            if(g) {{ g.value = {json.dumps(g_token)}; }}

            var btn = document.getElementById('btn-login');
            if(btn) {{ btn.click(); return true; }} else {{ return false; }}
        """

        self._log("Submitting Login via JS...")
        if not self.page.run_js(js_login):
            raise BizLogicError("Login button missing")

        # --- Wait until the URL no longer contains 'login-actions' ---
        self._log("Waiting for redirect...")
        self.page.wait.url_change('login-actions', exclude=True, timeout=45)

        # Still on an auth URL means the login was rejected
        if "login-actions" in self.page.url or "auth" in self.page.url:
            err = "Unknown Login Error"
            if "Invalid username" in self.page.html:
                err = "Invalid Credentials"
            self._save_screenshot("login_submit_fail")
            raise BizLogicError(f"Login Failed: {err}")

        # --- Extract dashboard information ---
        self._log("Waiting for dashboard...")
        self.page.wait.load_start()
        time.sleep(5)

        html = self.page.html
        self._check_page_is_session_expired_or_invalid("My travel group", html)
        groups = self._parse_travel_groups(html)

        target_city = apt_config['city'].lower()
        for g in groups:
            if g['location'].lower() == target_city:
                self.travel_group = g
                break

        if not self.travel_group:
            self._save_screenshot("group_not_found")
            raise NotFoundError(f"Group not found for {target_city}")

        self.session_create_time = time.time()
        self._log(f"Session Ready. Group: {self.travel_group['group_number']}")
    except Exception as e:
        self._log(f"Session Create Error: {e}")
        self.cleanup()
        # Bare raise keeps the original traceback intact
        raise
def query(self, apt_type: AppointmentType) -> VSQueryResult:
    """Fetch the booking page for the configured month and report open slots.

    The month comes from ``free_config['interest_month']`` (default: the
    current month as ``MM-YYYY``). Slots are kept only when their label is in
    ``free_config['target_labels']`` (default ``["", "pta"]``).

    Args:
        apt_type: Appointment type requested by the caller; not used by this
            implementation.

    Returns:
        A VSQueryResult. With matching slots, ``success`` is True and
        ``earliest_date`` / ``availability`` are filled; otherwise ``success``
        is False and the status is ``AvailabilityStatus.NoneAvailable``.

    Raises:
        SessionExpiredOrInvalidError: no travel group is selected yet, or the
            response page indicates an expired session.
        Exception: any error from the underlying request is logged and re-raised.
    """
    res = VSQueryResult()
    res.success = False

    if not self.travel_group:
        # Without a session there is no group id to build the URL from.
        self.is_healthy = False
        raise SessionExpiredOrInvalidError("Session not initialized")

    apt_config = self.free_config.get('apt_config', {})
    group_num = self.travel_group['group_number']
    interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))

    url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
    params = {
        'location': apt_config["code"],
        'month': interest_month,
    }

    # The request runs as a fetch inside the browser page. retry_count=1
    # leaves room for a single firewall-refresh retry on HTTP 403.
    try:
        resp = self._perform_request("GET", url, params=params, retry_count=1)
    except Exception as e:
        self._log(f"Query request failed: {e}")
        raise

    self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)

    # Parse slots and keep only the labels the configuration allows
    all_slots = self._parse_appointment_slots(resp.text)
    target_labels = self.free_config.get("target_labels", ["", "pta"])
    available = [s for s in all_slots if s.get("label") in target_labels]

    if not available:
        res.availability_status = AvailabilityStatus.NoneAvailable
        return res

    # Group slots by calendar day, parsing each date string once
    date_map = {}
    for s in available:
        day = datetime.strptime(s["date"], "%Y-%m-%d")
        date_map.setdefault(day, []).append(
            TimeSlot(time=s["time"], label=str(s.get("label", "")))
        )

    res.success = True
    res.availability_status = AvailabilityStatus.Available
    # Use the true minimum instead of trusting the page order of the slots
    res.earliest_date = min(date_map)
    res.availability = [DateAvailability(date=d, times=slots) for d, slots in date_map.items()]
    return res
def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
    """Try to book one slot from *slot_info* that satisfies the user's constraints.

    Args:
        slot_info: Result of a previous ``query()``; its ``availability``
            entries (``date`` as datetime, ``times`` as slot objects with
            ``time`` and ``label``) are the candidates.
        user_inputs: Optional dict with ``expected_start_date`` /
            ``expected_end_date`` (``YYYY-MM-DD...`` strings; both must be set
            for the range filter to apply) and ``support_pta`` (default True,
            also accepts slots labelled ``'pta'``).

    Returns:
        A VSBookResult. ``success`` is True with ``book_date`` / ``book_time``
        set when the server accepts the booking; otherwise ``success`` is False.

    Raises:
        SessionExpiredOrInvalidError: no travel group is selected yet.
        NotFoundError: no date or slot matches the user's constraints.
    """
    res = VSBookResult()
    res.success = False

    # The parameter defaults to None; treat that as "no constraints".
    user_inputs = user_inputs or {}

    if not self.travel_group:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError("Session not initialized")

    apt_config = self.free_config.get('apt_config', {})
    group_num = self.travel_group['group_number']

    exp_start = user_inputs.get('expected_start_date', '')
    exp_end = user_inputs.get('expected_end_date', '')
    support_pta = user_inputs.get('support_pta', True)
    target_labels = ['', 'pta'] if support_pta else ['']

    # 1. Index slots by 'YYYY-MM-DD' string. The date filter works on strings,
    #    so lookups must use the same representation; comparing the datetime
    #    in slot_info with a string would never match.
    slots_by_date = {}
    for da in slot_info.availability:
        slots_by_date.setdefault(da.date.strftime("%Y-%m-%d"), []).extend(da.times)

    valid_dates = self._filter_dates(list(slots_by_date), exp_start, exp_end)
    if not valid_dates:
        raise NotFoundError(message="No dates match user constraints")

    selected_date = None
    selected_time = None
    for d in valid_dates:
        selected_time = next((t for t in slots_by_date[d] if t.label in target_labels), None)
        if selected_time is not None:
            selected_date = d
            break

    if selected_date is None:
        raise NotFoundError(message="No suitable slot found")
    selected_label = selected_time.label

    # 2. Solve ReCaptcha V3 (action: book).
    #    The month is formatted MM-YYYY to match the format query() requests.
    month_param = f"{selected_date[5:7]}-{selected_date[:4]}"
    page_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking?location={apt_config["code"]}&month={month_param}'

    api_token = self.free_config.get("capsolver_key", "")
    rc_params = {
        "type": "ReCaptchaV3Task",
        "page": page_url,
        "action": "book",
        "siteKey": "6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
        "apiToken": api_token,
        # Empty string when no proxy is configured, in which case the solver
        # task carries no proxy fields.
        "proxy": self._get_proxy_url()
    }
    g_token = self._solve_recaptcha(rc_params)

    # 3. Build the Next.js server-action request. The multipart body is built
    #    in JS with FormData rather than hand-assembled in Python.
    ACTION_ID = "60d0616946df1fc4e7c094ca6a7a04f134d0be3d53"
    url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'

    # Percent-encoded router state tree; decoded in JS before being sent
    router_state = '%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22'+str(group_num)+'%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'

    # Every dynamic value is embedded through json.dumps so it arrives in JS
    # as a correctly escaped string literal.
    js_script = f"""
        const url = {json.dumps(url)};
        const formData = new FormData();

        formData.append('1_formGroupId', {json.dumps(str(group_num))});
        formData.append('1_lang', 'en-us');
        formData.append('1_process', 'APPOINTMENT');
        formData.append('1_location', {json.dumps(str(apt_config["code"]))});
        formData.append('1_date', {json.dumps(selected_date)});
        formData.append('1_time', {json.dumps(str(selected_time.time))});
        formData.append('1_appointmentLabel', {json.dumps(str(selected_label))});
        formData.append('1_captcha_token', {json.dumps(str(g_token))});
        formData.append('0', '[{{"status":"IDLE"}},"$K1"]');

        const headers = {{
            'Next-Action': {json.dumps(ACTION_ID)},
            'Next-Router-State-Tree': decodeURIComponent({json.dumps(router_state)}),
            'Accept': 'text/x-component'
        }};

        return fetch(url, {{
            method: 'POST',
            headers: headers,
            body: formData
        }}).then(async response => {{
            const text = await response.text();
            const headers = {{}};
            response.headers.forEach((value, key) => headers[key] = value);
            return {{
                status: response.status,
                body: text,
                headers: headers,
                url: response.url
            }};
        }}).catch(err => {{
            return {{ status: 0, body: err.toString(), headers: {{}}, url: url }};
        }});
    """

    self._log("Submitting booking request via JS Fetch...")
    res_dict = self.page.run_js(js_script)
    resp = BrowserResponse(res_dict)

    # 4. Decide the outcome. A server-action redirect is normally a 303; if
    #    fetch followed it, the status is 200 and the final URL is the
    #    confirmation page.
    if resp.status_code == 303 or (resp.status_code == 200 and "appointment-confirmation" in resp.url):
        self._log(f"Booking Success! URL: {resp.url}")
        res.success = True
        res.book_date = selected_date
        res.book_time = selected_time
        return res

    if resp.status_code == 200:
        if "APPOINTMENT_LIMIT_REACHED" in resp.text:
            self._log("Failed: Appointment Limit Reached")
        elif "Invalid captcha" in resp.text:
            self._log("Failed: Invalid Captcha")
        else:
            self._log(f"Booking Failed (Unknown 200): {resp.text[:200]}")
    else:
        self._log(f"Booking Failed. Status: {resp.status_code}")
    return res
- # --- 辅助方法 ---
-
def _get_proxy_url(self):
    """Build ``scheme://[user:pass@]ip:port`` from the configured proxy.

    Returns:
        The proxy URL, or an empty string when no config, no proxy or no
        proxy IP is set.
    """
    proxy = getattr(self.config, "proxy", None)
    if not proxy or not proxy.ip:
        return ""
    if proxy.username:
        return f"{proxy.scheme}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
    return f"{proxy.scheme}://{proxy.ip}:{proxy.port}"
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
    """Run an HTTP request as a ``fetch`` inside the browser page.

    Args:
        method: HTTP method name (upper-cased before use).
        url: Target URL.
        headers: Optional request headers; the dict is copied, not mutated.
        data: Optional body; a dict is form-urlencoded, anything else is sent as-is.
        json_data: Optional JSON body; takes precedence over *data*.
        params: Optional query parameters appended to *url*.
        retry_count: Number of 403 retries already used (at most 2 in total).

    Returns:
        A BrowserResponse for HTTP 200.

    Raises:
        BizLogicError: browser not initialised, network error (status 0) or
            any other non-handled HTTP status.
        SessionExpiredOrInvalidError: HTTP 401 (also marks the plugin unhealthy).
        PermissionDeniedError: HTTP 403 that a firewall refresh could not clear.
        RateLimiteddError: HTTP 429 (also marks the plugin unhealthy).
    """
    from urllib.parse import urlencode

    if not self.page:
        raise BizLogicError("Browser not initialized")

    if params:
        separator = '&' if '?' in url else '?'
        url = f"{url}{separator}{urlencode(params)}"

    fetch_options = {
        "method": method.upper(),
        # Copy so adding Content-Type below never leaks into the caller's dict
        "headers": dict(headers or {}),
        "credentials": "include"
    }

    # Body handling
    if json_data:
        fetch_options['body'] = json.dumps(json_data)
        fetch_options['headers']['Content-Type'] = 'application/json'
    elif data:
        if isinstance(data, dict):
            fetch_options['body'] = urlencode(data)
            fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
        else:
            fetch_options['body'] = data

    # json.dumps yields a correctly escaped JS string literal for the URL
    js_script = f"""
        const url = {json.dumps(url)};
        const options = {json.dumps(fetch_options)};

        return fetch(url, options)
            .then(async response => {{
                const text = await response.text();
                const headers = {{}};
                response.headers.forEach((value, key) => headers[key] = value);

                return {{
                    status: response.status,
                    body: text,
                    headers: headers,
                    url: response.url
                }};
            }})
            .catch(error => {{
                return {{
                    status: 0,
                    body: error.toString(),
                    headers: {{}},
                    url: url
                }};
            }});
    """

    res_dict = self.page.run_js(js_script, timeout=30)
    resp = BrowserResponse(res_dict)

    if resp.status_code == 200:
        return resp
    elif resp.status_code == 401:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()
    elif resp.status_code == 403:
        # 403 may mean the Cloudflare clearance expired: refresh and retry,
        # at most until retry_count reaches 2.
        if retry_count < 2:
            self._log(f"HTTP 403 Detected. Cloudflare session expired? Attempting refresh (Try {retry_count+1}/2)...")

            if self._refresh_firewall_session():
                self._log("Firewall session refreshed. Retrying request...")
                # url already contains the query string, so params must not
                # be passed again or they would be appended a second time.
                return self._perform_request(method, url, headers, data, json_data, None, retry_count + 1)
            else:
                self._log("Failed to refresh firewall session.")

        # Retries exhausted or refresh failed
        raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
    elif resp.status_code == 429:
        self.is_healthy = False
        raise RateLimiteddError()
    else:
        # Status 0 is produced by the JS catch handler, i.e. fetch itself failed
        if resp.status_code == 0:
            raise BizLogicError(f"Network Error: {resp.text}")
        # Any other status is reported as a business error
        raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
-
def _refresh_firewall_session(self) -> bool:
    """Reload the current page and run the Cloudflare bypasser on it.

    Returns:
        True when the bypass succeeds and the resulting page title does not
        contain "access denied"; False otherwise, including on any exception
        (which is logged).
    """
    try:
        # A reload forces a fresh HTTP request, which is what makes the
        # Cloudflare interstitial appear.
        self._log("Refreshing page to trigger Cloudflare...")
        self.page.refresh()

        bypasser = CloudflareBypasser(self.page, log=self.config.debug)
        if not bypasser.bypass(max_retry=10):
            return False

        # A passed challenge can still land on a block page
        if "access denied" in self.page.title.lower():
            return False

        # Give the DOM a moment to settle
        time.sleep(2)
        return True
    except Exception as exc:
        self._log(f"Error during firewall refresh: {exc}")
        return False
def _solve_recaptcha(self, params) -> str:
    """Solve a ReCaptcha through the CapSolver HTTP API and return the token.

    Args:
        params: Dict with ``apiToken`` (CapSolver client key), ``type``
            (task type), ``page`` (website URL), ``siteKey`` and optionally
            ``action`` (page action) and ``proxy`` (proxy URL forwarded to
            the solver).

    Returns:
        The ``gRecaptchaResponse`` token.

    Raises:
        NotFoundError: no API token was supplied.
        BizLogicError: task submission failed, the solver reported an error,
            or no solution arrived within 20 polls.
    """
    import requests as req  # local import, kept separate from the browser-based requests

    key = params.get("apiToken")
    if not key:
        raise NotFoundError("Api-token required")

    submit_url = "https://api.capsolver.com/createTask"
    task = {
        "type": params.get("type"),
        "websiteURL": params.get("page"),
        "websiteKey": params.get("siteKey"),
    }
    if params.get("action"):
        task["pageAction"] = params.get("action")

    # Proxy fields are only sent when the caller supplied a proxy URL;
    # otherwise the task runs without one.
    if params.get("proxy"):
        p = urlparse(params.get("proxy"))
        task["proxyType"] = p.scheme
        task["proxyAddress"] = p.hostname
        task["proxyPort"] = p.port
        if p.username:
            task["proxyLogin"] = p.username
            task["proxyPassword"] = p.password

    payload = {"clientKey": key, "task": task}
    r = req.post(submit_url, json=payload, timeout=20)
    if r.status_code != 200:
        raise BizLogicError(message="Failed to submit capsolver task")

    created = r.json()
    task_id = created.get("taskId")
    if created.get("errorId") or not task_id:
        # Polling with no task id could only ever end in a timeout
        reason = created.get("errorDescription") or created.get("errorCode") or "no taskId returned"
        raise BizLogicError(message=f"Capsolver rejected task: {reason}")

    for _ in range(20):
        r = req.post(
            "https://api.capsolver.com/getTaskResult",
            json={"clientKey": key, "taskId": task_id},
            timeout=20,
        )
        if r.status_code == 200:
            d = r.json()
            status = d.get("status")
            if status == "ready":
                return d["solution"]["gRecaptchaResponse"]
            if status == "failed" or d.get("errorId"):
                # Fail fast instead of polling a dead task until the timeout
                reason = d.get("errorDescription") or d.get("errorCode") or "unknown error"
                raise BizLogicError(message=f"Capsolver task failed: {reason}")
        time.sleep(3)
    raise BizLogicError(message="Capsolver task timeout")
def _parse_travel_groups(self, html: str) -> List[Dict]:
    """Extract the travel groups embedded as escaped JSON in the dashboard HTML.

    Returns:
        One dict per group with ``group_name``, ``group_number`` and
        ``location``; an empty list (after logging) when the payload is absent.
    """
    found = re.search(
        r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups',
        html,
        re.DOTALL,
    )
    if not found:
        self._log('Parsed travel group page, but not found travelGroups')
        return []

    # The payload sits inside a JS string, so its quotes are backslash-escaped
    unescaped = found.group(1).replace(r'\"', '"')
    return [
        {
            'group_name': entry.get('groupName'),
            'group_number': entry.get('formGroupId'),
            'location': entry.get('vacName'),
        }
        for entry in json.loads(unescaped)
    ]
def _parse_appointment_slots(self, html: str) -> List[Dict]:
    """Extract appointment slots embedded as escaped JSON in the booking page.

    Returns:
        One dict per slot with ``date``, ``time`` and ``label``. The label is
        ``'pta'`` or ``'ptaw'`` when the slot carries that label (``'pta'``
        wins if both are present) and ``''`` otherwise. An empty list is
        returned when the payload is absent.
    """
    hit = re.search(
        r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment',
        html,
        re.DOTALL,
    )
    if not hit:
        return []

    # The payload sits inside a JS string, so its quotes are backslash-escaped
    days = json.loads(hit.group(1).replace(r'\"', '"'))

    parsed = []
    for day in days:
        day_str = day.get('day')
        for slot in day.get('slots', []):
            tags = slot.get('labels', [])
            # First known label wins; anything else collapses to ''
            label = next((known for known in ('pta', 'ptaw') if known in tags), "")
            parsed.append({'date': day_str, 'time': slot.get('time'), 'label': label})
    return parsed
-
def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
    """Validate that *html* is the expected page rather than an expired-session page.

    The check is case-insensitive. If *keyword* is missing, the page is
    scanned for known expiry markers.

    Args:
        keyword: Text expected on a valid page.
        html: Page source to inspect.

    Returns:
        True when *keyword* is present; False when it is missing but no
        expiry marker was found either.

    Raises:
        SessionExpiredOrInvalidError: *html* is empty, or an expiry marker is
            found. ``self.is_healthy`` is set to False first.
    """
    if not html:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()

    html_lower = html.lower()
    if keyword.lower() in html_lower:
        return True

    # Markers are checked in order; the first hit decides the message
    reason = None
    if 'redirected automatically' in html_lower:
        reason = "Redirected automatically"
    elif 'login' in html_lower and 'password' in html_lower:
        reason = "Redirected to login"
    elif 'session expired' in html_lower:
        reason = "Session expired"

    if reason is not None:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError(reason)
    return False
-
def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
    """Keep the dates inside ``[start_str, end_str]`` and return them in random order.

    Args:
        dates: Candidate dates as ``YYYY-MM-DD`` strings.
        start_str: Inclusive lower bound; only its first 10 characters are parsed.
        end_str: Inclusive upper bound; only its first 10 characters are parsed.

    Returns:
        *dates* itself, unchanged, when either bound is empty; otherwise a new
        shuffled list of the dates within the range.
    """
    if not (start_str and end_str):
        return dates

    day_format = "%Y-%m-%d"
    lower = datetime.strptime(start_str[:10], day_format)
    upper = datetime.strptime(end_str[:10], day_format)

    in_window = [
        candidate
        for candidate in dates
        if lower <= datetime.strptime(candidate, day_format) <= upper
    ]
    random.shuffle(in_window)
    return in_window
-
- # --- 资源清理核心方法 ---
def cleanup(self):
    """Release the browser, delete the instance workspace and stop the proxy tunnel.

    Safe to call repeatedly and on a partially initialised instance: missing
    attributes are treated as "nothing to release".
    """
    # 1. Close the browser (this terminates the Chrome process)
    page = getattr(self, "page", None)
    if page:
        try:
            page.quit()
        except Exception:
            pass  # already closed or unreachable: nothing more to do
        self.page = None

    # 2. Remove the workspace. rmtree(ignore_errors=True) never raises, so
    #    success is judged by whether the directory still exists. Chrome may
    #    hold file locks briefly after quitting, hence the retries.
    workspace = getattr(self, "root_workspace", None)
    if workspace and os.path.exists(workspace):
        for _ in range(3):
            shutil.rmtree(workspace, ignore_errors=True)
            if not os.path.exists(workspace):
                break
            time.sleep(0.5)
        else:
            self._log(f"[WARN] Failed to fully remove workspace: {workspace}")

    # 3. Stop the proxy tunnel
    tunnel = getattr(self, "tunnel", None)
    if tunnel:
        try:
            tunnel.stop()
        except Exception as exc:
            self._log(f"Failed to stop proxy tunnel: {exc}")
        self.tunnel = None
-
def __del__(self):
    """Finalizer: release resources when the object is garbage-collected.

    Errors are suppressed on purpose. A finalizer may run on a partially
    constructed instance or during interpreter shutdown, and an exception
    here could not be handled by any caller.
    """
    try:
        self.cleanup()
    except Exception:
        pass
|