| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- import time
- import json
- import random
- import re
- import os
- import uuid
- import shutil
- import base64
- import socket
- import easyocr
- from datetime import datetime
- from typing import List, Dict, Optional, Any, Callable
- from urllib.parse import urljoin, urlparse, urlencode
- # DrissionPage core
- from DrissionPage import ChromiumPage, ChromiumOptions
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from toolkit.vs_cloud_api import VSCloudApi
- from toolkit.proxy_tunnel import ProxyTunnel
class BrowserResponse:
    """Thin response wrapper around the dict produced by an in-page ``fetch``.

    The dict is read for the keys ``status``, ``body``, ``headers`` and
    ``url``; any missing key falls back to a neutral default (``0``, ``''``,
    ``{}``, ``''``).
    """

    def __init__(self, result_dict):
        # A falsy result (e.g. ``None``) is treated as an empty response.
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')
        self._json = None  # parsed-body cache, filled lazily by json()

    def json(self):
        """Return the body parsed as JSON, or ``{}`` if it is empty or invalid."""
        if self._json is None:
            if not self.text:
                return {}
            try:
                self._json = json.loads(self.text)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # non-string body. Anything else is a real bug and propagates.
                self._json = {}
        return self._json
def to_yyyymmdd(data_str: str, date_str_format: str, target_format: str = "%Y-%m-%d"):
    """Re-format a date string.

    Args:
        data_str: The date text to convert.
        date_str_format: ``strptime`` format describing ``data_str``.
        target_format: ``strftime`` format of the result (ISO date by default).

    Returns:
        ``data_str`` rendered with ``target_format``.

    Raises:
        ValueError: If ``data_str`` does not match ``date_str_format``.
    """
    dt = datetime.strptime(data_str, date_str_format)
    # Honour the caller-supplied output format instead of a hard-coded one.
    return dt.strftime(target_format)
def get_alias_email(email: str, new_domain: str = "gmail-app.com") -> str:
    """Return ``email`` with everything after its last ``@`` replaced by ``new_domain``.

    Raises:
        ValueError: If ``email`` contains no ``@``.
    """
    local_part, at_sign, _old_domain = email.rpartition("@")
    if not at_sign:
        # rpartition yields an empty separator when "@" is absent.
        raise ValueError(f"Invalid email: {email}")
    return f"{local_part}@{new_domain}"
- class PolPlugin(IVSPlg):
- """
- Poland (e-konsulat) visa appointment plugin (Browser + Tunnel Mode)
- """
def __init__(self, group_id: str):
    """Set up per-instance state, the browser workspace and the OCR reader.

    Args:
        group_id: Identifier of the group this plugin instance serves; it is
            also used in the workspace directory name and in log prefixes.
    """
    # Plain attributes first: cleanup() (also reached through __del__) reads
    # page / tunnel / root_workspace, so they must exist even if one of the
    # heavier steps below raises.
    self.group_id = group_id
    self.config: Optional[VSPlgConfig] = None
    self.free_config: Dict[str, Any] = {}
    self.logger = None

    # Browser instance, created later by create_session().
    self.page: Optional[ChromiumPage] = None

    self.tunnel = None  # proxy tunnel
    self.is_healthy = True
    self.session_create_time: float = 0

    # Resource isolation: each instance gets its own browser profile folder.
    self.instance_id = uuid.uuid4().hex[:8]
    self.root_workspace = os.path.abspath(
        os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}")
    )
    self.user_data_path = os.path.join(self.root_workspace, "user_data")

    # exist_ok avoids the check-then-create race of an exists() test.
    os.makedirs(self.root_workspace, exist_ok=True)

    # English-only EasyOCR reader running on CPU; used to read the captcha.
    self.reader = easyocr.Reader(['en'], gpu=False)
def get_group_id(self) -> str:
    """Return the group identifier this instance was constructed with."""
    return self.group_id
-
def set_log(self, logger: Callable[[str], None]) -> None:
    """Install the callable that ``_log`` sends its formatted messages to."""
    self.logger = logger
-
def _log(self, message):
    """Emit ``message`` prefixed with the plugin name and group id.

    The line goes to the logger installed via ``set_log`` when there is one,
    otherwise to ``print``.
    """
    line = f'[PolPlugin] [{self.group_id}] {message}'
    sink = self.logger if self.logger else print
    sink(line)
-
def set_config(self, config: VSPlgConfig):
    """Store ``config`` and cache its ``free_config`` mapping.

    A falsy ``free_config`` is replaced by an empty dict so later ``.get``
    lookups are always safe.
    """
    free_config = config.free_config
    self.config = config
    self.free_config = free_config if free_config else {}
