import time import json import random import re import os import uuid import shutil import base64 import socket import easyocr from datetime import datetime from typing import List, Dict, Optional, Any, Callable from urllib.parse import urljoin, urlparse, urlencode # DrissionPage 核心 from DrissionPage import ChromiumPage, ChromiumOptions from vs_plg import IVSPlg from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError from toolkit.vs_cloud_api import VSCloudApi from toolkit.proxy_tunnel import ProxyTunnel from utils.fingerprint_utils import FingerprintGenerator class BrowserResponse: def __init__(self, result_dict): result_dict = result_dict or {} self.status_code = result_dict.get('status', 0) self.text = result_dict.get('body', '') self.headers = result_dict.get('headers', {}) self.url = result_dict.get('url', '') self._json = None def json(self): if self._json is None: if not self.text: return {} try: self._json = json.loads(self.text) except: self._json = {} return self._json def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str="%Y-%m-%d"): dt = datetime.strptime(data_str, date_str_format) return dt.strftime("%Y-%m-%d") def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str: if "@" not in email: raise ValueError(f"Invalid email: {email}") local_part, _ = email.rsplit("@", 1) return f"{local_part}@{new_domain}" class PolPlugin(IVSPlg): """ Poland (e-konsulat) 签证预约插件 (Browser + Tunnel Mode) """ def __init__(self, group_id: str): self.group_id = group_id self.config: Optional[VSPlgConfig] = None self.free_config: Dict[str, Any] = {} self.logger = None # 浏览器实例 self.page: Optional[ChromiumPage] = None # 资源隔离 self.instance_id = uuid.uuid4().hex[:8] self.root_workspace = os.path.abspath(os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}")) self.user_data_path = os.path.join(self.root_workspace, "user_data") if not os.path.exists(self.root_workspace): os.makedirs(self.root_workspace) self.reader = easyocr.Reader(['en'], gpu=False) self.tunnel = None # 代理隧道 self.is_healthy = True self.session_create_time: float = 0 def get_group_id(self) -> str: return self.group_id def set_log(self, logger: Callable[[str], None]) -> None: self.logger = logger def _log(self, message): if self.logger: self.logger(f'[PolPlugin] [{self.group_id}] {message}') else: print(f'[PolPlugin] [{self.group_id}] {message}') def set_config(self, config: VSPlgConfig): self.config = config self.free_config = config.free_config or {} def keep_alive(self): pass def health_check(self) -> bool: if not self.is_healthy: return False if not self.page: return False try: if not self.page.run_js("return 1;"): return False except: return False if self.config.session_max_life > 0: if time.time() - self.session_create_time > self.config.session_max_life: self._log("Session expired.") return False return True def create_session(self): """ 创建会话:启动浏览器 -> 代理隧道 -> 提取 Captcha -> 本地识别 -> 提交 -> 获取 Context """ self._log(f"Initializing Session (ID: {self.instance_id})...") co = ChromiumOptions() def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)); return s.getsockname()[1] co.set_local_port(get_free_port()) co.set_user_data_path(self.user_data_path) chrome_path = os.getenv("CHROME_BIN") if chrome_path and os.path.exists(chrome_path): co.set_paths(browser_path=chrome_path) if self.config.proxy and self.config.proxy.ip: p = self.config.proxy if p.username and p.password: self._log(f"Starting Tunnel for {p.ip}...") self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password) local_proxy = self.tunnel.start() self._log(f"Tunnel started at {local_proxy}") co.set_argument(f'--proxy-server={local_proxy}') else: proxy_str = f"{p.scheme}://{p.ip}:{p.port}" co.set_argument(f'--proxy-server={proxy_str}') else: self._log("[WARN] No proxy configured!") fingerprint_gen = FingerprintGenerator() specific_fp = fingerprint_gen.generate(self.config.account.username) self._log(f'browser fingerprint={specific_fp}') co.headless(False) co.set_argument('--no-sandbox') # co.set_argument('--disable-gpu') co.set_argument('--disable-dev-shm-usage') co.set_argument('--window-size=1920,1080') co.set_argument('--disable-blink-features=AutomationControlled') co.set_argument('--ignore-certificate-errors') co.set_argument('--ignore-gpu-blocklist') # 忽略无显卡黑名单 co.set_argument('--enable-webgl') # 强制开启 WebGL co.set_argument('--use-gl=angle') # 使用 ANGLE 渲染后端 co.set_argument('--use-angle=swiftshader')# 强制使用 CPU 进行 3D 渲染 (这步最关键!) co.set_argument(f"--fingerprint={specific_fp.get('seed')}") co.set_argument(f"--fingerprint-platform={specific_fp.get('platform')}") co.set_argument(f"--fingerprint-brand={specific_fp.get('brand')}") try: self.page = ChromiumPage(co) url_home = "https://secure.e-konsulat.gov.pl" self._log(f"Navigating to {url_home}") self.page.get(url_home) self.page.wait.doc_loaded() self.session_create_time = time.time() self._log("Session created successfully.") except Exception as e: self._log(f"Session Create Failed: {e}") self.cleanup() raise e def query(self, apt_type: AppointmentType) -> VSQueryResult: res = VSQueryResult() res.success = False query_url = self.free_config.get('query_url') service_type = self.free_config.get('service_type') location = self.free_config.get('location') self._log(f"Navigating to {query_url}") self.page.get(query_url) captcha_image_selector = 't:img@alt=Weryfikacja obrazkowa' if not self.page.wait.ele_displayed(captcha_image_selector, timeout=30): raise BizLogicError(message=f"Wait for selector={captcha_image_selector} timeout") time.sleep(3) img_ele = self.page.ele(captcha_image_selector) img_src = img_ele.attr('src') base64_data = img_src.split(',')[1] image_bytes = base64.b64decode(base64_data) result = self.reader.readtext(image_bytes) captcha_code = result[0][-2] if result else "" self._log(f"Captcha code={captcha_code}") if not captcha_code: BizLogicError(message="Solve captcha failed") input_ele = self.page.ele('t:input@aria-label=Znaki z obrazka') input_ele.clear() input_ele.input(captcha_code) btn_selector = 'Dalej' self.page.ele(btn_selector).click(by_js=True) toast_ele = self.page.ele('tag:app-toast', timeout=2) if toast_ele: error_msg = toast_ele.text.replace('\n', ' ').strip() raise BizLogicError(message=f"Captcha verify error={error_msg}") if not self._select_mat_option('Rodzaj usługi', service_type): raise BizLogicError(message=f'Process select box failed') if not self._select_mat_option('Lokalizacja', location): raise BizLogicError(message=f'Process select box failed') if not self._select_mat_option('Chcę zarezerwować termin dla', '1 osob'): raise BizLogicError(message=f'Process select box failed') available_dates = [] self._log("Wait Query Slot...") for _ in range(20): try: no_slot_alert = self.page.ele('text:Chwilowo wszystkie udostępnione terminy', timeout=0.1) if no_slot_alert: self._log("No slots available") break listbox = self.page.ele('@role=listbox', timeout=0.1) if not listbox: termin_label = self.page.ele('tag:mat-label@@text():Termin', timeout=0.5) if termin_label: termin_select = termin_label.parent('tag:app-select-control').ele('tag:mat-select') if termin_select and 'mat-select-disabled' not in str(termin_select.attr('class')): try: termin_select.click() except: termin_select.click(by_js=True) time.sleep(0.5) listbox = self.page.ele('@role=listbox', timeout=1) if listbox: option_elements = listbox.eles('.mat-option-text') for ele in option_elements: date_str = ele.text.strip() if date_str: available_dates.append(date_str) if available_dates: self._log(f"✅ Success extracted dates: {available_dates}") break except Exception as e: self._log(f"Query loop exception: {e}") time.sleep(0.5) if available_dates: selected_date = random.choice(available_dates) self._log(f"🎲 Random select date: {selected_date}...") locked = self._lock_slot(selected_date) if locked: session_id = self._save_browser_session() wechat_message = f"🎉 [Poland] Slot locked\n📍 location: {location}\n📅 date: {selected_date}\n🔑 SessionId: {session_id}" VSCloudApi.Instance().push_weixin_text(wechat_message) res.success = True res.availability_status = AvailabilityStatus.Available earliest_date = available_dates[0] earliest_dt = datetime.strptime(earliest_date, "%Y-%m-%d") res.earliest_date = earliest_dt res.availability = [ DateAvailability( date=datetime.strptime(d, "%Y-%m-%d"), times=[], ) for d in available_dates ] else: res.success = False res.availability_status = AvailabilityStatus.NoneAvailable res.availability = [] return res def _lock_slot(self, lock_date): slot_selector = f'xpath://span[contains(@class, "mat-option-text") and contains(text(), "{lock_date}")]' slot_ele = self.page.ele(slot_selector, timeout=1) if not slot_ele: termin_label = self.page.ele('tag:mat-label@@text():Termin', timeout=1) if termin_label: termin_select = termin_label.parent('tag:app-select-control').ele('tag:mat-select') if termin_select and 'mat-select-disabled' not in str(termin_select.attr('class')): try: termin_select.click() except: termin_select.click(by_js=True) time.sleep(0.5) slot_ele = self.page.ele(slot_selector, timeout=3) if not slot_ele: self._log(f"❌ Can't find date {lock_date} to click.") return False try: slot_ele.click() except: slot_ele.click(by_js=True) self._log(f"✅ Clicked date: {lock_date}") time.sleep(1) btn_selector = 'xpath://button[.//span[contains(text(), "Dalej")]]' next_btn = self.page.ele(btn_selector, timeout=3) if not next_btn: self._log("❌ Can't find 'Dalej' button") return False try: next_btn.click() except: next_btn.click(by_js=True) self._log("✅ Clicked Dalej, locking slot...") return self.page.wait.url_change('weryfikacja-obrazkowa', exclude=True, timeout=15) def _select_mat_option(self, label_text, option_text): self._log(f"choose: {label_text} -> {option_text}") label = self.page.ele(f'tag:mat-label@@text():{label_text}', timeout=5) if not label: self._log(f"Can't find label: {label_text}") return False container = label.parent('tag:app-select-control') select_box = container.ele('tag:mat-select') if not select_box: self._log("Can't find select box") return False select_box.click(by_js=True) time.sleep(0.5) option = self.page.ele(f'tag:mat-option@@text():{option_text}', timeout=3) if option: option.click(by_js=True) time.sleep(0.5) return True else: self._log(f"Can't find option: {option_text}") return False def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult: res = VSBookResult() return res def _save_browser_session(self): self._log("Abstract browser session env...") cookies_dict = self.page.cookies(all_domains=True, all_info=True) cookies_str = cookies_dict.as_json() local_storage_str = self.page.run_js('return JSON.stringify(window.localStorage) || "{}"') session_storage_str = self.page.run_js('return JSON.stringify(window.sessionStorage) || "{}"') proxy_str = "" if hasattr(self, 'config') and hasattr(self.config, 'proxy') and self.config.proxy.ip: p = self.config.proxy if p.username and p.password: proxy_str = f"{p.scheme}://{p.username}:{p.password}@{p.ip}:{p.port}" else: proxy_str = f"{p.scheme}://{p.ip}:{p.port}" session_data = VSCloudApi.Instance().create_http_session( session_id=str(uuid.uuid4().hex), cookies=cookies_str, local_storage=local_storage_str, session_storage=session_storage_str, user_agent=self.page.user_agent, page=self.page.url, proxy=proxy_str ) return session_data.get('session_id') def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0): if not self.page: raise BizLogicError("Browser not init") req_url = url if params: sep = '&' if '?' in req_url else '?' req_url += sep + urlencode(params) fetch_opts = { "method": method.upper(), "headers": headers or {}, "credentials": "include" } if json_data: fetch_opts['body'] = json.dumps(json_data) fetch_opts['headers']['Content-Type'] = 'application/json' elif data: if isinstance(data, dict): fetch_opts['body'] = urlencode(data) fetch_opts['headers']['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' else: fetch_opts['body'] = data js = f""" return fetch("{req_url}", {json.dumps(fetch_opts)}) .then(async r => {{ const h = {{}}; r.headers.forEach((v, k) => h[k] = v); return {{ status: r.status, body: await r.text(), headers: h, url: r.url }}; }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }}); """ resp = BrowserResponse(self.page.run_js(js, timeout=60)) if resp.status_code == 200: return resp elif resp.status_code == 403: if "Just a moment" in resp.text and retry_count < 2: self._log("Cloudflare 403. Refreshing...") if self._refresh_firewall_session(): return self._perform_request(method, url, headers, data, json_data, params, retry_count+1) raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}") elif resp.status_code == 429: self.is_healthy = False raise RateLimiteddError() elif resp.status_code in [401, 419]: self.is_healthy = False raise SessionExpiredOrInvalidError() else: raise BizLogicError(f"HTTP {resp.status_code}: {resp.text[:100]}") def _filter_dates(self, dates, start, end): if not start or not end: return dates valid = [] s = datetime.strptime(start[:10], "%Y-%m-%d") e = datetime.strptime(end[:10], "%Y-%m-%d") for d in dates: c = datetime.strptime(d, "%Y-%m-%d") if s <= c <= e: valid.append(d) random.shuffle(valid) return valid def cleanup(self): if self.page: try: self.page.quit() except: pass self.page = None if os.path.exists(self.root_workspace): for _ in range(3): try: time.sleep(0.2); shutil.rmtree(self.root_workspace, ignore_errors=True); break except: time.sleep(0.5) if self.tunnel: try: self.tunnel.stop() except: pass self.tunnel = None def __del__(self): self.cleanup()