import base64
import json
import os
import random
import re
import select
import shutil
import socket
import threading
import time
import uuid
from datetime import datetime
from urllib.parse import urlencode

import requests

# DrissionPage core
from DrissionPage import ChromiumPage, ChromiumOptions


class BrowserResponse:
    """Lightweight object mimicking the parts of ``requests.Response`` used here.

    It wraps the plain dict returned by an in-page ``fetch`` executed through
    ``run_js`` and exposes ``status_code``, ``text``, ``headers`` and ``url``.
    A missing or ``None`` dict yields status 0 and empty values.
    """

    def __init__(self, result_dict):
        result_dict = result_dict or {}
        self.status_code = result_dict.get('status', 0)
        self.text = result_dict.get('body', '')
        self.headers = result_dict.get('headers', {})
        self.url = result_dict.get('url', '')


class TlsAutoBot:
    """Drive a Chromium page through TLScontact login, slot query and booking.

    Typical use is ``login()`` -> ``query_slots()`` -> ``book(slots)`` and
    finally ``cleanup()``.
    """

    # Upper bound on the time spent polling the waiting room. The original
    # code used 6 * 60 seconds while both its comment and its log message
    # said one hour; the value now matches the message.
    WAITING_ROOM_TIMEOUT_S = 60 * 60

    def __init__(self, config: dict):
        """Store the configuration and derive a per-instance workspace.

        config contains: proxy, account, capsolver_key,
        apt_config (code, country, city), target dates, etc.
        """
        self.config = config
        self.instance_id = uuid.uuid4().hex[:8]
        # Each instance gets its own browser profile directory.
        self.workspace = os.path.abspath(
            os.path.join("data", f"tls_session_{self.instance_id}")
        )
        self.page = None
        self.travel_group = None

    def _log(self, msg):
        """Print *msg* prefixed with this bot's instance id."""
        print(f"[TLS-Bot-{self.instance_id}] {msg}")

    def _get_free_port(self):
        """Return a TCP port number the OS reports as free right now."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS choose an unused port.
            s.bind(('', 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Create the Chromium page with an isolated port/profile and proxy."""
        self._log("Initializing browser...")
        co = ChromiumOptions()

        # 1. Port and profile-path isolation
        port = self._get_free_port()
        co.set_local_port(port)
        co.set_user_data_path(self.workspace)

        # 2. Proxy configuration. Only applied when an address is actually
        # configured; previously a missing proxy produced the bogus value
        # "None://None:None".
        proxy_cfg = self.config.get('proxy') or {}
        if proxy_cfg.get('ip') and proxy_cfg.get('port'):
            proxy_str = (
                f"{proxy_cfg.get('proto') or 'http'}://"
                f"{proxy_cfg.get('ip')}:{proxy_cfg.get('port')}"
            )
            self._log(f'set proxy={proxy_str}')
            co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("No proxy configured, connecting directly.")

        # 3. Anti-bot-detection configuration
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        self.page = ChromiumPage(co)

    def solve_captcha(self, page_url: str, task_type: str, site_key: str,
                      use_proxy = False, action: str = None,
                      api_domain: str = None) -> str:
        """Solve a captcha through the Capsolver HTTP API and return its token.

        Args:
            page_url: URL of the page showing the captcha.
            task_type: Capsolver task type string.
            site_key: Site key of the captcha widget.
            use_proxy: When true, the configured proxy is sent with the task.
            action: Optional value sent as ``pageAction``.
            api_domain: Optional value sent as ``apiDomain``.

        Returns:
            ``gRecaptchaResponse`` from the solution, or ``token`` if absent.

        Raises:
            ValueError: If no ``capsolver_key`` is configured.
            Exception: If task creation fails, the task reports an error,
                or no solution is ready after 30 polls.
        """
        capsolver_key = self.config.get('capsolver_key')
        if not capsolver_key:
            raise ValueError("Capsolver API key missing")

        task = {
            "type": task_type,
            "websiteURL": page_url,
            "websiteKey": site_key,
        }
        if api_domain:
            task["apiDomain"] = api_domain
        if use_proxy:
            proxy = self.config['proxy']
            task["proxyType"] = proxy.get('proto', 'http')
            task["proxyAddress"] = proxy.get('ip')
            task["proxyPort"] = int(proxy.get('port'))
            if proxy.get('username'):
                task["proxyLogin"] = proxy.get('username')
                task["proxyPassword"] = proxy.get('password')
        if action:
            task["pageAction"] = action

        payload = {"clientKey": capsolver_key, "task": task}
        res = requests.post("https://api.capsolver.com/createTask",
                            json=payload, timeout=20)
        if res.status_code != 200:
            raise Exception(f"Failed to create capsolver task: {res.text}")
        created = res.json()
        if created.get("errorId") != 0:
            raise Exception(f"Failed to create capsolver task: {res.text}")

        task_id = created.get("taskId")
        self._log(f"Task created: {task_id}. Waiting for solution...")

        for _ in range(30):
            r = requests.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": capsolver_key, "taskId": task_id},
                timeout=20
            )
            data = r.json()
            if data.get("status") == "ready":
                self._log("Captcha solved successfully!")
                solution = data["solution"]
                return solution.get("gRecaptchaResponse") or solution.get("token")
            # Stop immediately on a reported failure instead of polling a
            # dead task until the timeout below.
            if data.get("errorId") or data.get("status") == "failed":
                raise Exception(f"Capsolver task failed: {r.text}")
            time.sleep(3)

        raise Exception("Capsolver task timeout")

    def login(self):
        """Log in, pick the travel group for the configured city and open
        the appointment-booking page.

        Sets ``self.travel_group`` to the matching group dict.

        Raises:
            Exception: If login is rejected, no travel group matches the
                configured city, or one of the expected redirects fails.
        """
        self.init_browser()
        apt_config = self.config['apt_config']

        login_url = "https://visas-fr.tlscontact.com/en-us/login"
        params = {
            "issuerId": apt_config["code"],
            "country": apt_config["country"],
            "vac": apt_config["code"],
            "redirect": f"/en-us/country/{apt_config['country']}/vac/{apt_config['code']}"
        }
        full_login_url = f"{login_url}?{urlencode(params)}"

        self._log(f"Navigating to login: {full_login_url}")
        self.page.get(full_login_url)
        time.sleep(3)

        self._wait_out_waiting_room()

        if not self.page.ele('#email-input-field'):
            self._log("Form not found, reloading...")
            self.page.get(full_login_url)
        # NOTE(review): in the flattened original it is ambiguous whether this
        # wait belonged to the reload branch only; waiting unconditionally is
        # harmless when the field is already displayed.
        self.page.wait.ele_displayed('#email-input-field', timeout=15)

        self._log("Waiting 3 seconds for Captcha scripts to load...")
        time.sleep(3)

        self._submit_credentials()

        self._log("Waiting for dashboard...")
        self.page.wait.load_start()
        time.sleep(5)

        formgroup_id = self._select_travel_group(apt_config['city'].lower())
        self._enter_appointment_booking()

        self._log(f"✅ Login & Navigation Success! Target Group ID: {formgroup_id}")

    def _wait_out_waiting_room(self):
        """Poll the page until the waiting-room text disappears or time runs out."""
        wait_start = time.time()
        while True:
            # Lower-cased page HTML. NOTE: the original author observed that
            # DrissionPage may raise a "page refreshed" error here and chose
            # not to handle it.
            html = self.page.html.lower()

            # Waiting-room marker in French or English.
            if "file d'attente" not in html and "waiting room" not in html:
                # Marker text is gone: we are out of the waiting room.
                return

            if time.time() - wait_start > self.WAITING_ROOM_TIMEOUT_S:
                self._log("Waiting room timeout (1h).")
                return

            self._log("In Waiting Room... Waiting for auto-refresh.")
            # The page refreshes by itself (per the original author's note),
            # so only sleep and leave the browser alone.
            time.sleep(30)

    def _submit_credentials(self):
        """Solve the login captcha if one is present and submit the form via JS."""
        g_token = ""
        # Detect whether the login page carries a reCAPTCHA.
        if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
            self._log("Login ReCaptcha detected, solving...")
            # The original author notes the login page normally uses V2.
            g_token = self.solve_captcha(
                page_url=self.page.url,
                task_type="ReCaptchaV2TaskProxyLess",
                site_key="6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0"
            )

        account = self.config['account']
        # json.dumps emits a quoted, escaped string literal that is valid JS,
        # so quotes or backslashes in the values can no longer break or alter
        # the injected script.
        username_js = json.dumps(account['username'])
        password_js = json.dumps(account['password'])
        token_js = json.dumps(g_token)

        js_login = f"""
            var u = document.getElementById('email-input-field');
            if(u) {{ u.value = {username_js}; u.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var p = document.getElementById('password-input-field');
            if(p) {{ p.value = {password_js}; p.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var g = document.getElementById('g-recaptcha-response');
            if(g) {{ g.value = {token_js}; }}

            var btn = document.getElementById('btn-login');
            if(btn) {{ btn.click(); return true; }} else {{ return false; }}
        """

        self._log("Submitting Login via JS...")
        clicked = self.page.run_js(js_login)
        if clicked is False:
            # The script returns false only when #btn-login is missing.
            self._log("Warning: login button not found on page.")

        self._log("Waiting for dashboard redirect...")
        self.page.wait.url_change('login-actions', exclude=True, timeout=45)
        time.sleep(4)

        if "login-actions" in self.page.url or "auth" in self.page.url:
            raise Exception("Login Failed! Invalid credentials or Captcha rejected.")

    def _select_travel_group(self, target_city):
        """Find the travel group whose vacName equals *target_city*, click it
        and return its formGroupId."""
        # Parse the dashboard HTML to extract the travel groups.
        self._log("Parsing Dashboard for Travel Group...")
        html = self.page.html
        # The pattern matches JSON whose quotes appear backslash-escaped in
        # the page source.
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        js_match = re.search(js_pattern, html, re.DOTALL)

        groups = []
        if js_match:
            json_str = js_match.group(1).replace(r'\"', '"')
            groups = json.loads(json_str)

        for g in groups:
            # "or ''" guards against an explicit null vacName.
            if (g.get('vacName') or '').lower() == target_city:
                self.travel_group = g
                break

        if not self.travel_group:
            raise Exception(f"Travel Group not found for city: {target_city}")

        formgroup_id = self.travel_group.get('formGroupId')
        self._log(f"Waiting for group button to render: {formgroup_id}")

        # Make sure the button is rendered before clicking so the JS click
        # does not miss the element.
        btn_selector = f'tag:button@@name=formGroupId@@value={formgroup_id}'
        self.page.wait.ele_displayed(btn_selector, timeout=15)

        self._log(f"Select group_id={formgroup_id} via JS...")
        # Built-in by_js=True click instead of a hand-written run_js call.
        self.page.ele(btn_selector).click(by_js=True)

        self._log("Waiting for service-level redirect...")
        self.page.wait.url_change('travel-groups', exclude=True, timeout=45)
        time.sleep(2)  # short buffer after the navigation

        if "travel-groups" in self.page.url or "auth" in self.page.url:
            raise Exception("Redirect to service-level Failed!")

        return formgroup_id

    def _enter_appointment_booking(self):
        """Click the book-appointment button and wait for the booking page."""
        self._log("Waiting for book-appointment button to render...")
        # As above, wait for the Continue button to render before clicking.
        self.page.wait.ele_displayed('#book-appointment-btn', timeout=15)

        self._log("Clicking 'Continue' to appointment booking via JS...")
        self.page.ele('#book-appointment-btn').click(by_js=True)

        self._log("Waiting for appointment-booking redirect...")
        self.page.wait.url_change('service-level', exclude=True, timeout=45)
        time.sleep(2)

        if "service-level" in self.page.url or "auth" in self.page.url:
            raise Exception("Redirect to appointment-booking Failed!")

        self._log("Waiting for appointment-booking page to fully load...")
        self.page.wait.load_start()
        time.sleep(3)

    def query_slots(self) -> list:
        """Return the slots found for the configured ``interest_month``.

        If the page already shows the target month, the booking page is
        re-fetched with an in-page ``fetch`` and the embedded JSON is parsed.
        Otherwise the UI is navigated to the target month and slots are read
        from the DOM.

        Returns:
            A list of dicts with keys ``date``, ``time`` and ``label``.
            Empty when the target month could not be reached.

        Raises:
            Exception: If the in-page fetch does not return HTTP 200.
        """
        interest_month = self.config.get("interest_month", time.strftime("%m-%Y"))

        # Derived forms of the target month.
        target_date_obj = datetime.strptime(interest_month, "%m-%Y")
        target_month_text = target_date_obj.strftime("%B %Y")

        # The currently selected month is read from the button located by
        # its data-testid.
        current_month_text = self._current_month_text()

        if current_month_text.lower() == target_month_text.lower():
            # Mode B: already on the target month (JS fetch + regex/JSON).
            self._log(f"Already on '{target_month_text}'. Executing silent JS fetch...")
            slots = self._fetch_slots_via_js(interest_month)
        else:
            # Mode A: not on the target month (UI navigation + DOM parsing).
            self._log(f"Current is '{current_month_text}', navigating to '{target_month_text}'...")
            if self._navigate_to_month(interest_month, target_month_text):
                slots = self._extract_slots_from_dom(
                    target_date_obj.year, target_date_obj.month
                )
            else:
                # Scraping now would stamp another month's slots with the
                # target month's dates, so report nothing instead.
                self._log("Target month not reached; skipping DOM extraction.")
                slots = []

        self._log(f"Found {len(slots)} valid slots.")
        return slots

    def _current_month_text(self):
        """Return the stripped text of the current-month button, or ''."""
        ele = self.page.ele('@data-testid=btn-current-month-available')
        return ele.text.strip() if ele else ""

    def _navigate_to_month(self, interest_month, target_month_text):
        """Click through the month selector; return True once the target
        month was selected."""
        wanted = target_month_text.lower()
        for _ in range(12):
            # A previous "next month" click may already have landed on the
            # target month.
            if self._current_month_text().lower() == wanted:
                return True

            target_btn = self.page.ele(
                f'xpath://a[contains(@href, "month={interest_month}")]'
            )
            if target_btn:
                target_btn.click(by_js=True)
                time.sleep(3)
                return True

            next_btn = self.page.ele('@data-testid=btn-next-month-available')
            if not next_btn:
                self._log("Warning: Cannot find target month or 'Next Month' button.")
                return False
            next_btn.click(by_js=True)
            time.sleep(2)

        return self._current_month_text().lower() == wanted

    def _extract_slots_from_dom(self, target_year, target_month_num):
        """Read available slots from the rendered calendar of the shown month."""
        self._log("Extracting slots from DOM using robust data-testid features...")
        slots = []

        # A "day block" is a div that directly contains a <p> (the date) and
        # a child div holding a button whose data-testid contains "slot".
        day_blocks_xpath = '//div[p and div//button[contains(@data-testid, "slot")]]'
        day_blocks = self.page.eles(f'xpath:{day_blocks_xpath}')

        for block in day_blocks:
            # 1. Date: the <p> text is expected to look like "Mon 01"; only
            # the digits are used.
            p_ele = block.ele('tag:p')
            if not p_ele:
                continue
            day_match = re.search(r'\d+', p_ele.text)
            if not day_match:
                continue
            full_date = f"{target_year}-{target_month_num:02d}-{int(day_match.group()):02d}"

            # 2. Available buttons: the data-testid prefix excludes the
            # btn-unavailable-slot buttons.
            available_btns = block.eles(
                'xpath:.//button[starts-with(@data-testid, "btn-available-slot")]'
            )
            for btn in available_btns:
                # Time: the first HH:MM in the button's HTML, regardless of
                # its inner markup.
                time_match = re.search(r'\d{2}:\d{2}', btn.html)
                if not time_match:
                    continue

                # Label: derived from the data-testid alone.
                test_id = btn.attr('data-testid') or ""
                if 'prime' in test_id and 'weekend' in test_id:
                    lbl = 'ptaw'
                elif 'prime' in test_id:
                    lbl = 'pta'
                else:
                    lbl = ''

                slots.append({
                    'date': full_date,
                    'time': time_match.group(),
                    'label': lbl
                })

        return slots

    def _fetch_slots_via_js(self, interest_month):
        """Fetch the booking page inside the browser and parse its slot JSON."""
        if not self.travel_group:
            raise RuntimeError("login() must succeed before querying slots")
        group_num = self.travel_group['formGroupId']
        apt_config = self.config['apt_config']

        base_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        params = {'location': apt_config["code"], 'month': interest_month}
        query_url = f"{base_url}?{urlencode(params)}"

        # The fetch runs inside the page with credentials: "include".
        js_script = f"""
            return fetch({json.dumps(query_url)}, {{ credentials: "include" }})
            .then(async r => {{ return {{ status: r.status, body: await r.text() }}; }})
            .catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
        """
        resp = BrowserResponse(self.page.run_js(js_script))
        if resp.status_code != 200:
            raise Exception(f"Silent Query Failed: {resp.status_code}")

        self._log("Extracting slots from JSON response...")
        slots = []
        pattern = r'"availableAppointments\\":\s*(\[.*?\]),\\"showFlexiAppointment'
        match = re.search(pattern, resp.text, re.DOTALL)
        if not match:
            return slots

        data = json.loads(match.group(1).replace(r'\"', '"'))
        for day in data:
            d_str = day.get('day')
            for s in day.get('slots', []):
                labels = s.get('labels', [])
                # Slots without any label entry are skipped.
                if not labels:
                    continue

                lbl = ""
                if 'pta' in labels:
                    lbl = 'pta'
                elif 'ptaw' in labels:
                    lbl = 'ptaw'
                elif '' in labels:
                    lbl = ''

                slots.append({
                    'date': d_str,
                    'time': s.get('time'),
                    'label': lbl
                })

        return slots

    def _filter_dates(self, available_dates: list, start_str: str, end_str: str) -> list:
        """Return the dates within ``[start_str, end_str]`` (inclusive).

        Dates are ``YYYY-MM-DD`` strings; only the first 10 characters of the
        bounds are parsed. If either bound is empty, *available_dates* is
        returned unchanged.
        """
        if not start_str or not end_str:
            return available_dates

        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
        return [
            d for d in available_dates
            if s_date <= datetime.strptime(d, "%Y-%m-%d") <= e_date
        ]

    def book(self, all_slots: list) -> bool:
        """Pick a matching slot at random and submit the page's booking form.

        Args:
            all_slots: Slot dicts as returned by ``query_slots``.

        Returns:
            True when the page reaches an ``appointment-confirmation`` URL,
            False otherwise.
        """
        if not all_slots:
            self._log("No slots provided to book.")
            return False

        # 1. Filter by date range and label.
        target_labels = self.config.get('target_labels', [''])
        exp_start = self.config.get('expected_start_date', '')
        exp_end = self.config.get('expected_end_date', '')

        unique_dates = list({s['date'] for s in all_slots})
        valid_dates = set(self._filter_dates(unique_dates, exp_start, exp_end))

        possible_slots = [
            s for s in all_slots
            if s['date'] in valid_dates and s['label'] in target_labels
        ]
        if not possible_slots:
            self._log("No slots match target dates and labels.")
            return False

        # 2. Pick one slot at random.
        selected = random.choice(possible_slots)
        sel_date = selected['date']
        sel_time = selected['time']
        sel_label = selected['label']
        self._log(f"Selected Slot -> Date: {sel_date}, Time: {sel_time}, Label: {sel_label or 'standard'}")

        # NOTE: this file used to carry a commented-out alternative that
        # skipped the form and POSTed Next.js server actions directly to the
        # current URL: a basket-cost check
        # (Next-Action 40124cc90acef520d4fd2daf60ad3c8e21fc2c11d8), then the
        # booking itself (Next-Action 6043cfd107081bc817cbb11a8c0db17d3a063401be)
        # as FormData with a reCAPTCHA v3 token (task types
        # ReCaptchaV3TaskProxyLess / ReCaptchaV3M1TaskProxyLess, site key
        # 6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9, action "book", api domain
        # recaptcha.net), plus a hook replacing grecaptcha.execute. That dead
        # code was removed; the flow below drives the page's own form.

        # 3. Inject the slot into the form and click submit. The values used
        # to be hard-coded to '2026-06-01' / '12:00', so the randomly selected
        # slot was never the one submitted.
        date_js = json.dumps(sel_date)
        time_js = json.dumps(sel_time)
        label_js = json.dumps(sel_label)

        js_inject_and_click = f"""
        try {{
            const form = document.querySelector('form');
            if (!form) return 'Form not found';

            function setReactValue(input, value) {{
                if (!input) return;
                input.value = value;
                input.dispatchEvent(new Event('input', {{ bubbles: true }}));
                input.dispatchEvent(new Event('change', {{ bubbles: true }}));
            }}

            // Fill in the chosen date, time and label
            setReactValue(form.querySelector('input[name="date"]'), {date_js});
            setReactValue(form.querySelector('input[name="time"]'), {time_js});
            setReactValue(form.querySelector('input[name="appointmentLabel"]'), {label_js});

            // Re-enable and click the submit button
            const submitBtn = form.querySelector('button[type="submit"]');
            if (submitBtn) {{
                submitBtn.removeAttribute('disabled');
                submitBtn.classList.remove('opacity-50', 'cursor-not-allowed');
                submitBtn.click();
                return 'clicked';
            }} else {{
                return 'Submit button not found';
            }}
        }} catch (e) {{
            return e.toString();
        }}
        """

        inject_res = self.page.run_js(js_inject_and_click)
        self._log(f"Form submission triggered: {inject_res}")

        if inject_res != 'clicked':
            self._log("❌ Failed to inject form or click the submit button.")
            return False

        # 4. Verify the booking by polling for the page redirect.
        self._log("Waiting for Next.js to process the form submission...")
        for _ in range(15):
            time.sleep(1.0)
            current_page_url = self.page.url

            # A confirmation URL means the booking went through.
            if "appointment-confirmation" in current_page_url:
                self._log(f"✅ BOOKING SUCCESS! Redirected to: {current_page_url}")
                return True

            try:
                body_text = str(self.page.run_js("return document.body.innerText || '';"))
            except Exception:
                # Best effort: a failed read is retried on the next iteration.
                continue

            if "APPOINTMENT_LIMIT_REACHED" in body_text or "appointment limit" in body_text.lower():
                self._log("❌ BOOKING FAILED! Reason: Appointment Limit Reached")
                return False

        self._log("❌ BOOKING FAILED! Timeout waiting for redirect confirmation.")
        return False

    def cleanup(self):
        """Quit the browser and delete this instance's workspace directory."""
        self._log("Cleaning up resources...")
        if self.page:
            try:
                self.page.quit()
            except Exception as exc:
                # Cleanup stays best-effort, but the failure is now visible
                # and KeyboardInterrupt/SystemExit are no longer swallowed.
                self._log(f"Browser quit failed (ignored): {exc}")

        if os.path.exists(self.workspace):
            # Short pause before removing the profile directory.
            time.sleep(1)
            shutil.rmtree(self.workspace, ignore_errors=True)