-
def keep_alive(self):
    """Do nothing; this plugin performs no keep-alive work."""
    return None
def health_check(self) -> bool:
    """Report whether the current browser session is still usable.

    Returns:
        ``False`` when the instance was flagged unhealthy, no browser page
        exists, a trivial script round-trip fails, or the session is older
        than ``config.session_max_life`` seconds (when that limit is
        positive); ``True`` otherwise.
    """
    if not self.is_healthy or not self.page:
        return False
    try:
        # Round-trip a trivial script to prove the browser still responds.
        if not self.page.run_js("return 1;"):
            return False
    except Exception:
        return False

    # A missing config or missing/None limit means "no lifetime limit".
    max_life = getattr(self.config, "session_max_life", 0) or 0
    if max_life > 0 and time.time() - self.session_create_time > max_life:
        self._log("Session expired.")
        return False
    return True
def create_session(self):
    """
    Create a session: start Chromium (through a local proxy tunnel when the
    proxy needs credentials) and load the e-konsulat home page.

    Captcha handling is not part of this method; ``query`` performs it.

    Raises:
        Exception: Whatever the tunnel or browser start-up raised. Before
            re-raising, ``cleanup()`` is called so no tunnel, browser or
            workspace is left behind.
    """
    self._log(f"Initializing Session (ID: {self.instance_id})...")

    def get_free_port():
        # Binding to port 0 lets the OS pick a currently unused port.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    co = ChromiumOptions()
    co.set_local_port(get_free_port())
    co.set_user_data_path(self.user_data_path)
    chrome_path = os.getenv("CHROME_BIN")
    if chrome_path and os.path.exists(chrome_path):
        co.set_paths(browser_path=chrome_path)

    try:
        # The tunnel is started inside the try block so that a failure here
        # also goes through cleanup() and does not leak a half-started tunnel.
        if self.config.proxy and self.config.proxy.ip:
            p = self.config.proxy
            if p.username and p.password:
                # Credentials are handed to a local tunnel and Chromium is
                # pointed at the tunnel's local address instead.
                self._log(f"Starting Tunnel for {p.ip}...")
                self.tunnel = ProxyTunnel(p.ip, p.port, p.username, p.password)
                local_proxy = self.tunnel.start()
                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                proxy_str = f"{p.proto}://{p.ip}:{p.port}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")

        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        self.page = ChromiumPage(co)
        url_home = "https://secure.e-konsulat.gov.pl"
        self._log(f"Navigating to {url_home}")
        self.page.get(url_home)
        self.page.wait.doc_loaded()
        self.session_create_time = time.time()
        self._log("Session created successfully.")
    except Exception as e:
        self._log(f"Session Create Failed: {e}")
        self.cleanup()
        raise  # bare raise keeps the original traceback intact
