import time
import json
import random
import re
import os
import uuid
import shutil
import socket
from datetime import datetime
from typing import List, Dict, Optional, Any, Callable
from urllib.parse import urljoin, urlparse, urlencode

# DrissionPage core
from DrissionPage import ChromiumPage, ChromiumOptions

from vs_plg import IVSPlg
from vs_types import (
    VSPlgConfig,
    AppointmentType,
    VSQueryResult,
    VSBookResult,
    AvailabilityStatus,
    TimeSlot,
    DateAvailability,
    NotFoundError,
    PermissionDeniedError,
    RateLimiteddError,
    SessionExpiredOrInvalidError,
    BizLogicError,
)
from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
from toolkit.proxy_tunnel import ProxyTunnel
from utils.mouse import HumanMouse
from utils.keyboard import HumanKeyboard
from utils.fingerprint_utils import FingerprintGenerator


class BrowserResponse:
    """Minimal stand-in for ``requests.Response`` built from an in-browser fetch result.

    The constructor reads the keys ``status``, ``body``, ``headers`` and
    ``url`` from the dict produced by the JS snippet in
    ``TlsPlugin._perform_request``; missing keys fall back to ``0`` / ``''`` /
    ``{}``.
    """

    def __init__(self, result_dict):
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None  # lazily populated by json()

    def json(self):
        """Return the body parsed as JSON, or ``{}`` if it is empty or not valid JSON.

        A successfully parsed (or failed) result is cached; an empty body is
        not cached and simply yields a fresh ``{}``.
        """
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (ValueError, TypeError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # non-string body coming back from the browser.
                self._json = {}
        return self._json


class TlsPlugin(IVSPlg):
    """TLSContact visa appointment plugin (DrissionPage edition)."""

    def __init__(self, group_id: str):
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.is_healthy = True
        self.logger = None
        self.mouse = None
        self.keyboard = None
        self.page: Optional[ChromiumPage] = None
        self.travel_group: Optional[Dict] = None
        # Set before any filesystem work so cleanup()/__del__ can always run,
        # even if directory creation below raises.
        self.tunnel = None
        self.session_create_time: float = 0

        # Each instance gets its own browser profile directory.
        self.instance_id = uuid.uuid4().hex[:8]
        self.root_workspace = os.path.abspath(
            os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}")
        )
        self.user_data_path = os.path.join(self.root_workspace, "user_data")
        os.makedirs(self.root_workspace, exist_ok=True)

    def get_group_id(self) -> str:
        """Return the group id this plugin instance was created with."""
        return self.group_id

    def set_log(self, logger: Callable[[str], None]):
        """Install a callable that receives every log line emitted by ``_log``."""
        self.logger = logger

    def _log(self, message):
        """Send a prefixed message to the installed logger, or ``print`` it if none is set."""
        line = f'[TlsPlugin] [{self.group_id}] {message}'
        if self.logger:
            self.logger(line)
        else:
            print(line)

    def set_config(self, config: VSPlgConfig):
        """Store the plugin config and its ``free_config`` dict (``{}`` when absent)."""
        self.config = config
        self.free_config = config.free_config or {}

    def keep_alive(self):
        """Re-fetch the current page and mark the session unhealthy if it has expired.

        Best-effort: any other failure is logged and otherwise ignored.
        """
        try:
            resp = self._perform_request("GET", self.page.url, retry_count=1)
            self._check_page_is_session_expired_or_invalid('Book your appointment', html=resp.text)
        except SessionExpiredOrInvalidError:
            self.is_healthy = False
        except Exception as e:
            # Deliberately non-fatal, but no longer silent.
            self._log(f"keep_alive ignored error: {e}")

    def health_check(self) -> bool:
        """Return True when the session is flagged healthy, the browser answers JS, and the session is not too old.

        The age limit is ``config.session_max_life`` minutes; a value <= 0
        disables the age check.
        """
        if not self.is_healthy:
            return False
        if self.page is None:
            return False
        try:
            # Cheap liveness probe of the browser connection.
            if not self.page.run_js("return 1;"):
                return False
        except Exception:
            return False
        if self.config and self.config.session_max_life > 0:
            elapsed_time = time.time() - self.session_create_time
            if elapsed_time > self.config.session_max_life * 60:
                self._log("Session expired.")
                return False
        return True

    def _save_screenshot(self, name_prefix):
        """Save a viewport screenshot under ``data/``; failures are logged, never raised."""
        try:
            timestamp = int(time.time())
            filename = f"{self.instance_id}_{name_prefix}_{timestamp}.jpg"
            save_path = os.path.join("data", filename)
            os.makedirs("data", exist_ok=True)
            self.page.get_screenshot(path=save_path, full_page=False)
            self._log(f"Screenshot saved to {save_path}")
        except Exception as e:
            self._log(f"Failed to save screenshot: {e}")

    def create_session(self):
        """Start a browser, pass Cloudflare, log in and navigate to the booking page.

        On success ``self.page`` is left on a page showing the
        "Book your appointment" button and ``session_create_time`` is set.

        Raises:
            BizLogicError: when the Cloudflare bypass, the login, or one of
                the expected page elements fails to appear.
            Exception: any other error is logged, ``cleanup()`` is called,
                and the error is re-raised.
        """
        self._log(f"Initializing Session (ID: {self.instance_id})...")
        co = ChromiumOptions()

        def get_free_port():
            # Bind to port 0 so the OS hands out an unused port.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        debug_port = get_free_port()
        self._log(f"Assigned Debug Port: {debug_port}")
        co.set_local_port(debug_port)
        co.set_user_data_path(self.user_data_path)

        chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)

        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy
            if p.username and p.password:
                # Authenticated upstream proxy: go through a local tunnel and
                # point the browser at the tunnel's local address.
                self._log(f"Starting Proxy Tunnel for {p.ip}...")
                self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
                local_proxy = self.tunnel.start()
                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                proxy_str = f"{p.scheme}://{p.ip}:{p.port}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")

        fingerprint_gen = FingerprintGenerator()
        specific_fp = fingerprint_gen.generate(self.config.account.username)
        self._log(f'browser fingerprint={specific_fp}')

        co.headless(False)
        co.set_argument('--no-sandbox')
        # co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')
        co.set_argument(f"--fingerprint={specific_fp.get('seed')}")
        co.set_argument(f"--fingerprint-platform={specific_fp.get('platform')}")
        co.set_argument(f"--fingerprint-brand={specific_fp.get('brand')}")

        try:
            self.page = ChromiumPage(co)

            if self.config.debug:
                # Debug-only pre-check: dump what the browser reports about itself.
                self.page.get('https://example.com')
                js_script = """
                function getFingerprint() {
                    let webglVendor = 'Unknown';
                    let webglRenderer = 'Unknown';
                    try {
                        let canvas = document.createElement('canvas');
                        let gl = canvas.getContext('webgl') || canvas.getContext('experimental-webgl');
                        if (gl) {
                            let debugInfo = gl.getExtension('WEBGL_debug_renderer_info');
                            if (debugInfo) {
                                webglVendor = gl.getParameter(debugInfo.UNMASKED_VENDOR_WEBGL);
                                webglRenderer = gl.getParameter(debugInfo.UNMASKED_RENDERER_WEBGL);
                            }
                        }
                    } catch(e) {}
                    return {
                        "User-Agent": navigator.userAgent,
                        "Platform": navigator.userAgentData ? navigator.userAgentData.platform : navigator.platform,
                        "Brands": navigator.userAgentData ? navigator.userAgentData.brands.map(b => b.brand).join(', ') : 'Not Supported',
                        "CPU Cores": navigator.hardwareConcurrency,
                        "Language": navigator.language,
                        "Timezone": Intl.DateTimeFormat().resolvedOptions().timeZone,
                        "WebGL Vendor": webglVendor,
                        "WebGL Renderer": webglRenderer
                    };
                }
                return getFingerprint();
                """
                fp_data = self.page.run_js(js_script)
                self._log("================ 预检浏览器指纹数据 ================")
                self._log(json.dumps(fp_data, indent=4, ensure_ascii=False))
                self._log("====================================================")

            tls_url = self.free_config.get('tls_url', '')
            self._log(f"Navigating: {tls_url}")
            self.page.get(tls_url)
            time.sleep(5)

            cf_bypasser = CloudflareBypasser(self.page, log=True)
            if not cf_bypasser.bypass(max_retry=15):
                raise BizLogicError("Cloudflare bypass timeout")
            time.sleep(3)
            cf_bypasser.handle_waiting_room()

            self._log("Init humanize tools...")
            self.mouse = HumanMouse(self.page, debug=True)
            self.keyboard = HumanKeyboard(self.page)

            self._log("Random mouse start position...")
            viewport_width = self.page.rect.viewport_size[0]
            viewport_height = self.page.rect.viewport_size[1]
            init_x = random.randint(10, viewport_width - 10)
            init_y = random.randint(10, viewport_height - 10)
            self.mouse.move(init_x, init_y)

            btn_selector = 'tag:button@@text():Login'
            if not self.page.wait.ele_displayed(btn_selector, timeout=3):
                # Login form not shown yet: follow the login link first.
                login_btn = self.page.ele("tag:a@@href:login")
                self.mouse.human_click_ele(login_btn)
                time.sleep(3)
                if not self.page.wait.ele_displayed(btn_selector, timeout=10):
                    raise BizLogicError(message=f"Can't find selector={btn_selector}")
            time.sleep(random.uniform(0.5, 1))

            # NOTE: ReCaptcha solving is currently disabled.
            # recaptchav2_token = ""
            # if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
            #     self._log("Solving ReCaptcha...")
            #     rc_params = {
            #         "type": "ReCaptchaV2TaskProxyLess",
            #         "page": self.page.url,
            #         "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
            #         "apiToken": self.free_config.get("capsolver_key", "")
            #     }
            #     recaptchav2_token = self._solve_recaptcha(rc_params)

            username = self.config.account.username
            password = self.config.account.password

            # The input is located as the element following its <label>.
            input_ele = self.page.ele('tag:label@@text():Email').next()
            self.mouse.human_click_ele(input_ele)
            time.sleep(random.uniform(0.2, 0.6))
            self.keyboard.type_text(username, humanize=True)
            time.sleep(random.uniform(0.5, 1.2))

            input_ele = self.page.ele('tag:label@@text():Password').next()
            self.mouse.human_click_ele(input_ele)
            time.sleep(random.uniform(0.2, 0.6))
            self.keyboard.type_text(password, humanize=True)

            # if recaptchav2_token:
            #     inject_recaptchav2_token_js = f"""
            #     var g = document.getElementById('g-recaptcha-response');
            #     if(g) {{ g.value = "{recaptchav2_token}"; }}
            #     """
            #     self._log("Inject ReCaptchaV2 Token via JS...")
            #     self.page.run_js(inject_recaptchav2_token_js)
            #     time.sleep(random.uniform(0.5, 1.0))

            self._log("Submitting Login...")
            time.sleep(random.uniform(0.3, 0.8))
            login_btn = self.page.ele('tag:button@@text():Login')
            self.mouse.human_click_ele(login_btn)

            self._log("Waiting for redirect...")
            self.page.wait.url_change('login-actions', exclude=True, timeout=45)
            time.sleep(3)
            if "login-actions" in self.page.url or "auth" in self.page.url:
                raise BizLogicError(message="Login Failed! Invalid credentials or Captcha rejected.")
            self.page.wait.load_start()
            time.sleep(5)

            # NOTE: travel-group selection is currently disabled.
            # groups = self._parse_travel_groups(self.page.html)
            # location = self.free_config.get('location')
            # for g in groups:
            #     if g['location'] == location:
            #         self.travel_group = g
            #         break
            # if not self.travel_group:
            #     self._save_screenshot("group_not_found")
            #     raise NotFoundError(f"Group not found for {location}")
            # formgroup_id = self.travel_group.get('group_number')
            # btn_selector = f'tag:button@@name=formGroupId@@value={formgroup_id}'
            # self._log(f"Waiting for visible button to render: {formgroup_id}...")
            # self.page.wait.eles_loaded(btn_selector, timeout=15)
            # buttons = self.page.eles(btn_selector)
            # select_btn = None
            # for btn in reversed(buttons):
            #     try:
            #         w, h = btn.rect.size
            #         if w > 0 and h > 0:
            #             select_btn = btn
            #             break
            #     except Exception:
            #         continue
            # if not select_btn:
            #     self._save_screenshot("visible_button_not_found")
            #     raise BizLogicError(f"Can't find any visible Select button for group {formgroup_id}")
            # time.sleep(random.uniform(0.5, 1.2))
            # self.mouse.human_click_ele(select_btn)
            # self._log("Waiting for url redirect...")
            # self.page.wait.url_change('travel-groups', exclude=True, timeout=45)
            # time.sleep(2)
            # if "travel-groups" in self.page.url or "auth" in self.page.url:
            #     raise BizLogicError(message="Redirect to service-level Failed!")
            # no_applicant_indicators = [
            #     "Add a new applicant" in self.page.html,
            #     "You have not yet added an applicant. Please click the button below to add one." in self.page.html,
            #     "applicants-information" in self.page.url
            # ]
            # if any(no_applicant_indicators):
            #     raise BizLogicError(message=f"No applicant added")

            btn_selector = '#book-appointment-btn'
            self._log(f"Waiting for selector={btn_selector} to render...")
            if not self.page.wait.ele_displayed(btn_selector, timeout=15):
                raise BizLogicError(message=f"Waiting for selector={btn_selector} timeout")
            self.mouse.human_click_ele(self.page.ele(btn_selector))
            time.sleep(3)

            # self._log("Waiting for url redirect...")
            # self.page.wait.url_change('service-level', exclude=True, timeout=45)
            # time.sleep(2)
            # if "service-level" in self.page.url or "auth" in self.page.url:
            #     raise BizLogicError(message="Redirect to appointment-booking Failed!")

            btn_selector = 'tag:button@text():Book your appointment'
            if not self.page.wait.ele_displayed(btn_selector, timeout=10):
                raise BizLogicError(message=f"Waiting for selector={btn_selector} timeout")

            self.session_create_time = time.time()
            self._log("✅ Login & Navigation Success!")
        except Exception as e:
            self._log(f"Session Create Error: {e}")
            if self.config.debug:
                self._save_screenshot("create_session_except")
            self.cleanup()
            raise

    def query(self, apt_type: AppointmentType) -> VSQueryResult:
        """Collect available slots for the month configured in ``free_config['interest_month']``.

        ``interest_month`` uses the ``%m-%Y`` format and defaults to the
        current month. If the page is not already on that month the calendar
        is navigated and slots are read from the DOM; otherwise the current
        URL is re-fetched from inside the browser and slots are parsed from
        the returned HTML. Slots are then filtered by
        ``free_config['target_labels']`` (default ``["", "pta"]``).

        Args:
            apt_type: not read by this implementation.

        Returns:
            A ``VSQueryResult`` with ``success``, ``availability_status`` and,
            when slots were found, ``earliest_date`` and ``availability``.
        """
        res = VSQueryResult()
        res.success = False

        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
        target_date_obj = datetime.strptime(interest_month, "%m-%Y")
        target_month_text = target_date_obj.strftime("%B %Y")
        target_year = target_date_obj.year
        target_month_num = target_date_obj.month

        slots = []
        all_slots = []

        current_selected_ele = self.page.ele('@data-testid=btn-current-month-available')
        current_month_text = current_selected_ele.text.strip() if current_selected_ele else ""
        is_on_target_month = (current_month_text.lower() == target_month_text.lower())

        if not is_on_target_month:
            self._log(f"Current is '{current_month_text}', navigating to '{target_month_text}'...")
            for _ in range(12):
                # Prefer a direct link to the target month; otherwise step forward.
                target_btn_xpath = f'xpath://a[contains(@href, "month={interest_month}")]'
                target_btn = self.page.ele(target_btn_xpath)
                if target_btn:
                    target_btn.click(by_js=True)
                    time.sleep(3)
                    break
                next_btn = self.page.ele('@data-testid=btn-next-month-available')
                if next_btn:
                    next_btn.click(by_js=True)
                    time.sleep(2)
                else:
                    self._log("Warning: Cannot find target month or 'Next Month' button.")
                    break

            self._log("Extracting slots from DOM using robust data-testid features...")
            day_blocks_xpath = '//div[p and div//button[contains(@data-testid, "slot")]]'
            day_blocks = self.page.eles(f'xpath:{day_blocks_xpath}')
            for block in day_blocks:
                # 1. Date: the <p> under a day block holds text such as "Mon 01".
                p_ele = block.ele('tag:p')
                if not p_ele:
                    continue
                # Take the digits from the text and ignore the weekday letters.
                day_match = re.search(r'\d+', p_ele.text)
                if not day_match:
                    continue
                day_str = day_match.group()
                full_date = f"{target_year}-{target_month_num:02d}-{int(day_str):02d}"

                # 2. Available buttons: match on the data-testid prefix, which
                #    excludes the greyed-out "btn-unavailable-slot" buttons.
                available_btns = block.eles(
                    'xpath:.//button[starts-with(@data-testid, "btn-available-slot")]'
                )
                for btn in available_btns:
                    # Time: take the first HH:MM found in the button HTML,
                    # regardless of the inner markup.
                    time_match = re.search(r'\d{2}:\d{2}', btn.html)
                    if not time_match:
                        continue
                    time_str = time_match.group()

                    # Label: derived purely from the data-testid value.
                    test_id = btn.attr('data-testid') or ""
                    if 'prime' in test_id and 'weekend' in test_id:
                        lbl = 'ptaw'
                    elif 'prime' in test_id:
                        lbl = 'pta'
                    else:
                        lbl = ''
                    all_slots.append({
                        'date': full_date,
                        'time': time_str,
                        'label': lbl
                    })
        else:
            self._log(f"Already on '{target_month_text}'. Executing silent JS fetch...")
            resp = self._perform_request("GET", self.page.url, retry_count=1)
            self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)
            all_slots = self._parse_appointment_slots(resp.text)

        target_labels = self.free_config.get("target_labels", ["", "pta"])
        slots = [s for s in all_slots if s.get("label") in target_labels]

        if slots:
            res.success = True
            res.availability_status = AvailabilityStatus.Available

            date_map: dict[datetime, list[TimeSlot]] = {}
            for s in slots:
                dt = datetime.strptime(s["date"], "%Y-%m-%d")
                date_map.setdefault(dt, []).append(
                    TimeSlot(time=s["time"], label=str(s.get("label", "")))
                )
            # Use the true minimum rather than assuming the first slot is the earliest.
            res.earliest_date = min(date_map)
            res.availability = [
                DateAvailability(date=d, times=day_times) for d, day_times in date_map.items()
            ]
            self._log(f"Slot Found! -> {slots}")
        else:
            self._log("No slots available.")
            res.success = False
            res.availability_status = AvailabilityStatus.NoneAvailable
        return res

    def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
        """Pick a random matching slot and submit the booking form via injected JS.

        Args:
            slot_info: a query result whose ``availability`` lists candidate dates/times.
            user_inputs: optional dict with ``expected_start_date``,
                ``expected_end_date`` and ``support_pta`` (default True).
                ``None`` is treated as an empty dict.

        Returns:
            A ``VSBookResult``; ``success`` is True only when a confirmation
            indicator is seen in the page URL or HTML within the polling window.

        Raises:
            NotFoundError: when no date or slot satisfies the constraints.
            BizLogicError: when the form could not be filled or submitted.
        """
        res = VSBookResult()
        res.success = False

        user_inputs = user_inputs or {}  # declared default is None
        exp_start = user_inputs.get('expected_start_date', '')
        exp_end = user_inputs.get('expected_end_date', '')
        support_pta = user_inputs.get('support_pta', True)

        target_labels = ['']
        if support_pta:
            target_labels.append('pta')

        available_dates_str = [
            da.date.strftime("%Y-%m-%d") for da in slot_info.availability if da.date
        ]
        valid_dates_list = self._filter_dates(available_dates_str, exp_start, exp_end)
        if not valid_dates_list:
            raise NotFoundError(message="No dates match user constraints")
        valid_dates = set(valid_dates_list)

        all_possible_slots = []
        for da in slot_info.availability:
            if not da.date:
                continue
            date_str = da.date.strftime("%Y-%m-%d")
            if date_str not in valid_dates:
                continue
            for t in da.times:
                if t.label in target_labels:
                    all_possible_slots.append({
                        "date": date_str,
                        "time_obj": t,
                        "label": t.label
                    })
        if not all_possible_slots:
            raise NotFoundError(message="No suitable slot found (after label filtering)")

        selected_slot = random.choice(all_possible_slots)
        selected_date = selected_slot["date"]
        selected_time = selected_slot["time_obj"]
        selected_label = selected_slot["label"]
        self._log(
            f"Found {len(all_possible_slots)} valid slots. "
            f"selected slot: {selected_date} {selected_time.time} {selected_label}"
        )

        # json.dumps yields correctly quoted/escaped JS string literals, so a
        # quote or backslash in a value cannot break the injected script.
        date_js = json.dumps(str(selected_date))
        time_js = json.dumps(str(selected_time.time))
        label_js = json.dumps(str(selected_label))

        js_inject_and_click = f"""
        try {{
            const form = document.querySelector('form');
            if (!form) return 'Form not found';
            function setReactValue(input, value) {{
                if (!input) return;
                input.value = value;
                input.dispatchEvent(new Event('input', {{ bubbles: true }}));
                input.dispatchEvent(new Event('change', {{ bubbles: true }}));
            }}
            setReactValue(form.querySelector('input[name="date"]'), {date_js});
            setReactValue(form.querySelector('input[name="time"]'), {time_js});
            setReactValue(form.querySelector('input[name="appointmentLabel"]'), {label_js});
            const submitBtn = form.querySelector('button[type="submit"]');
            if (submitBtn) {{
                submitBtn.removeAttribute('disabled');
                submitBtn.classList.remove('opacity-50', 'cursor-not-allowed');
                submitBtn.click();
                return 'clicked';
            }} else {{
                return 'Submit button not found';
            }}
        }} catch (e) {{
            return e.toString();
        }}
        """
        inject_res = self.page.run_js(js_inject_and_click)
        self._log(f"Form submission triggered: {inject_res}")
        if inject_res != 'clicked':
            raise BizLogicError(message="Failed to inject form or click the submit button")

        self._log("Waiting for Next.js to process the form submission...")
        for _ in range(10):
            try:
                current_page_url = self.page.url
                current_page_html = self.page.html
                appointment_confirmation_indicators = [
                    "order-summary" in current_page_url,
                    "partner-services" in current_page_url,
                    "appointment-confirmation" in current_page_url,
                    "Change my appointment" in current_page_html,
                    "Book a new appointment" in current_page_html,
                ]
                if any(appointment_confirmation_indicators):
                    self._log(f"✅ BOOKING SUCCESS! Redirected to: {current_page_url}")
                    res.success = True
                    res.label = selected_label
                    res.book_date = selected_date
                    res.book_time = selected_time.time
                    self._save_screenshot("book_slot_success")
                    break

                toast_selector = 'tag:div@role=alert'
                toast_ele = self.page.ele(toast_selector, timeout=0.5)
                if toast_ele:
                    error_msg = toast_ele.text
                    self._log(f"❌ BOOKING FAILED! Detected popup: {error_msg}")
                    break
            except Exception as e:
                # The page may be mid-navigation; keep polling but leave a trace.
                self._log(f"Polling booking result failed, retrying: {e}")
            # Sleep on every unfinished iteration (including after an error)
            # so the loop never spins without delay.
            time.sleep(0.5)
        else:
            self._log("No booking confirmation detected within the polling window.")
        return res

    def _get_proxy_url(self):
        """Build ``scheme://[user:pass@]ip:port`` from the configured proxy, or ``""`` if none is set."""
        proxy_url = ""
        s = self.config.proxy
        if s and s.ip:
            if s.username:
                proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
            else:
                proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
        return proxy_url

    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
        """Run a ``fetch`` inside the browser page and wrap the result in a ``BrowserResponse``.

        Args:
            method: HTTP method name (upper-cased before use).
            url: target URL; ``params`` are URL-encoded and appended to it.
            headers: optional request headers (copied, never mutated).
            data: form body; a dict is URL-encoded, anything else is sent as-is.
            json_data: JSON body; takes precedence over ``data`` when truthy.
            params: optional query parameters.
            retry_count: number of 403-refresh attempts already used; refresh
                is attempted only while this is below 2.

        Returns:
            The ``BrowserResponse`` when the status is 200.

        Raises:
            BizLogicError: browser not initialised, network error (status 0),
                or any other non-handled status.
            SessionExpiredOrInvalidError: on HTTP 401.
            PermissionDeniedError: on HTTP 403 that could not be recovered.
            RateLimiteddError: on HTTP 429.
        """
        if not self.page:
            raise BizLogicError("Browser not initialized")

        if params:
            separator = '&' if '?' in url else '?'
            url += separator + urlencode(params)

        fetch_options = {
            "method": method.upper(),
            # Copy so the Content-Type added below never leaks into the caller's dict.
            "headers": dict(headers or {}),
            "credentials": "include"
        }

        # Body handling
        if json_data:
            fetch_options['body'] = json.dumps(json_data)
            fetch_options['headers']['Content-Type'] = 'application/json'
        elif data:
            if isinstance(data, dict):
                fetch_options['body'] = urlencode(data)
                fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
            else:
                fetch_options['body'] = data

        # json.dumps(url) emits a properly escaped JS string literal.
        js_script = f"""
        const url = {json.dumps(url)};
        const options = {json.dumps(fetch_options)};
        return fetch(url, options)
            .then(async response => {{
                const text = await response.text();
                const headers = {{}};
                response.headers.forEach((value, key) => headers[key] = value);
                return {{
                    status: response.status,
                    body: text,
                    headers: headers,
                    url: response.url
                }};
            }})
            .catch(error => {{
                return {{
                    status: 0,
                    body: error.toString(),
                    headers: {{}},
                    url: url
                }};
            }});
        """
        res_dict = self.page.run_js(js_script, timeout=30)
        resp = BrowserResponse(res_dict)

        if resp.status_code == 200:
            return resp
        elif resp.status_code == 401:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()
        elif resp.status_code == 403:
            if retry_count < 2:
                self._log(f"HTTP 403 Detected. Cloudflare session expired? Attempting refresh (Try {retry_count+1}/2)...")
                if self._refresh_firewall_session():
                    self._log("Firewall session refreshed. Retrying request...")
                    # ``url`` already carries the encoded params, so they must
                    # not be passed again or the query string is duplicated.
                    return self._perform_request(method, url, headers, data, json_data, None, retry_count + 1)
                else:
                    self._log("Failed to refresh firewall session.")
            raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
        elif resp.status_code == 429:
            self.is_healthy = False
            raise RateLimiteddError()
        else:
            if resp.status_code == 0:
                raise BizLogicError(f"Network Error: {resp.text}")
            raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")

    def _refresh_firewall_session(self) -> bool:
        """Reload the page to trigger the Cloudflare challenge and try to pass it.

        Returns:
            True when the bypass succeeds and the page title does not contain
            "access denied"; False otherwise, including on any exception.
        """
        try:
            # 1. Reload the current page so the browser issues a fresh HTTP
            #    request, which is what brings up the Cloudflare interstitial.
            self._log("Refreshing page to trigger Cloudflare...")
            self.page.refresh()

            # 2. Hand the page to CloudflareBypasser.
            cf = CloudflareBypasser(self.page, log=self.config.debug)

            # 3. Attempt the bypass.
            success = cf.bypass(max_retry=10)
            if success:
                # Confirm we did not land on a denial page.
                title = self.page.title.lower()
                if "access denied" in title:
                    return False
                # Give the DOM a moment to settle.
                time.sleep(2)
                return True
            return False
        except Exception as e:
            self._log(f"Error during firewall refresh: {e}")
            return False

    def _solve_recaptcha(self, params) -> str:
        """Solve a ReCaptcha through the Capsolver HTTP API and return the response token.

        Args:
            params: dict with ``apiToken``, ``type``, ``page``, ``siteKey``
                and optionally ``action``.

        Raises:
            NotFoundError: when ``apiToken`` is missing.
            BizLogicError: when the task cannot be created or does not become
                ready within 20 polls.
        """
        key = params.get("apiToken")
        if not key:
            raise NotFoundError("Api-token required")

        submit_url = "https://api.capsolver.com/createTask"
        task = {
            "type": params.get("type"),
            "websiteURL": params.get("page"),
            "websiteKey": params.get("siteKey"),
        }
        if params.get("action"):
            task["pageAction"] = params.get("action")

        # if params.get("proxy"):
        #     p = urlparse(params.get("proxy"))
        #     task["proxyType"] = p.scheme
        #     task["proxyAddress"] = p.hostname
        #     task["proxyPort"] = p.port
        #     if p.username:
        #         task["proxyLogin"] = p.username
        #         task["proxyPassword"] = p.password
        # NOTE: with DrissionPage this normally runs in ProxyLess mode; pass a
        # proxy to Capsolver only if it must use the same exit IP.

        payload = {"clientKey": key, "task": task}

        import requests as req  # local import to avoid confusion with the browser fetch path

        r = req.post(submit_url, json=payload, timeout=20)
        if r.status_code != 200:
            raise BizLogicError(message="Failed to submit capsolver task")
        task_id = r.json().get("taskId")
        if not task_id:
            # Without a task id every poll below would be pointless.
            raise BizLogicError(message="Failed to submit capsolver task")

        for _ in range(20):
            r = req.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": key, "taskId": task_id},
                timeout=20,
            )
            if r.status_code == 200:
                d = r.json()
                if d.get("status") == "ready":
                    return d["solution"]["gRecaptchaResponse"]
            time.sleep(3)
        raise BizLogicError(message="Capsolver task timeout")

    def _parse_travel_groups(self, html_content) -> List[Dict]:
        """Extract travel groups from the escaped ``travelGroups`` JSON embedded in the page HTML.

        Returns:
            A list of dicts with ``group_name``, ``group_number`` and
            ``location``; empty when the payload is not found.
        """
        groups = []
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        js_match = re.search(js_pattern, html_content, re.DOTALL)
        if js_match:
            # The payload is embedded with backslash-escaped quotes.
            json_str = js_match.group(1).replace(r'\"', '"')
            data = json.loads(json_str)
            for g in data:
                groups.append({
                    'group_name': g.get('groupName'),
                    'group_number': g.get('formGroupId'),
                    'location': g.get('vacName')
                })
        else:
            self._log('Parsed travel group page, but not found travelGroups')
        return groups

    def _parse_appointment_slots(self, html_content) -> List[Dict]:
        """Extract slots from the escaped ``availableAppointments`` JSON embedded in the page HTML.

        Returns:
            A list of dicts with ``date``, ``time`` and ``label`` (``'pta'``,
            ``'ptaw'`` or ``''``); empty when the payload is not found.
        """
        slots = []
        pattern = r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment'
        match = re.search(pattern, html_content, re.DOTALL)
        if match:
            json_str = match.group(1).replace(r'\"', '"')
            data = json.loads(json_str)
            for day in data:
                d_str = day.get('day')
                for s in day.get('slots', []):
                    labels = s.get('labels', [])
                    # Collapse the label list to one value; 'pta' wins over 'ptaw'.
                    if 'pta' in labels:
                        lbl = 'pta'
                    elif 'ptaw' in labels:
                        lbl = 'ptaw'
                    else:
                        lbl = ''
                    slots.append({
                        'date': d_str,
                        'time': s.get('time'),
                        'label': lbl
                    })
        return slots

    def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
        """Raise if the HTML looks like an expired or invalid session.

        Empty HTML always counts as expired. Otherwise the session counts as
        expired only when ``keyword`` is absent (case-insensitive) and one of
        the known expiry markers is present.

        Returns:
            False when no expiry was detected.

        Raises:
            SessionExpiredOrInvalidError: on detection; ``is_healthy`` is set
                to False first.
        """
        if not html:
            self.is_healthy = False
            raise SessionExpiredOrInvalidError()

        html_lower = html.lower()
        if keyword.lower() not in html_lower:
            session_expire_or_invalid_indicators = [
                'redirected automatically' in html_lower,
                'login' in html_lower and 'password' in html_lower,
                'session expired' in html_lower
            ]
            if any(session_expire_or_invalid_indicators):
                self.is_healthy = False
                raise SessionExpiredOrInvalidError()
        return False

    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
        """Return the dates within ``[start_str, end_str]`` (inclusive), shuffled.

        Only the first 10 characters of each bound are parsed, as
        ``%Y-%m-%d``. If either bound is empty, ``dates`` is returned
        unchanged and unshuffled.
        """
        if not start_str or not end_str:
            return dates

        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
        valid_dates = [
            date_str for date_str in dates
            if s_date <= datetime.strptime(date_str, "%Y-%m-%d") <= e_date
        ]
        random.shuffle(valid_dates)
        return valid_dates

    # --- Resource cleanup ---
    def cleanup(self):
        """Quit the browser, delete the temporary workspace and stop the proxy tunnel."""
        # 1. Close the browser.
        if self.page:
            try:
                self.page.quit()
            except Exception:
                pass  # already closed
            self.page = None

        # 2. Delete the workspace. Chrome may hold file locks briefly after
        #    quitting, so wait a little and retry. rmtree(ignore_errors=True)
        #    never raises, so success is judged by the directory being gone.
        if os.path.exists(self.root_workspace):
            for _ in range(3):
                time.sleep(0.2)
                shutil.rmtree(self.root_workspace, ignore_errors=True)
                if not os.path.exists(self.root_workspace):
                    break
                self._log("Cleanup retry: workspace still present")
                time.sleep(0.5)
            if os.path.exists(self.root_workspace):
                self._log(f"[WARN] Failed to fully remove workspace: {self.root_workspace}")

        # 3. Stop the proxy tunnel.
        if self.tunnel:
            try:
                self.tunnel.stop()
            except Exception:
                pass
            self.tunnel = None

    def __del__(self):
        """Best-effort cleanup when the object is garbage-collected."""
        try:
            self.cleanup()
        except Exception:
            # The instance may be only partially initialised, or the
            # interpreter may be shutting down; nothing useful can be done.
            pass