| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
- import time
- import json
- import random
- import re
- import os
- from datetime import datetime
- from typing import List, Dict, Optional, Any
- from urllib.parse import urljoin, urlparse
- # 第三方库
- try:
- from curl_cffi import requests, const
- from bs4 import BeautifulSoup
- except ImportError:
- raise ImportError("Missing dependencies. Run: pip install curl-cffi beautifulsoup4")
- # 框架依赖
- from vs_plg import IVSPlg, VSError # type: ignore
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, QueryWaitMode # type: ignore
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN # type: ignore
- from toolkit.vs_cloud_api import VSCloudApi # type: ignore
- class TlsPlugin(IVSPlg):
- """
- TLS 签证预约插件
- 适配法国签证 (FR) 流程
- """
def __init__(self, group_id: str):
    """Set up an unconfigured plugin instance for one account group.

    Only in-memory attributes are initialised here; no session is opened.

    Args:
        group_id: Identifier of the group this plugin instance serves.
    """
    # Status reporting: start out healthy with an "OK" error record.
    self.is_healthy = True
    self.last_error = VSError(0, "OK")

    # Session state (filled in by create_session).
    self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
    self.travel_group: Optional[Dict] = None
    self.session: Optional[requests.Session] = None

    # Configuration (filled in by set_config).
    self.free_config: Dict[str, Any] = {}
    self.config: Optional[VSPlgConfig] = None
    self.group_id = group_id
def _save_debug_html(self, content: str, prefix: str = "debug"):
    """Write a page's HTML into the local ``debug_pages`` directory.

    The file is named ``<prefix>_<group_id>_<YYYYmmdd_HHMMSS>.html``.  This is
    a best-effort diagnostic aid: any failure is logged as a warning and is
    never propagated to the caller.

    Args:
        content: HTML text to store.
        prefix: Leading part of the generated file name.
    """
    save_dir = "debug_pages"
    try:
        # exist_ok=True removes the race between checking for the directory
        # and creating it.
        os.makedirs(save_dir, exist_ok=True)

        # File name: prefix_GroupID_timestamp.html
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{self.group_id}_{timestamp}.html"

        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)

        VSC_INFO("tls_plg", "[%s] HTML saved to: %s", self.group_id, filename)
    except Exception as e:
        VSC_WARN("tls_plg", "[%s] Failed to save debug HTML: %s", self.group_id, str(e))
def get_group_id(self) -> str:
    """Return the group identifier stored on this plugin instance."""
    return self.group_id
def set_config(self, config: VSPlgConfig):
    """Store the plugin configuration and decode its free-form JSON section.

    ``config.free_config`` is expected to hold a JSON object as text.  When it
    is missing, empty, malformed, or decodes to something other than an
    object, ``self.free_config`` falls back to an empty dict so that later
    ``.get`` lookups stay safe.

    Args:
        config: Plugin configuration object; its ``free_config`` attribute is
            read here.
    """
    self.config = config
    self.free_config = {}

    raw = getattr(config, "free_config", None)
    if not raw:
        return

    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass; TypeError covers a
        # value that is not str/bytes.
        VSC_WARN("tls_plg", "[%s] Ignoring invalid free_config JSON: %s", self.group_id, str(e))
        return

    if isinstance(parsed, dict):
        self.free_config = parsed
    else:
        VSC_WARN("tls_plg", "[%s] Ignoring free_config: expected a JSON object, got %s",
                 self.group_id, type(parsed).__name__)
def health_check(self) -> bool:
    """Return the current value of the ``is_healthy`` flag."""
    return self.is_healthy
def get_last_error(self) -> VSError:
    """Return the most recently recorded error object (``last_error``)."""
    return self.last_error
def _set_error(self, code: int, message: str):
    """Record an error, log it, and mark the plugin unhealthy for session errors.

    Args:
        code: Numeric error code stored in the new ``VSError``.
        message: Human-readable description stored and logged with the code.
    """
    self.last_error = VSError(code, message)
    VSC_ERROR("tls_plg", "[%s] Error %d: %s", self.group_id, code, message)

    # 2003 / 2000 / 2001: the session is invalid or the login failed.
    session_is_broken = code == 2003 or code == 2000 or code == 2001
    if session_is_broken:
        self.is_healthy = False