def query(self, apt_type: AppointmentType) -> VSQueryResult:
    """Solve the captcha, fill the reservation form and read the offered dates.

    Flow: open ``free_config['query_url']``, OCR the captcha image and submit
    it, choose service type / location / "1 person" in the three select
    boxes, then poll the "Termin" dropdown for dates. When dates are found, a
    random one is clicked to lock it; on success the browser session is
    stored through ``VSCloudApi`` and a WeChat notification is pushed.

    Args:
        apt_type: Not used by this implementation.

    Returns:
        A ``VSQueryResult``. With dates found it carries ``success=True``,
        ``AvailabilityStatus.Available``, the earliest date and one
        ``DateAvailability`` per date (no time slots); otherwise
        ``success=False`` and ``AvailabilityStatus.NoneAvailable``.

    Raises:
        BizLogicError: Captcha image never shown, captcha unreadable or
            rejected, or a select box could not be filled.
        ValueError: A listed date is not in ``YYYY-MM-DD`` form.
    """
    res = VSQueryResult()
    res.success = False
    query_url = self.free_config.get('query_url')
    service_type = self.free_config.get('service_type')
    location = self.free_config.get('location')

    self._log(f"Navigating to {query_url}")
    self.page.get(query_url)

    # --- Captcha: read the inline image and recognise it locally ---------
    captcha_image_selector = 't:img@alt=Weryfikacja obrazkowa'
    if not self.page.wait.ele_displayed(captcha_image_selector, timeout=30):
        raise BizLogicError(message=f"Wait for selector={captcha_image_selector} timeout")
    time.sleep(3)
    img_ele = self.page.ele(captcha_image_selector)
    img_src = img_ele.attr('src') or ''
    # The code expects a data URI ("<header>,<base64 payload>").
    _header, _comma, base64_data = img_src.partition(',')
    if not base64_data:
        raise BizLogicError(message="Captcha image src is not a data URI")
    image_bytes = base64.b64decode(base64_data)
    result = self.reader.readtext(image_bytes)
    # Second-to-last field of the first OCR hit is taken as the text
    # (presumably (bbox, text, confidence) - confirm against EasyOCR docs).
    captcha_code = result[0][-2] if result else ""
    self._log(f"Captcha code={captcha_code}")
    if not captcha_code:
        # The exception used to be constructed but never raised.
        raise BizLogicError(message="Solve captcha failed")

    input_ele = self.page.ele('t:input@aria-label=Znaki z obrazka')
    input_ele.clear()
    input_ele.input(captcha_code)
    btn_selector = 'Dalej'
    self.page.ele(btn_selector).click(by_js=True)

    # A toast showing up right after submitting means the captcha was refused.
    toast_ele = self.page.ele('tag:app-toast', timeout=2)
    if toast_ele:
        error_msg = toast_ele.text.replace('\n', ' ').strip()
        raise BizLogicError(message=f"Captcha verify error={error_msg}")

    # --- Form: the three selects must be filled in this order ------------
    selections = (
        ('Rodzaj usługi', service_type),
        ('Lokalizacja', location),
        ('Chcę zarezerwować termin dla', '1 osob'),
    )
    for label_text, option_text in selections:
        if not self._select_mat_option(label_text, option_text):
            raise BizLogicError(message='Process select box failed')

    # --- Poll for dates ---------------------------------------------------
    available_dates = []
    self._log("Wait Query Slot...")
    for _ in range(20):
        try:
            no_slot_alert = self.page.ele('text:Chwilowo wszystkie udostępnione terminy', timeout=0.1)
            if no_slot_alert:
                self._log("No slots available")
                break
            listbox = self.page.ele('@role=listbox', timeout=0.1)
            if not listbox:
                # Dropdown not open yet: open it once the select is enabled.
                termin_label = self.page.ele('tag:mat-label@@text():Termin', timeout=0.5)
                if termin_label:
                    termin_select = termin_label.parent('tag:app-select-control').ele('tag:mat-select')
                    if termin_select and 'mat-select-disabled' not in str(termin_select.attr('class')):
                        try:
                            termin_select.click()
                        except Exception:
                            termin_select.click(by_js=True)
                        time.sleep(0.5)
                        listbox = self.page.ele('@role=listbox', timeout=1)
            if listbox:
                # Collect into a fresh list per pass so an exception half-way
                # through cannot leave duplicates behind for the next pass.
                texts = (ele.text.strip() for ele in listbox.eles('.mat-option-text'))
                found = [text for text in texts if text]
                if found:
                    available_dates = found
                    self._log(f"✅ Success extracted dates: {available_dates}")
                    break
        except Exception as e:
            self._log(f"Query loop exception: {e}")
        time.sleep(0.5)

    # --- Lock a slot and build the result --------------------------------
    if available_dates:
        selected_date = random.choice(available_dates)
        self._log(f"🎲 Random select date: {selected_date}...")
        locked = self._lock_slot(selected_date)
        if locked:
            session_id = self._save_browser_session()
            wechat_message = f"🎉 [Poland] Slot locked\n📍 location: {location}\n📅 date: {selected_date}\n🔑 SessionId: {session_id}"
            VSCloudApi.Instance().push_weixin_text(wechat_message)
        # NOTE(review): availability is reported whenever dates were listed,
        # whether or not the lock succeeded - confirm this is the intent.
        parsed_dates = [datetime.strptime(d, "%Y-%m-%d") for d in available_dates]
        res.success = True
        res.availability_status = AvailabilityStatus.Available
        # min() does not rely on the page listing dates chronologically.
        res.earliest_date = min(parsed_dates)
        res.availability = [
            DateAvailability(
                date=parsed,
                times=[],
            )
            for parsed in parsed_dates
        ]
    else:
        res.success = False
        res.availability_status = AvailabilityStatus.NoneAvailable
        res.availability = []
    return res