# =====================================================================
# Main entry point
# =====================================================================
def main():
    """Run one login -> query -> book cycle with the configuration below."""
    # SECURITY: this file contains what look like real account credentials
    # and a Capsolver API key. Environment variables override them; the
    # literals are kept only as fallbacks so existing runs behave as before.
    # Rotate these secrets and remove the literals from source control.
    config = {
        # Account credentials
        "account": {
            "username": os.environ.get("TLS_USERNAME", "mayun06@gmail-app.com"),
            "password": os.environ.get("TLS_PASSWORD", "Visafly@111")
        },
        # Target visa centre. The original note gave "Guangzhou TLS:
        # cnCNG2fr" as its example.
        # NOTE(review): the city below is Chengdu; confirm code and city match.
        "apt_config": {
            "country": "cn",
            "city": "Chengdu",
            "code": "cnCNG2fr"
        },
        # Proxy configuration
        "proxy": {
            "proto": "http",
            "ip": "127.0.0.1",
            "port": "7890",
            "username": "",
            "password": ""
        },
        # Capsolver API key
        "capsolver_key": os.environ.get(
            "CAPSOLVER_KEY", "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A"
        ),
        # Month to query (format: MM-YYYY)
        "interest_month": "06-2026",
        # Accepted date range
        "expected_start_date": "2026-06-01",
        "expected_end_date": "2026-06-30",
        # Target labels: '' is a standard slot, 'pta' is a Prime Time slot
        "target_labels": [""]
    }

    bot = TlsAutoBot(config=config)
    try:
        # 1. Log in
        bot.login()

        # 2. Check for slots
        slots = bot.query_slots()
        if slots:
            bot._log(f"Found {len(slots)} total available slots in this month.")

            # 3. Try to book
            success = bot.book(slots)
            if success:
                print("\n🎉 Congratulations! Slot booked successfully!")
            else:
                print("\n⚠️ Failed to book the slot.")
        else:
            bot._log("No available slots found for the requested criteria.")

        # NOTE(review): the flattened original does not show whether this
        # sleep sat inside the else branch or after it; it is placed after the
        # if/else here, so the browser stays open for an hour on every path
        # that does not raise.
        time.sleep(3600)
    except Exception as e:
        bot._log(f"Error occurred during execution: {str(e)}")
    finally:
        bot.cleanup()


if __name__ == "__main__":
    main()