- # ---------------------------------------------------------
- # 核心接口实现
- # ---------------------------------------------------------
def create_session(self) -> bool:
    """Create a logged-in session: Cloudflare -> login -> travel group.

    Steps:
      1. Build a fresh curl_cffi session (through the configured proxy, if any).
      2. Solve the Cloudflare challenge.
      3. Fetch the login page and locate the login form's submit URL.
      4. Solve the login ReCaptcha V2 through Capsolver.
      5. Submit the credentials.
      6. Parse the travel groups and pick the one matching the configured city.

    Returns:
        True when the session is established and a matching travel group was
        selected; False otherwise (the reason is recorded via ``_set_error``).
    """
    VSC_INFO("tls_plg", "[%s] Creating session...", self.group_id)
    self.is_healthy = True
    # Forget the group of any earlier session: otherwise a re-login that finds
    # no matching group would still "succeed" on the stale value.
    self.travel_group = None

    if self.config is None:
        self._set_error(1099, "Plugin not configured: call set_config() first")
        return False

    embassy = self._get_embassy_config()
    if not embassy:
        self._set_error(2005, "Embassy config missing in free_config")
        return False

    # Release the previous session's connections before replacing it.
    if self.session is not None:
        try:
            self.session.close()
        except Exception as e:
            VSC_DEBUG("tls_plg", "[%s] Closing previous session failed: %s", self.group_id, str(e))
        self.session = None

    # 1. Initialise the session
    curlopt = {
        const.CurlOpt.MAXAGE_CONN: 1800,
        const.CurlOpt.MAXLIFETIME_CONN: 1800,
        const.CurlOpt.VERBOSE: False,  # keep off in production
    }

    # Build the proxy URL
    proxy_url = ""
    s = self.config.proxy
    if s.ip:
        if s.username:
            proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
        else:
            proxy_url = f"{s.scheme}://{s.ip}:{s.port}"

    self.session = requests.Session(
        proxy=proxy_url,
        impersonate="chrome124",
        curl_options=curlopt,
        use_thread_local_curl=False,
        http_version=const.CurlHttpVersion.V2TLS
    )

    # 2. Solve the Cloudflare 5s challenge
    if not self._solve_cloudflare5S_challenge(embassy, proxy_url):
        self._set_error(1001, "Cloudflare challenge failed")
        return False

    # 3. Fetch the login page parameters (OIDC)
    login_page = "https://visas-fr.tlscontact.com/en-us/login"
    params = {
        "issuerId": embassy["code"],
        "country": embassy["country"],
        "vac": embassy["code"],
        "redirect": f"/en-us/country/{embassy['country']}/vac/{embassy['code']}"
    }
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': f'https://visas-fr.tlscontact.com/en-us/country/{embassy["country"]}/vac/{embassy["code"]}',
        'User-Agent': self.user_agent,
    }
    try:
        resp = self.session.get(login_page, headers=headers, params=params)
        if resp.status_code != 200:
            self._set_error(resp.status_code, f"Get Login Page Failed: {resp.status_code}")
            return False

        # Locate the login form's submit URL
        soup = BeautifulSoup(resp.text, 'html.parser')
        form = soup.find('form')
        if not form:
            self._set_error(2005, "Login form not found")
            return False
        action = form.get('action')
        if not action:
            # Report this as a page-structure problem instead of letting
            # None.startswith() surface as a "network error".
            self._set_error(2005, "Login form has no action URL")
            return False
        authenticate_url = action if action.startswith('http') else urljoin(resp.url, action)
        captcha_page = resp.url
    except Exception as e:
        self._set_error(1099, f"Network error during login init: {e}")
        return False

    # 4. Solve ReCaptcha V2 (login captcha); the API token comes from free_config
    api_token = self.free_config.get("capsolver_key", "")
    if not api_token:
        VSC_WARN("tls_plg", "Missing 'capsolver_key' in free_config, captcha might fail.")

    rc_params = {
        "type": "ReCaptchaV2TaskProxyLess",  # or ReCaptchaV2Task together with a proxy
        "page": captcha_page,
        "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
        "apiToken": api_token,
        "proxy": proxy_url
    }
    g_token = self._solve_recaptcha(rc_params)
    if not g_token:
        self._set_error(1001, "Failed to solve Login Recaptcha")
        return False

    # 5. Submit the login form
    payload = {
        'username': self.config.account.username,
        'password': self.config.account.password,
        'g-recaptcha-response': g_token
    }
    headers['Content-Type'] = 'application/x-www-form-urlencoded'

    try:
        resp = self.session.post(authenticate_url, headers=headers, data=payload)
        if resp.status_code != 200:
            self._set_error(resp.status_code, f"Login Submit Failed: {resp.status_code}")
            return False

        # 6. Parse the travel groups
        groups = self._parse_travel_groups(resp.text)
        if not groups:
            # Distinguish bad credentials from an unexpected page
            if "Invalid username or password" in resp.text:
                self._set_error(2000, "Invalid username or password")
            else:
                self._set_error(2005, "No Travel Groups found after login")
            return False

        # Pick the group whose location matches the configured city.
        # Either side may be None, so normalise before comparing.
        target_city = (embassy.get('city') or '').lower()
        if target_city:
            for g in groups:
                if (g.get('location') or '').lower() == target_city:
                    self.travel_group = g
                    break

        if not self.travel_group:
            self._set_error(2005, f"No group found for city {target_city}")
            return False

        VSC_INFO("tls_plg", "[%s] Session created. Group: %s", self.group_id, self.travel_group['group_number'])
        return True
    except Exception as e:
        self._set_error(1099, f"Login exception: {e}")
        return False