-
def _lock_slot(self, lock_date):
    """Click the option for ``lock_date`` and confirm with the "Dalej" button.

    Args:
        lock_date: Date text exactly as shown in the "Termin" dropdown.

    Returns:
        ``False`` when the date option or the "Dalej" button cannot be found;
        otherwise the result of waiting (up to 15 s) for the URL to stop
        containing ``weryfikacja-obrazkowa``.
    """
    def click(element):
        # Try a normal click first and fall back to a JS click if it fails.
        try:
            element.click()
        except Exception:
            element.click(by_js=True)

    slot_selector = f'xpath://span[contains(@class, "mat-option-text") and contains(text(), "{lock_date}")]'
    slot_ele = self.page.ele(slot_selector, timeout=1)
    if not slot_ele:
        # The option list may have closed again: reopen the dropdown.
        termin_label = self.page.ele('tag:mat-label@@text():Termin', timeout=1)
        if termin_label:
            termin_select = termin_label.parent('tag:app-select-control').ele('tag:mat-select')
            if termin_select and 'mat-select-disabled' not in str(termin_select.attr('class')):
                click(termin_select)
                time.sleep(0.5)
                slot_ele = self.page.ele(slot_selector, timeout=3)
    if not slot_ele:
        self._log(f"❌ Can't find date {lock_date} to click.")
        return False

    click(slot_ele)
    self._log(f"✅ Clicked date: {lock_date}")
    time.sleep(1)

    btn_selector = 'xpath://button[.//span[contains(text(), "Dalej")]]'
    next_btn = self.page.ele(btn_selector, timeout=3)
    if not next_btn:
        self._log("❌ Can't find 'Dalej' button")
        return False

    click(next_btn)
    self._log("✅ Clicked Dalej, locking slot...")
    return self.page.wait.url_change('weryfikacja-obrazkowa', exclude=True, timeout=15)
-
def _select_mat_option(self, label_text, option_text):
    """Open the select box labelled ``label_text`` and pick ``option_text``.

    Both texts are matched as substrings of the label / option text.

    Returns:
        ``True`` after the option was clicked; ``False`` (with a log line)
        when the label, the select box or the option cannot be found.
    """
    self._log(f"choose: {label_text} -> {option_text}")

    label = self.page.ele(f'tag:mat-label@@text():{label_text}', timeout=5)
    if not label:
        self._log(f"Can't find label: {label_text}")
        return False

    # The select lives in the same app-select-control wrapper as its label.
    select_box = label.parent('tag:app-select-control').ele('tag:mat-select')
    if not select_box:
        self._log("Can't find select box")
        return False

    select_box.click(by_js=True)
    time.sleep(0.5)

    option = self.page.ele(f'tag:mat-option@@text():{option_text}', timeout=3)
    if not option:
        self._log(f"Can't find option: {option_text}")
        return False

    option.click(by_js=True)
    time.sleep(0.5)
    return True
def book(self, slot_info: VSQueryResult, user_inputs: Dict) -> VSBookResult:
    """Return a default-constructed ``VSBookResult``; no booking is performed.

    ``slot_info`` and ``user_inputs`` are accepted but not used.
    """
    return VSBookResult()
-
def _save_browser_session(self):
    """Snapshot the browser state and register it through ``VSCloudApi``.

    Captured: all cookies (as JSON), ``localStorage``, ``sessionStorage``,
    the user agent, the current URL and the upstream proxy URL (including
    credentials when configured).

    Returns:
        The ``session_id`` field of the ``create_http_session`` response.
    """
    self._log("Abstract browser session env...")
    cookies = self.page.cookies(all_domains=True, all_info=True)
    cookies_str = cookies.as_json()
    local_storage_str = self.page.run_js('return JSON.stringify(window.localStorage) || "{}"')
    session_storage_str = self.page.run_js('return JSON.stringify(window.sessionStorage) || "{}"')

    proxy_str = ""
    # The proxy is optional (create_session accepts a missing one), so both
    # the config and the proxy object are checked before reading ``ip``.
    p = getattr(getattr(self, 'config', None), 'proxy', None)
    if p and p.ip:
        if p.username and p.password:
            proxy_str = f"{p.proto}://{p.username}:{p.password}@{p.ip}:{p.port}"
        else:
            proxy_str = f"{p.proto}://{p.ip}:{p.port}"

    session_data = VSCloudApi.Instance().create_http_session(
        session_id=str(uuid.uuid4().hex),
        cookies=cookies_str,
        local_storage=local_storage_str,
        session_storage=session_storage_str,
        user_agent=self.page.user_agent,
        page=self.page.url,
        proxy=proxy_str
    )
    return session_data.get('session_id')
