| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- import time
- import json
- import random
- import re
- import os
- from datetime import datetime
- from typing import List, Dict, Optional, Any
- from urllib.parse import urljoin, urlparse
- from curl_cffi import requests, const
- from bs4 import BeautifulSoup
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN
- from toolkit.vs_cloud_api import VSCloudApi
- class TlsPlugin(IVSPlg):
- """
- TLSContact 签证预约插件
- 适配法国签证 (FR) 流程
- """
- def __init__(self, group_id: str):
- self.group_id = group_id
- self.config: Optional[VSPlgConfig] = None
- self.free_config: Dict[str, Any] = {}
- self.is_healthy = True
- # 会话相关
- self.session: Optional[requests.Session] = None
- self.travel_group: Optional[Dict] = None
- self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
- def get_group_id(self) -> str:
- return self.group_id
- def set_config(self, config: VSPlgConfig):
- self.config = config
- try:
- self.free_config = json.loads(config.free_config) if config.free_config else {}
- except:
- self.free_config = {}
- def health_check(self) -> bool:
- return self.is_healthy
- def _save_debug_html(self, content: str, prefix: str = "debug"):
- save_dir = "debug_pages"
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{save_dir}/{prefix}_{self.group_id}_{timestamp}.html"
- with open(filename, "w", encoding="utf-8") as f:
- f.write(content)
- VSC_INFO("tls_plg", "[%s] HTML saved to: %s", self.group_id, filename)
-
- def _get_proxy_url(self):
- # 构造代理
- proxy_url = ""
- if self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- proxy_url = f"{s.scheme}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- proxy_url = f"{s.scheme}://{s.ip}:{s.port}"
- return proxy_url
- def create_session(self):
- """
- 创建会话:处理 Cloudflare -> 登录 -> 获取 Travel Group
- """
- # 1. 初始化 Session
- curlopt = {
- const.CurlOpt.MAXAGE_CONN: 1800,
- const.CurlOpt.MAXLIFETIME_CONN: 1800,
- const.CurlOpt.VERBOSE: False,
- }
-
- self.session = requests.Session(
- proxy=self._get_proxy_url(),
- impersonate="chrome124",
- curl_options=curlopt,
- use_thread_local_curl=False,
- http_version=const.CurlHttpVersion.V2TLS
- )
- embassy = self.free_config.get('center', {})
- if not embassy:
- raise NotFoundError(message="center not found in free config")
- # 2. 解决 Cloudflare 5s 盾
- self._solve_cloudflare5S_challenge()
- # 3. 获取登录页面参数 (OIDC)
- login_page = "https://visas-fr.tlscontact.com/en-us/login"
- params = {
- "issuerId": embassy["code"],
- "country": embassy["country"],
- "vac": embassy["code"],
- "redirect": f"/en-us/country/{embassy['country']}/vac/{embassy['code']}"
- }
- headers = {
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
- 'Referer': f'https://visas-fr.tlscontact.com/en-us/country/{embassy["country"]}/vac/{embassy["code"]}',
- 'User-Agent': self.user_agent,
- }
- resp = self._perform_request("GET", login_page, headers=headers, params=params)
- self._save_debug_html(resp.text, 'Login_Page')
- # 解析 Keycloak 登录地址
- soup = BeautifulSoup(resp.text, 'html.parser')
- form = soup.find('form')
- if not form:
- raise NotFoundError(message="Login form not found")
- action = form.get('action')
- authenticate_url = action if action.startswith('http') else urljoin(resp.url, action)
- # 4. 解决 ReCaptcha V2 (登录验证码)
- api_token = self.free_config.get("capsolver_key", "")
- if not api_token:
- raise NotFoundError(message="Missing 'capsolver_key' in free_config, captcha might fail.")
-
- rc_params = {
- "type": "ReCaptchaV2TaskProxyLess",
- "page": authenticate_url,
- "siteKey": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
- "apiToken": api_token,
- "proxy": self._get_proxy_url()
- }
- g_token = self._solve_recaptcha(rc_params)
- # 5. 提交登录
- payload = {
- 'username': self.config.account.username,
- 'password': self.config.account.password,
- 'g-recaptcha-response': g_token
- }
- headers['Content-Type'] = 'application/x-www-form-urlencoded'
- resp = self._perform_request("POST", authenticate_url, headers=headers, data=payload)
- self._save_debug_html(resp.text, 'Travel_Groups_Page')
- # 6. 解析 Travel Groups
- groups = self._parse_travel_groups(resp.text)
-
- # 选择匹配城市的 Group
- target_city = embassy['city'].lower()
- for g in groups:
- if g['location'].lower() == target_city:
- self.travel_group = g
- break
-
- if not self.travel_group:
- raise NotFoundError(message=f"No group found for city {target_city}")
- VSC_INFO("tls_plg", "[%s] Session created. Group: %s", self.group_id, self.travel_group['group_number'])
- def query(self) -> VSQueryResult:
- res = VSQueryResult()
- res.success = False
- embassy = self.free_config.get('center', {})
- group_num = self.travel_group['group_number']
- interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
-
- url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
- params = {
- 'location': embassy["code"],
- 'month': interest_month,
- }
- headers = {
- 'accept': '*/*',
- 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
- 'referer': f'{url}?location={embassy["code"]}',
- 'user-agent': self.user_agent,
- }
- resp = self._perform_request("GET", url, headers=headers, params=params)
-
- self._save_debug_html(resp.text, 'Query_Slot_Page')
- self._check_session_expired_page(resp.text)
- # 检测关键词
- if not "availableAppointments" in resp.text:
- raise NotFoundError(message='Query result not found availableAppointments')
- # 3. 解析 Slots
- all_slots = self._parse_appointment_slots(resp.text)
- target_labels = self.free_config.get("target_labels", ["", "pta"])
- available = [s for s in all_slots if s.get("label") in target_labels]
- res.city = self.free_config.get('city', '')
- res.country = self.free_config.get('country', '')
- res.visa_type = self.free_config.get('visa_type', '')
- res.availability_status = AvailabilityStatus.NoneAvailable
- if available:
- res.success = True
- res.availability_status = AvailabilityStatus.Available
- res.earliest_date = available[0]['date']
- date_map = {}
- for s in available:
- d = s['date']
- date_map.setdefault(d, [])
- ts = VSQueryResult.DateAvailability.TimeSlot()
- ts.time = s['time']
- ts.label = f"{s['type']}"
- date_map[d].append(ts)
- for d, slots in date_map.items():
- da = VSQueryResult.DateAvailability()
- da.date = d
- da.times = slots
- res.availability.append(da)
- else:
- res.success = False
- return res
- def book(self, slot_info: VSQueryResult, user_input: Dict = None) -> VSBookResult:
- res = VSBookResult()
- res.success = False
- target_date = slot_info.availability[0].date
- target_time = slot_info.availability[0].times[0].time
- target_label = ""
-
- embassy = self.free_config.get('center', {})
- group_num = self.travel_group['group_number']
- interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
-
- # 1. 解决 ReCaptcha V3
- page_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking?location={embassy["code"]}&month={interest_month}'
- api_token = self.free_config.get("capsolver_key", "")
-
- rc_params = {
- "type": "ReCaptchaV3Task",
- "page": page_url,
- "action": "book",
- "siteKey": "6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
- "apiToken": api_token,
- "proxy": self._get_proxy_url()
- }
- g_token = self._solve_recaptcha(rc_params)
- # 2. 构造请求
- url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
-
- next_action = '601f284bf7ee33b6578ad0fad426fae18c232707f2'
- next_state = '%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22$GROUPID$%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'
-
- headers = {
- 'Next-Action': next_action,
- 'Referer': page_url,
- 'Next-Router-State-Tree': next_state.replace("$GROUPID$", group_num),
- 'Accept': 'text/x-component',
- 'User-Agent': self.user_agent,
- }
- params = {
- 'location': embassy["code"],
- 'month': interest_month,
- }
-
- boundary = "----WebKitFormBoundary" + "".join(
- random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", k=16)
- )
- headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"
- form_fields = {
- '1_formGroupId': str(group_num),
- '1_lang': 'en-us',
- '1_process': 'APPOINTMENT',
- '1_location': embassy["code"],
- '1_date': target_date,
- '1_time': target_time,
- '1_appointmentLabel': target_label,
- '1_captcha_token': g_token,
- '0': '[{"status":"IDLE"},"$K1"]'
- }
-
- body_parts = []
- for name, value in form_fields.items():
- body_parts.append(f"--{boundary}\r\n")
- body_parts.append(f'Content-Disposition: form-data; name="{name}"\r\n')
- body_parts.append("\r\n")
- body_parts.append(f"{value}\r\n")
- body_parts.append(f"--{boundary}--\r\n")
- body = "".join(body_parts).encode("utf-8")
-
- resp = self.session.post(url, params=params, headers=headers, data=body, allow_redirects=False)
- self._save_debug_html(resp.text, 'Book_Appointment_Page')
- if resp.status_code == 303:
- res.success = True
- res.book_date = target_date
- res.book_time = target_time
- return res
- else:
- VSC_WARN('tls_plg', 'Expected Status is 303, but got {resp.status_code}')
- res.success = False
- return res
-
- def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
- """
- 统一 HTTP 请求封装,严格复刻 C++ 逻辑:
- 1. 发送 OPTIONS 请求
- 2. 发送实际请求
- """
- print(f'[perform request] {method} {url}')
- resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
- VSC_INFO('tls_plg', resp.text)
- if resp.status_code == 200:
- return resp
- elif resp.status_code == 401:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- elif resp.status_code == 403:
- raise PermissionDeniedError()
- elif resp.status_code == 429:
- self.is_healthy = False
- raise RateLimiteddError()
- else:
- raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
- def _solve_cloudflare5S_challenge(self):
- """
- 解决 Cloudflare 5s 盾
- """
- VSC_INFO("tls_plg", f"[{self.group_id}] Solving Cloudflare 5s...")
- embassy = self.free_config.get('center', {})
- website_url = f'https://visas-fr.tlscontact.com/en-us/country/{embassy["country"]}'
-
- # 1. 格式化代理字符串, 这里的接口要求格式通常是: host:port:user:pass (根据你的脚本示例)
- p = self.config.proxy
- if p.username:
- proxy_str = f"{p.ip}:{p.port}:{p.username}:{p.password}"
- else:
- proxy_str = f"{p.ip}:{p.port}"
- # 2. 提交任务
- task = VSCloudApi.Instance().submit_anticloudflare_task(proxy_str, website_url)
- # 3. 等待结果
- task_id = str(task['id'])
- result = VSCloudApi.Instance().get_anticloudflare_result(task_id)
- parsed = json.loads(result.get('result', '{}'))
- cookies_list = parsed.get('cookies', [])
- for cookie in cookies_list:
- if cookie['name'] in ['__cf_bm', 'cf_clearance']:
- self.session.cookies.set(
- cookie['name'],
- cookie['value'],
- domain=cookie['domain'],
- path='/'
- )
- ua = parsed.get('userAgent')
- if ua:
- self.user_agent = ua
- self.session.headers['User-Agent'] = ua
- VSC_INFO("tls_plg", "[%s] Cloudflare 5s challenge solved.", self.group_id)
- def _solve_recaptcha(self, params) -> str:
- """
- 调用 Capsolver
- """
- key = params.get("apiToken")
- if not key:
- raise NotFoundError(message="Api-token is required for recaptcha solver")
-
- submit_url = "https://api.capsolver.com/createTask"
- task = {
- "type": params.get("type"),
- "websiteURL": params.get("page"),
- "websiteKey": params.get("siteKey"),
- }
- if params.get("action"):
- task["pageAction"] = params.get("action")
-
- if params.get("proxy"):
- p = urlparse(params.get("proxy"))
- task["proxyType"] = p.scheme
- task["proxyAddress"] = p.hostname
- task["proxyPort"] = p.port
- if p.username:
- task["proxyLogin"] = p.username
- task["proxyPassword"] = p.password
-
- payload = {"clientKey": key, "task": task}
- r = requests.post(submit_url, json=payload, timeout=20)
- if r.status_code != 200:
- raise BizLogicError(message="Failed to submit capsolver task")
-
- task_id = r.json().get("taskId")
- for _ in range(20):
- r = requests.post("https://api.capsolver.com/getTaskResult", json={"clientKey": key, "taskId": task_id}, timeout=20)
- if r.status_code == 200:
- d = r.json()
- if d.get("status") == "ready":
- return d["solution"]["gRecaptchaResponse"]
- time.sleep(3)
- raise BizLogicError(message="Capsolver task timeout")
- def _parse_travel_groups(self, html: str) -> List[Dict]:
- groups = []
- js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
- js_match = re.search(js_pattern, html, re.DOTALL)
- if js_match:
- json_str = js_match.group(1).replace(r'\"', '"')
- data = json.loads(json_str)
- for g in data:
- groups.append({
- 'group_name': g.get('groupName'),
- 'group_number': g.get('formGroupId'),
- 'location': g.get('vacName')
- })
- else:
- VSC_WARN('tls_plg', 'Parsed travel group page, but not found travelGroups')
- return groups
- def _parse_appointment_slots(self, html: str) -> List[Dict]:
- slots = []
- pattern = r'availableAppointments\\?":\s*(\[.*?\])(?:,\\?"|\},)'
- match = re.search(pattern, html, re.DOTALL)
-
- if match:
- json_str = match.group(1).replace(r'\"', '"')
- data = json.loads(json_str)
- for day in data:
- d_str = day.get('day')
- for s in day.get('slots', []):
- labels = s.get('labels', [])
- lbl = ""
- stype = ""
- cost = ""
-
- if 'pta' in labels:
- lbl = 'pta'
- stype = "Prime"
- elif 'ptaw' in labels:
- lbl = 'ptaw'
- stype = "Prime Weekend"
- elif '' in labels:
- lbl = ''
- stype = "Standard"
-
- if lbl or not labels:
- slots.append({
- 'date': d_str,
- 'time': s.get('time'),
- 'label': lbl,
- 'type': stype,
- 'cost': cost
- })
- else:
- VSC_WARN("tls_plg", 'Parsed appointment slots page, but not found availableAppointments')
- return slots
- def _check_session_expired_page(self, html: str) -> bool:
- if not html:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- if 'availableAppointments' not in html:
- if 'redirected automatically' in html.lower():
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- if 'login' in html.lower() and 'password' in html.lower():
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- if 'session expired!' in html.lower() and 'for security reasons, your session has expired. please log in again to continue.' in html.lower() and 'you will be redirected automatically in 10 seconds.' in html.lower():
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
|