def query(self) -> VSQueryResult:
    """Query the appointment-booking page and report matching slots.

    Requires a session and travel group from ``create_session``.  Slots are
    parsed from the page and filtered by ``free_config["target_labels"]``
    (default ``["", "pta"]``).

    Returns:
        A ``VSQueryResult``.  ``success`` is set to True only when the page
        was fetched and parsed; on failure the result is returned as
        constructed and the reason is recorded via ``_set_error``.
    """
    res = VSQueryResult()
    if not self.session or not self.travel_group:
        self._set_error(2003, "Session invalid, please login first")
        return res

    embassy = self._get_embassy_config()
    if not embassy:
        # Guard here: indexing an empty dict below would raise outside the try.
        self._set_error(2005, "Embassy config missing in free_config")
        return res

    group_num = self.travel_group['group_number']
    interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
    url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
    params = {
        'location': embassy["code"],
        'month': interest_month,
    }
    headers = {
        'accept': '*/*',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'referer': f'{url}?location={embassy["code"]}',
        'user-agent': self.user_agent,
    }
    try:
        resp = self.session.get(url, params=params, headers=headers)

        # 1. Cloudflare 403 (hard block): re-solve the challenge and retry once
        if resp.status_code == 403:
            VSC_WARN("tls_plg", "[%s] Query 403 Forbidden. Solving Cloudflare...", self.group_id)
            # The helper takes a second (proxy URL) argument but derives the
            # proxy from self.config itself, so pass a placeholder.  Omitting
            # it raised TypeError and made this recovery path unusable.
            if self._solve_cloudflare5S_challenge(embassy, ""):
                resp = self.session.get(url, params=params, headers=headers)
            else:
                self._set_error(2006, "Cloudflare re-challenge failed")
                return res

        # 2. Session-expired detection.  A response that carries slot data is
        #    accepted whatever its status code (works around WAF false alarms).
        body = resp.text
        if "availableAppointments" not in body:
            if resp.status_code == 401 or self._is_session_expired_page(body):
                VSC_WARN("tls_plg", "[%s] Session expired. URL: %s", self.group_id, resp.url)
                self._save_debug_html(body, "query_session_expired")
                # _set_error(2003, ...) also marks the plugin unhealthy.
                self._set_error(2003, "Session expired")
                self.is_healthy = False
                return res

            # Any other non-200 response without slot data
            if resp.status_code != 200:
                self._set_error(resp.status_code, f"Query failed status: {resp.status_code}")
                return res

        # 3. Parse the slots and keep those with a wanted label
        target_labels = self.free_config.get("target_labels", ["", "pta"])
        available = [
            slot for slot in self._parse_appointment_slots(body)
            if slot.get('label') in target_labels
        ]

        res.success = True
        res.city = embassy['city']
        res.visa_type = "Tourist"
        res.availability_status = AvailabilityStatus.NoneAvailable

        if available:
            res.availability_status = AvailabilityStatus.Available
            res.earliest_date = available[0]['date']

            # Group the time slots by date, keeping first-seen order
            date_map = {}
            for s in available:
                ts = VSQueryResult.DateAvailability.TimeSlot()
                ts.time = s['time']
                ts.label = f"{s['type']}"
                date_map.setdefault(s['date'], []).append(ts)

            for d, slots in date_map.items():
                da = VSQueryResult.DateAvailability()
                da.date = d
                da.times = slots
                res.availability.append(da)

            VSC_INFO("tls_plg", "[%s] Found %d slots", self.group_id, len(available))
        else:
            VSC_DEBUG("tls_plg", "[%s] Query OK, but no matching slots.", self.group_id)
    except Exception as e:
        self._set_error(1099, f"Query exception: {e}")
    return res