-
def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
    """Run an HTTP request from inside the page via ``fetch`` (cookies included).

    Args:
        method: HTTP verb; upper-cased before use.
        url: Target URL.
        headers: Optional request headers. The dict is copied, not modified.
        data: Optional body. A dict is form-urlencoded; anything else is
            passed to ``fetch`` as is.
        json_data: Optional JSON body; takes precedence over ``data``.
        params: Optional query parameters appended to ``url``.
        retry_count: Internal counter for the 403 refresh-and-retry path.

    Returns:
        A ``BrowserResponse`` when the status is 200.

    Raises:
        PermissionDeniedError: Status 403 (after at most two refresh retries
            when the body contains "Just a moment").
        RateLimiteddError: Status 429; the instance is marked unhealthy.
        SessionExpiredOrInvalidError: Status 401 or 419; marked unhealthy.
        BizLogicError: Browser not started, or any other status (including
            ``0``, which the script reports when ``fetch`` itself fails).
    """
    if not self.page:
        raise BizLogicError("Browser not init")

    req_url = url
    if params:
        sep = '&' if '?' in req_url else '?'
        req_url += sep + urlencode(params)

    # Copy the headers so adding Content-Type never mutates the caller's dict.
    fetch_opts = {"method": method.upper(), "headers": dict(headers or {}), "credentials": "include"}

    if json_data:
        fetch_opts['body'] = json.dumps(json_data)
        fetch_opts['headers']['Content-Type'] = 'application/json'
    elif data:
        if isinstance(data, dict):
            fetch_opts['body'] = urlencode(data)
            fetch_opts['headers']['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
        else:
            fetch_opts['body'] = data

    # json.dumps yields a correctly escaped JS string literal, so quotes or
    # backslashes in the URL can neither break nor inject into the script.
    js = f"""
    return fetch({json.dumps(req_url)}, {json.dumps(fetch_opts)})
    .then(async r => {{
    const h = {{}}; r.headers.forEach((v, k) => h[k] = v);
    return {{ status: r.status, body: await r.text(), headers: h, url: r.url }};
    }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
    """

    resp = BrowserResponse(self.page.run_js(js, timeout=60))

    if resp.status_code == 200:
        return resp
    elif resp.status_code == 403:
        if "Just a moment" in resp.text and retry_count < 2:
            self._log("Cloudflare 403. Refreshing...")
            # NOTE(review): _refresh_firewall_session is not defined in the
            # visible part of this class - presumably inherited; confirm.
            if self._refresh_firewall_session():
                return self._perform_request(method, url, headers, data, json_data, params, retry_count + 1)
        raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
    elif resp.status_code == 429:
        self.is_healthy = False
        raise RateLimiteddError()
    elif resp.status_code in [401, 419]:
        self.is_healthy = False
        raise SessionExpiredOrInvalidError()
    else:
        raise BizLogicError(f"HTTP {resp.status_code}: {resp.text[:100]}")
def _filter_dates(self, dates, start, end):
    """Keep the dates inside ``[start, end]`` and return them in random order.

    Args:
        dates: ``YYYY-MM-DD`` strings.
        start: Lower bound; only its first 10 characters are parsed.
        end: Upper bound; only its first 10 characters are parsed.

    Returns:
        ``dates`` itself, untouched, when either bound is falsy; otherwise a
        new shuffled list of the dates within the inclusive range.
    """
    if not (start and end):
        return dates
    lower = datetime.strptime(start[:10], "%Y-%m-%d")
    upper = datetime.strptime(end[:10], "%Y-%m-%d")
    in_range = [
        day for day in dates
        if lower <= datetime.strptime(day, "%Y-%m-%d") <= upper
    ]
    random.shuffle(in_range)
    return in_range
def cleanup(self):
    """Release the browser, its workspace directory and the proxy tunnel.

    Every step is best-effort: a failure in one step never stops the others
    and nothing is raised. Safe to call repeatedly and on an instance whose
    ``__init__`` did not complete.
    """
    page = getattr(self, 'page', None)
    if page:
        try:
            page.quit()
        except Exception:
            # The browser may already be gone; nothing useful to do here.
            pass
    self.page = None

    workspace = getattr(self, 'root_workspace', None)
    if workspace and os.path.exists(workspace):
        # The browser can hold profile files for a moment after quit(), so
        # retry a few times before falling back to a silent removal.
        for _ in range(3):
            try:
                shutil.rmtree(workspace)
                break
            except OSError:
                time.sleep(0.5)
        else:
            shutil.rmtree(workspace, ignore_errors=True)

    tunnel = getattr(self, 'tunnel', None)
    if tunnel:
        try:
            tunnel.stop()
        except Exception:
            pass
    self.tunnel = None
-
def __del__(self):
    """Best-effort release of browser, workspace and tunnel on finalisation."""
    try:
        self.cleanup()
    except Exception:
        # A finalizer can run on a partially built object or while the
        # interpreter is shutting down; it must never raise.
        pass
|