def book(self, slot_info: VSQueryResult) -> VSBookResult:
    """Book an appointment by submitting the multipart booking form.

    ``slot_info`` is a ``query`` result; the first time of the first date in
    ``slot_info.availability`` is the slot that gets booked.

    Args:
        slot_info: Query result carrying at least one date with one time slot.

    Returns:
        A ``VSBookResult``.  ``success`` is set to True when the server answers
        the submission with HTTP 303; otherwise the result is returned as
        constructed and the reason is recorded via ``_set_error``.
    """
    res = VSBookResult()
    if not self.session or not self.travel_group:
        self._set_error(2003, "Session invalid")
        return res

    # Simple strategy: take the first available time
    if not slot_info.availability or not slot_info.availability[0].times:
        self._set_error(3002, "No slots in slot_info to book")
        return res

    target_date = slot_info.availability[0].date
    slot = slot_info.availability[0].times[0]
    target_time = slot.time
    # query() stores the slot *type* in TimeSlot.label; map it back to the raw
    # appointment label so a Prime slot is not submitted as Standard.
    # Unknown or empty types fall back to "" (Standard).
    type_to_label = {"Standard": "", "Prime": "pta", "Prime Weekend": "ptaw"}
    target_label = type_to_label.get(getattr(slot, "label", "") or "", "")

    embassy = self._get_embassy_config()
    if not embassy:
        self._set_error(2005, "Embassy config missing in free_config")
        return res

    # formGroupId comes from parsed JSON and may not be a str.
    group_num = str(self.travel_group['group_number'])
    interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))

    # 1. Solve ReCaptcha V3
    page_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking?location={embassy["code"]}&month={interest_month}'
    # NOTE(review): the session is built with proxy=..., which curl_cffi is
    # believed to store under the "all" key; confirm against the curl_cffi docs.
    # The per-scheme keys are kept as fallbacks.
    proxies = getattr(self.session, "proxies", None) or {}
    proxy_url = proxies.get("all") or proxies.get("https") or proxies.get("http") or ""
    api_token = self.free_config.get("capsolver_key", "")

    rc_params = {
        "type": "ReCaptchaV3Task",
        "page": page_url,
        "action": "book",
        "siteKey": "6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
        "apiToken": api_token,
        "proxy": proxy_url
    }
    g_token = self._solve_recaptcha(rc_params)
    if not g_token:
        self._set_error(1001, "Failed to solve Booking Recaptcha")
        return res

    # 2. Build the request
    url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'

    next_action = '601f284bf7ee33b6578ad0fad426fae18c232707f2'  # may change on the site's side; keep an eye on it
    next_state = '%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22$GROUPID$%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'

    headers = {
        'Next-Action': next_action,
        'Referer': page_url,
        'Next-Router-State-Tree': next_state.replace("$GROUPID$", group_num),
        'Accept': 'text/x-component',
        'User-Agent': self.user_agent,
    }
    params = {
        'location': embassy["code"],
        'month': interest_month,
    }

    # 3. Build the multipart/form-data body by hand
    boundary = "----WebKitFormBoundary" + "".join(
        random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", k=16)
    )
    headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"

    form_fields = {
        '1_formGroupId': group_num,
        '1_lang': 'en-us',
        '1_process': 'APPOINTMENT',
        '1_location': embassy["code"],
        '1_date': target_date,
        '1_time': target_time,
        '1_appointmentLabel': target_label,
        '1_captcha_token': g_token,
        '0': '[{"status":"IDLE"},"$K1"]'
    }

    body_parts = []
    for name, value in form_fields.items():
        body_parts.append(f"--{boundary}\r\n")
        body_parts.append(f'Content-Disposition: form-data; name="{name}"\r\n')
        body_parts.append("\r\n")
        body_parts.append(f"{value}\r\n")
    body_parts.append(f"--{boundary}--\r\n")
    body = "".join(body_parts).encode("utf-8")

    try:
        # Success is signalled by the 303 itself, so the redirect must not be
        # followed: a followed redirect would hide the 303 behind the final
        # page's status and report a successful booking as failed.
        resp = self.session.post(url, params=params, headers=headers, data=body,
                                 allow_redirects=False)
        if resp.status_code == 303:
            res.success = True
            res.order_id = f"TLS-{int(time.time())}"
            res.book_date = target_date
            res.book_time = target_time
            VSC_INFO("tls_plg", "[%s] Book Success (303 Redirect)!", self.group_id)
            return res
        self._set_error(resp.status_code, f"Book Failed: {resp.status_code} {resp.text[:100]}")
    except Exception as e:
        self._set_error(1099, f"Book exception: {e}")
    return res
- # ---------------------------------------------------------
- # 辅助功能
- # ---------------------------------------------------------
- def _get_embassy_config(self) -> Dict:
- # 从 free_config 提取 embassy 信息,格式需与 TLS_EMBASSY 结构一致
- # 示例 JSON: { "embassy": { "code": "gbLON2fr", "country": "gb", "mission": "fr", "city": "london" } }
- # 或者平铺在 free_config
- if "embassy_code" in self.free_config:
- return {
- "code": self.free_config.get("embassy_code"),
- "country": self.free_config.get("country_code"),
- "mission": self.free_config.get("mission_code", "fr"),
- "city": self.free_config.get("city")
- }
- return {} # 失败
def _solve_cloudflare5S_challenge(self, embassy, proxy_url: Optional[str] = None) -> bool:
    """Solve the Cloudflare 5s challenge through ``VSCloudApi``.

    Submits an anti-Cloudflare task for the country landing page, waits for
    its result, then copies the ``__cf_bm`` / ``cf_clearance`` cookies and the
    returned user agent onto ``self.session``.

    Args:
        embassy: Embassy settings dict; only ``embassy["country"]`` is read.
        proxy_url: Accepted for call-site compatibility and not used; the
            proxy is taken from ``self.config.proxy``.  Optional so callers
            that have no URL at hand can omit it.

    Returns:
        True when a task result was received and applied, False otherwise.
    """
    VSC_INFO("tls_plg", "[%s] Solving Cloudflare 5s...", self.group_id)

    country = (embassy or {}).get("country")
    if not country:
        VSC_ERROR("tls_plg", "[%s] Embassy country missing, cannot solve Cloudflare challenge", self.group_id)
        return False
    website_url = f'https://visas-fr.tlscontact.com/en-us/country/{country}'

    # 1. Format the proxy as host:port[:user:pass]
    p = self.config.proxy if self.config is not None else None
    if p is None or not p.ip:
        VSC_ERROR("tls_plg", "Proxy is required for Cloudflare challenge")
        return False

    if p.username:
        proxy_str = f"{p.ip}:{p.port}:{p.username}:{p.password}"
    else:
        proxy_str = f"{p.ip}:{p.port}"

    # 2. Submit the task
    task = VSCloudApi.Instance().submit_anticloudflare_task(proxy_str, website_url)
    if not task or not task.get('id'):
        VSC_ERROR("tls_plg", "[%s] Failed to submit AntiCloudflareTask", self.group_id)
        return False

    # 3. Fetch the result
    # NOTE(review): the original comment says get_anticloudflare_result polls
    # internally; that is not visible from this file.
    task_id = str(task['id'])
    result = VSCloudApi.Instance().get_anticloudflare_result(task_id)
    if not result:
        return False

    try:
        # 4. Apply the result to the session.  result['result'] is expected to
        #    be a JSON string with 'cookies' and 'userAgent'; an already
        #    decoded dict is accepted as well.
        raw = result.get('result') or '{}'
        parsed_result = raw if isinstance(raw, dict) else json.loads(raw)

        wanted = ('__cf_bm', 'cf_clearance')
        for cookie in parsed_result.get('cookies') or []:
            if cookie['name'] in wanted:
                self.session.cookies.set(
                    cookie['name'],
                    cookie['value'],
                    domain=cookie['domain'],
                    path='/'
                )

        ua = parsed_result.get('userAgent')
        if ua:
            # Later requests must use the user agent the clearance was issued for.
            self.user_agent = ua
            self.session.headers['User-Agent'] = ua

        VSC_INFO("tls_plg", "[%s] Cloudflare 5s challenge solved.", self.group_id)
        return True
    except Exception as e:
        # Pass the exception as an argument: text containing '%' must not be
        # interpreted as a format string.
        VSC_ERROR("tls_plg", "[%s] Failed to parse Cloudflare result: %s", self.group_id, str(e))

    return False
- def _solve_recaptcha(self, params) -> Optional[str]:
- """
- 调用 Capsolver (保留原脚本逻辑)
- """
- try:
- key = params.get("apiToken")
- if not key: return None
-
- submit_url = "https://api.capsolver.com/createTask"
- task = {
- "type": params.get("type"),
- "websiteURL": params.get("page"),
- "websiteKey": params.get("siteKey"),
- }
- if params.get("action"):
- task["pageAction"] = params.get("action")
-
- if params.get("proxy"):
- p = urlparse(params.get("proxy"))
- task["proxyType"] = p.scheme
- task["proxyAddress"] = p.hostname
- task["proxyPort"] = p.port
- if p.username:
- task["proxyLogin"] = p.username
- task["proxyPassword"] = p.password
-
- payload = {"clientKey": key, "task": task}
- r = requests.post(submit_url, json=payload, timeout=20)
- if r.status_code != 200: return None
-
- task_id = r.json().get("taskId")
- if not task_id: return None
-
- # Query
- for _ in range(20):
- r = requests.post("https://api.capsolver.com/getTaskResult", json={"clientKey": key, "taskId": task_id}, timeout=20)
- if r.status_code == 200:
- d = r.json()
- if d.get("status") == "ready":
- return d["solution"]["gRecaptchaResponse"]
- time.sleep(3)
- except Exception as e:
- VSC_ERROR("tls_plg", f"Capsolver error: {e}")
- return None
- def _parse_travel_groups(self, html: str) -> List[Dict]:
- groups = []
- try:
- js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
- js_match = re.search(js_pattern, html, re.DOTALL)
- if js_match:
- json_str = js_match.group(1).replace(r'\"', '"')
- data = json.loads(json_str)
- for g in data:
- groups.append({
- 'group_name': g.get('groupName'),
- 'group_number': g.get('formGroupId'),
- 'location': g.get('vacName')
- })
- except:
- pass
- return groups
- def _parse_appointment_slots(self, html: str) -> List[Dict]:
- slots = []
- try:
- # 增强正则:匹配 "availableAppointments": 或 \"availableAppointments\":
- # 并且兼容末尾是 ,"showFlexi... 或 ,\"showFlexi...
- # DOTALL 模式确保匹配跨行
- pattern = r'availableAppointments\\?":\s*(\[.*?\])(?:,\\?"|\},)'
- match = re.search(pattern, html, re.DOTALL)
-
- if match:
- json_str = match.group(1)
- # 清理转义字符:将 \" 替换为 "
- json_str = json_str.replace(r'\"', '"')
-
- data = json.loads(json_str)
- for day in data:
- d_str = day.get('day')
- for s in day.get('slots', []):
- labels = s.get('labels', [])
- lbl = ""
- stype = ""
- cost = ""
-
- if 'pta' in labels:
- lbl = 'pta'
- stype = "Prime"
- elif 'ptaw' in labels:
- lbl = 'ptaw'
- stype = "Prime Weekend"
- elif '' in labels:
- lbl = ''
- stype = "Standard"
-
- if lbl or not labels:
- slots.append({
- 'date': d_str,
- 'time': s.get('time'),
- 'label': lbl,
- 'type': stype,
- 'cost': cost
- })
- except Exception as e:
- VSC_DEBUG("tls_plg", f"Slot parse error: {e}")
- pass
- return slots
- def _is_session_expired_page(self, html: str) -> bool:
- if not html: return False
- if 'availableAppointments' not in html: return True
- # 简化判断:如果包含 redirecting automatically 通常是过期
- if 'redirected automatically' in html.lower(): return True
- return False
|