| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781 |
import base64
import json
import os
import random
import re
import select
import shutil
import socket
import threading
import time
import uuid
from datetime import datetime
from urllib.parse import urlencode

import requests
# DrissionPage core
from DrissionPage import ChromiumPage, ChromiumOptions
class BrowserResponse:
    """Lightweight stand-in for ``requests.Response``.

    Wraps the plain dict returned by an in-page ``fetch`` so callers can use
    the familiar ``status_code`` / ``text`` / ``headers`` / ``url`` attributes.

    Args:
        result_dict: Mapping with optional ``status``, ``body``, ``headers``
            and ``url`` keys. Anything that is not a dict (``None``, an error
            string, ...) is treated as an empty result, which yields
            ``status_code == 0``.
    """

    def __init__(self, result_dict):
        # A failed script evaluation may hand back None or a non-dict value;
        # normalise so attribute access below never raises.
        if not isinstance(result_dict, dict):
            result_dict = {}
        self.status_code = result_dict.get('status') or 0
        self.text = result_dict.get('body') or ''
        self.headers = result_dict.get('headers') or {}
        self.url = result_dict.get('url') or ''

    def __repr__(self):
        return f"{type(self).__name__}(status_code={self.status_code!r}, url={self.url!r})"
class TlsAutoBot:
    """Drives a Chromium session through the TLScontact login and booking pages.

    The ``config`` dict is read for these keys:

    * ``proxy``: ``ip``, ``port``, optional ``username`` / ``password`` and a
      scheme stored under either ``scheme`` or ``schema``.
    * ``account``: ``username`` and ``password``.
    * ``capsolver_key``: API key used by :meth:`solve_captcha`.
    * ``apt_config``: ``code``, ``country`` and ``city``.
    * ``interest_month``: month to query, formatted ``MM-YYYY``.
    * ``expected_start_date`` / ``expected_end_date``: ``YYYY-MM-DD`` bounds.
    * ``target_labels``: slot labels accepted by :meth:`book`.
    """

    # Longest time to sit in the queue page before giving up on waiting.
    WAITING_ROOM_TIMEOUT_S = 60 * 60

    # Patterns for JSON arrays embedded (with backslash-escaped quotes) in the
    # page source.
    TRAVEL_GROUPS_PATTERN = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
    APPOINTMENTS_PATTERN = r'"availableAppointments\\":\s*(\[.*?\]),\\"showFlexiAppointment'

    def __init__(self, config: dict):
        """Store the configuration and derive a per-instance workspace path."""
        self.config = config
        self.instance_id = uuid.uuid4().hex[:8]
        self.workspace = os.path.abspath(os.path.join("data", f"tls_session_{self.instance_id}"))
        self.page = None
        self.travel_group = None

    # ------------------------------------------------------------------
    # Small utilities
    # ------------------------------------------------------------------
    def _log(self, msg):
        """Print ``msg`` prefixed with this instance's id."""
        print(f"[TLS-Bot-{self.instance_id}] {msg}")

    def _get_free_port(self):
        """Return a TCP port the OS reported as free at the time of the call."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    @staticmethod
    def _proxy_scheme(proxy: dict) -> str:
        """Return the proxy scheme, accepting both ``scheme`` and ``schema`` keys."""
        return proxy.get('scheme') or proxy.get('schema') or 'http'

    def _proxy_url(self):
        """Return ``scheme://ip:port`` for the configured proxy, or ``None``."""
        proxy = self.config.get('proxy') or {}
        ip, port = proxy.get('ip'), proxy.get('port')
        if not ip or not port:
            return None
        return f"{self._proxy_scheme(proxy)}://{ip}:{port}"

    @staticmethod
    def _extract_embedded_json(text: str, pattern: str) -> list:
        """Return the JSON array captured by ``pattern`` in ``text``, or ``[]``.

        The captured group is expected to contain backslash-escaped quotes
        (``\\"``), which are unescaped before parsing.

        Raises:
            json.JSONDecodeError: If the captured text is not valid JSON.
        """
        match = re.search(pattern, text, re.DOTALL)
        if not match:
            return []
        return json.loads(match.group(1).replace(r'\"', '"'))

    def _current_month_text(self) -> str:
        """Return the stripped text of the current-month button, or ``''``."""
        ele = self.page.ele('@data-testid=btn-current-month-available')
        return ele.text.strip() if ele else ""

    # ------------------------------------------------------------------
    # Browser setup
    # ------------------------------------------------------------------
    def init_browser(self):
        """Create the ``ChromiumPage`` with an isolated port, profile and proxy."""
        self._log("Initializing browser...")
        co = ChromiumOptions()

        # 1. Port and profile isolation so several instances can coexist.
        co.set_local_port(self._get_free_port())
        co.set_user_data_path(self.workspace)

        # 2. Proxy. Skipped when not configured, rather than passing a
        #    meaningless "None://None:None" to Chromium.
        proxy_str = self._proxy_url()
        if proxy_str:
            self._log(f"set proxy={proxy_str}")
            co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("No proxy configured; connecting directly.")

        # 3. Browser flags.
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')
        self.page = ChromiumPage(co)

    # ------------------------------------------------------------------
    # Captcha
    # ------------------------------------------------------------------
    def solve_captcha(self, page_url: str, task_type: str, site_key: str, use_proxy = False, action: str = None, api_domain: str = None) -> str:
        """Create a Capsolver task and poll until a token is available.

        Args:
            page_url: Sent as ``websiteURL``.
            task_type: Capsolver task ``type``.
            site_key: Sent as ``websiteKey``.
            use_proxy: When true, the configured proxy is attached to the task.
            action: Optional value sent as ``pageAction``.
            api_domain: Optional value sent as ``apiDomain``.

        Returns:
            The ``gRecaptchaResponse`` (or ``token``) field of the solution.

        Raises:
            ValueError: If ``capsolver_key`` is missing from the config.
            Exception: If the task cannot be created, is reported as failed,
                yields no token, or is not ready after 30 polls.
        """
        capsolver_key = self.config.get('capsolver_key')
        if not capsolver_key:
            raise ValueError("Capsolver API key missing")

        task = {
            "type": task_type,
            "websiteURL": page_url,
            "websiteKey": site_key,
        }
        if api_domain:
            task["apiDomain"] = api_domain
        if use_proxy:
            proxy = self.config['proxy']
            task["proxyType"] = self._proxy_scheme(proxy)
            task["proxyAddress"] = proxy.get('ip')
            task["proxyPort"] = int(proxy.get('port'))
            if proxy.get('username'):
                task["proxyLogin"] = proxy.get('username')
                task["proxyPassword"] = proxy.get('password')
        if action:
            task["pageAction"] = action

        res = requests.post(
            "https://api.capsolver.com/createTask",
            json={"clientKey": capsolver_key, "task": task},
            timeout=20,
        )
        if res.status_code != 200:
            raise Exception(f"Failed to create capsolver task: {res.text}")
        created = res.json()
        if created.get("errorId") != 0:
            raise Exception(f"Failed to create capsolver task: {res.text}")

        task_id = created.get("taskId")
        self._log(f"Task created: {task_id}. Waiting for solution...")

        for _ in range(30):
            r = requests.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": capsolver_key, "taskId": task_id},
                timeout=20,
            )
            data = r.json()
            status = data.get("status")
            if status == "ready":
                solution = data.get("solution") or {}
                token = solution.get("gRecaptchaResponse") or solution.get("token")
                if not token:
                    raise Exception(f"Capsolver returned no token: {data}")
                self._log("Captcha solved successfully!")
                return token
            # Stop immediately on a reported failure instead of polling a
            # dead task until the timeout.
            if status == "failed" or data.get("errorId"):
                raise Exception(f"Capsolver task failed: {data.get('errorDescription') or data}")
            time.sleep(3)
        raise Exception("Capsolver task timeout")

    # ------------------------------------------------------------------
    # Login
    # ------------------------------------------------------------------
    @staticmethod
    def _build_login_url(apt_config: dict) -> str:
        """Return the login URL carrying the centre code, country and redirect."""
        params = {
            "issuerId": apt_config["code"],
            "country": apt_config["country"],
            "vac": apt_config["code"],
            "redirect": f"/en-us/country/{apt_config['country']}/vac/{apt_config['code']}",
        }
        return f"https://visas-fr.tlscontact.com/en-us/login?{urlencode(params)}"

    def _wait_out_waiting_room(self):
        """Block while the page shows the waiting-room text, up to the timeout.

        The page is only re-read, never reloaded. On timeout the method
        returns and lets the caller continue.
        """
        deadline = time.time() + self.WAITING_ROOM_TIMEOUT_S
        while True:
            # NOTE(review): the original author reports that reading
            # ``page.html`` can raise while the page refreshes itself; that
            # case is deliberately left unhandled here.
            html = self.page.html.lower()
            if "file d'attente" not in html and "waiting room" not in html:
                return
            if time.time() > deadline:
                self._log("Waiting room timeout (1h).")
                return
            self._log("In Waiting Room... Waiting for auto-refresh.")
            time.sleep(30)

    @staticmethod
    def _build_login_script(username: str, password: str, g_token: str) -> str:
        """Return the JS that fills the login form and clicks the login button.

        Values are embedded with ``json.dumps`` so quotes or backslashes in a
        credential cannot break out of the JavaScript string literal.
        """
        return f"""
            var u = document.getElementById('email-input-field');
            if(u) {{ u.value = {json.dumps(username)}; u.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var p = document.getElementById('password-input-field');
            if(p) {{ p.value = {json.dumps(password)}; p.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var g = document.getElementById('g-recaptcha-response');
            if(g) {{ g.value = {json.dumps(g_token)}; }}

            var btn = document.getElementById('btn-login');
            if(btn) {{ btn.click(); return true; }} else {{ return false; }}
        """

    def login(self):
        """Log in, pick the travel group for the configured city and open booking.

        Side effects: creates the browser, sets ``self.travel_group`` and
        leaves the page on the appointment-booking step.

        Raises:
            Exception: If the login does not leave the login page, no travel
                group matches ``apt_config['city']``, or a redirect fails.
        """
        self.init_browser()

        apt_config = self.config['apt_config']
        full_login_url = self._build_login_url(apt_config)

        self._log(f"Navigating to login: {full_login_url}")
        self.page.get(full_login_url)
        time.sleep(3)
        self._wait_out_waiting_room()

        if not self.page.ele('#email-input-field'):
            self._log("Form not found, reloading...")
            self.page.get(full_login_url)
        self.page.wait.ele_displayed('#email-input-field', timeout=15)

        self._log("Waiting 3 seconds for Captcha scripts to load...")
        time.sleep(3)

        g_token = ""
        if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
            self._log("Login ReCaptcha detected, solving...")
            g_token = self.solve_captcha(
                page_url=self.page.url,
                task_type="ReCaptchaV2TaskProxyLess",
                site_key="6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
            )

        account = self.config['account']
        self._log("Submitting Login via JS...")
        self.page.run_js(self._build_login_script(account['username'], account['password'], g_token))

        self._log("Waiting for dashboard redirect...")
        self.page.wait.url_change('login-actions', exclude=True, timeout=45)
        time.sleep(4)
        if "login-actions" in self.page.url or "auth" in self.page.url:
            raise Exception("Login Failed! Invalid credentials or Captcha rejected.")

        self._log("Waiting for dashboard...")
        self.page.wait.load_start()
        time.sleep(5)

        # Find the travel group whose vacName equals the configured city.
        self._log("Parsing Dashboard for Travel Group...")
        groups = self._extract_embedded_json(self.page.html, self.TRAVEL_GROUPS_PATTERN)
        target_city = apt_config['city'].lower()
        self.travel_group = next(
            (g for g in groups if (g.get('vacName') or '').lower() == target_city),
            None,
        )
        if not self.travel_group:
            raise Exception(f"Travel Group not found for city: {target_city}")

        formgroup_id = self.travel_group.get('formGroupId')
        self._log(f"Waiting for group button to render: {formgroup_id}")

        # Wait for the button to render before clicking it.
        btn_selector = f'tag:button@@name=formGroupId@@value={formgroup_id}'
        self.page.wait.ele_displayed(btn_selector, timeout=15)

        self._log(f"Select group_id={formgroup_id} via JS...")
        self.page.ele(btn_selector).click(by_js=True)

        self._log("Waiting for service-level redirect...")
        self.page.wait.url_change('travel-groups', exclude=True, timeout=45)
        time.sleep(2)  # short buffer after the navigation
        if "travel-groups" in self.page.url or "auth" in self.page.url:
            raise Exception("Redirect to service-level Failed!")

        # Continue to the appointment-booking step.
        self._log("Waiting for book-appointment button to render...")
        self.page.wait.ele_displayed('#book-appointment-btn', timeout=15)

        self._log("Clicking 'Continue' to appointment booking via JS...")
        self.page.ele('#book-appointment-btn').click(by_js=True)

        self._log("Waiting for appointment-booking redirect...")
        self.page.wait.url_change('service-level', exclude=True, timeout=45)
        time.sleep(2)
        if "service-level" in self.page.url or "auth" in self.page.url:
            raise Exception("Redirect to appointment-booking Failed!")

        self._log("Waiting for appointment-booking page to fully load...")
        self.page.wait.load_start()
        time.sleep(3)
        self._log(f"✅ Login & Navigation Success! Target Group ID: {formgroup_id}")

    # ------------------------------------------------------------------
    # Slot discovery
    # ------------------------------------------------------------------
    def _navigate_to_month(self, interest_month: str, target_month_text: str) -> bool:
        """Click through the month selector; return whether the target was reached."""
        target_link = f'xpath://a[contains(@href, "month={interest_month}")]'
        for _ in range(12):
            link = self.page.ele(target_link)
            if link:
                link.click(by_js=True)
                time.sleep(3)
                return True

            next_btn = self.page.ele('@data-testid=btn-next-month-available')
            if not next_btn:
                self._log("Warning: Cannot find target month or 'Next Month' button.")
                break
            next_btn.click(by_js=True)
            time.sleep(2)

        # The "next" clicks may have landed on the target month even though
        # no direct link to it was ever found.
        return self._current_month_text().lower() == target_month_text.lower()

    def _scrape_slots_from_dom(self, year: int, month: int) -> list:
        """Collect available slots from the rendered calendar of the shown month."""
        self._log("Extracting slots from DOM using robust data-testid features...")
        slots = []

        # A "day block" is a div holding a <p> (the day label) plus a nested
        # div with at least one button whose data-testid mentions "slot".
        day_blocks = self.page.eles('xpath://div[p and div//button[contains(@data-testid, "slot")]]')
        for block in day_blocks:
            p_ele = block.ele('tag:p')
            if not p_ele:
                continue
            # Keep only the digits of the day label.
            day_match = re.search(r'\d+', p_ele.text)
            if not day_match:
                continue
            full_date = f"{year}-{month:02d}-{int(day_match.group()):02d}"

            # The prefix match leaves out buttons marked unavailable.
            for btn in block.eles('xpath:.//button[starts-with(@data-testid, "btn-available-slot")]'):
                time_match = re.search(r'\d{2}:\d{2}', btn.html)
                if not time_match:
                    continue

                test_id = btn.attr('data-testid') or ""
                if 'prime' in test_id and 'weekend' in test_id:
                    lbl = 'ptaw'
                elif 'prime' in test_id:
                    lbl = 'pta'
                else:
                    lbl = ''

                slots.append({'date': full_date, 'time': time_match.group(), 'label': lbl})
        return slots

    def _fetch_slots_via_js(self, interest_month: str) -> list:
        """Fetch the booking page from inside the browser and parse its slots.

        Raises:
            Exception: If the in-page fetch does not return HTTP 200.
        """
        group_num = self.travel_group['formGroupId']
        apt_config = self.config['apt_config']

        base_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
        params = {'location': apt_config["code"], 'month': interest_month}
        query_url = f"{base_url}?{urlencode(params)}"

        js_script = f"""
            return fetch({json.dumps(query_url)}, {{ credentials: "include" }})
                .then(async r => {{ return {{ status: r.status, body: await r.text() }}; }})
                .catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
        """
        resp = BrowserResponse(self.page.run_js(js_script))
        if resp.status_code != 200:
            raise Exception(f"Silent Query Failed: {resp.status_code}")

        self._log("Extracting slots from JSON response...")
        slots = []
        for day in self._extract_embedded_json(resp.text, self.APPOINTMENTS_PATTERN):
            d_str = day.get('day')
            for s in day.get('slots', []):
                labels = s.get('labels', [])
                # Slots with no labels at all are skipped.
                if not labels:
                    continue
                if 'pta' in labels:
                    lbl = 'pta'
                elif 'ptaw' in labels:
                    lbl = 'ptaw'
                else:
                    lbl = ''
                slots.append({'date': d_str, 'time': s.get('time'), 'label': lbl})
        return slots

    def query_slots(self) -> list:
        """Return available slots for ``interest_month``.

        If the page already shows the target month, the data comes from an
        in-page fetch; otherwise the month selector is clicked and the
        calendar DOM is scraped. When the target month cannot be reached an
        empty list is returned, so that another month's slots are never
        reported under the target month's dates.

        Returns:
            A list of ``{'date', 'time', 'label'}`` dicts.
        """
        interest_month = self.config.get("interest_month", time.strftime("%m-%Y"))
        target = datetime.strptime(interest_month, "%m-%Y")
        # NOTE(review): "%B" is locale dependent; this comparison presumably
        # expects English month names. Confirm the process locale.
        target_month_text = target.strftime("%B %Y")

        current_month_text = self._current_month_text()
        if current_month_text.lower() == target_month_text.lower():
            self._log(f"Already on '{target_month_text}'. Executing silent JS fetch...")
            slots = self._fetch_slots_via_js(interest_month)
        else:
            self._log(f"Current is '{current_month_text}', navigating to '{target_month_text}'...")
            if self._navigate_to_month(interest_month, target_month_text):
                slots = self._scrape_slots_from_dom(target.year, target.month)
            else:
                self._log(f"Could not reach '{target_month_text}'; skipping slot extraction.")
                slots = []

        self._log(f"Found {len(slots)} valid slots.")
        return slots

    # ------------------------------------------------------------------
    # Booking
    # ------------------------------------------------------------------
    def _filter_dates(self, available_dates: list, start_str: str, end_str: str) -> list:
        """Return the dates that fall within ``[start_str, end_str]`` inclusive.

        Only the first ten characters of each bound are parsed, so ISO
        timestamps are accepted. If either bound is empty the input list is
        returned unchanged. Entries that are not ``YYYY-MM-DD`` strings are
        skipped.
        """
        if not start_str or not end_str:
            return available_dates

        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")

        valid = []
        for d in available_dates:
            try:
                curr = datetime.strptime(d, "%Y-%m-%d")
            except (TypeError, ValueError):
                continue
            if s_date <= curr <= e_date:
                valid.append(d)
        return valid

    @staticmethod
    def _build_booking_script(sel_date: str, sel_time: str, sel_label: str) -> str:
        """Return the JS that fills the booking form with the slot and submits it."""
        return f"""
            try {{
                const form = document.querySelector('form');
                if (!form) return 'Form not found';
                function setReactValue(input, value) {{
                    if (!input) return;
                    input.value = value;
                    input.dispatchEvent(new Event('input', {{ bubbles: true }}));
                    input.dispatchEvent(new Event('change', {{ bubbles: true }}));
                }}
                // Fill in the chosen date, time and label.
                setReactValue(form.querySelector('input[name="date"]'), {json.dumps(sel_date)});
                setReactValue(form.querySelector('input[name="time"]'), {json.dumps(sel_time)});
                setReactValue(form.querySelector('input[name="appointmentLabel"]'), {json.dumps(sel_label)});
                // Re-enable and click the submit button.
                const submitBtn = form.querySelector('button[type="submit"]');
                if (submitBtn) {{
                    submitBtn.removeAttribute('disabled');
                    submitBtn.classList.remove('opacity-50', 'cursor-not-allowed');
                    submitBtn.click();
                    return 'clicked';
                }} else {{
                    return 'Submit button not found';
                }}
            }} catch (e) {{
                return e.toString();
            }}
        """

    def book(self, all_slots: list) -> bool:
        """Pick one matching slot at random and submit the booking form.

        A slot matches when its date lies within ``expected_start_date`` ..
        ``expected_end_date`` and its label is in ``target_labels``.

        NOTE: an earlier variant that POSTed the form data straight to the
        page with ``Next-Action`` headers and a reCAPTCHA v3 token existed
        only as commented-out code and has been removed; recover it from
        version control if needed.

        Returns:
            True if the page reaches an ``appointment-confirmation`` URL
            within the polling window, otherwise False.
        """
        if not all_slots:
            self._log("No slots provided to book.")
            return False

        # 1. Filter by date range and label.
        target_labels = self.config.get('target_labels', [''])
        exp_start = self.config.get('expected_start_date', '')
        exp_end = self.config.get('expected_end_date', '')

        unique_dates = list({s['date'] for s in all_slots})
        valid_dates = set(self._filter_dates(unique_dates, exp_start, exp_end))
        possible_slots = [
            s for s in all_slots
            if s['date'] in valid_dates and s['label'] in target_labels
        ]
        if not possible_slots:
            self._log("No slots match target dates and labels.")
            return False

        # 2. Choose one candidate at random.
        selected = random.choice(possible_slots)
        sel_date = selected['date']
        sel_time = selected['time']
        sel_label = selected['label']
        self._log(f"Selected Slot -> Date: {sel_date}, Time: {sel_time}, Label: {sel_label or 'standard'}")

        # 3. Inject the *selected* slot into the form and submit it.
        inject_res = self.page.run_js(self._build_booking_script(sel_date, sel_time, sel_label))
        self._log(f"Form submission triggered: {inject_res}")
        if inject_res != 'clicked':
            self._log("❌ Failed to inject form or click the submit button.")
            return False

        # 4. Poll for the confirmation redirect.
        self._log("Waiting for Next.js to process the form submission...")
        for _ in range(15):
            current_page_url = self.page.url
            if "appointment-confirmation" in current_page_url:
                self._log(f"✅ BOOKING SUCCESS! Redirected to: {current_page_url}")
                return True

            try:
                body_text = str(self.page.run_js("return document.body.innerText || '';"))
            except Exception as exc:
                # Best effort: the read can fail while the page navigates,
                # so report it and try again on the next tick.
                self._log(f"Could not read page body yet: {exc}")
            else:
                if "APPOINTMENT_LIMIT_REACHED" in body_text or "appointment limit" in body_text.lower():
                    self._log("❌ BOOKING FAILED! Reason: Appointment Limit Reached")
                    return False
            time.sleep(1.0)

        self._log("❌ BOOKING FAILED! Timeout waiting for redirect confirmation.")
        return False

    # ------------------------------------------------------------------
    # Teardown
    # ------------------------------------------------------------------
    def cleanup(self):
        """Quit the browser and delete this instance's workspace directory."""
        self._log("Cleaning up resources...")
        if self.page:
            try:
                self.page.quit()
            except Exception as exc:
                # Teardown stays best-effort, but the failure is reported.
                self._log(f"Browser quit failed (ignored): {exc}")
            self.page = None
        if os.path.exists(self.workspace):
            time.sleep(1)  # pause before removing the profile directory
            shutil.rmtree(self.workspace, ignore_errors=True)
# =====================================================================
# Script entry point
# =====================================================================
def load_config_from_env(env=None) -> dict:
    """Build the bot configuration, taking secrets from environment variables.

    Credentials and API keys are read from the environment instead of being
    committed in source.

    Args:
        env: Mapping to read from; defaults to ``os.environ``.

    Returns:
        The config dict expected by ``TlsAutoBot``.

    Raises:
        ValueError: If ``TLS_USERNAME`` or ``TLS_PASSWORD`` is not set.
    """
    env = os.environ if env is None else env

    missing = [name for name in ("TLS_USERNAME", "TLS_PASSWORD") if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    return {
        # Account credentials
        "account": {
            "username": env["TLS_USERNAME"],
            "password": env["TLS_PASSWORD"],
        },
        # Target visa centre.
        # NOTE(review): the original comment gives cnCNG2fr as the Guangzhou
        # code while "city" is Chengdu; confirm that city and code refer to
        # the same centre.
        "apt_config": {
            "country": "cn",
            "city": "Chengdu",
            "code": "cnCNG2fr",
        },
        # Proxy settings
        "proxy": {
            "schema": "http",
            "ip": "127.0.0.1",
            "port": "7890",
            "username": "",
            "password": "",
        },
        # Capsolver API key; only needed when a captcha has to be solved.
        "capsolver_key": env.get("CAPSOLVER_API_KEY", ""),

        # Month to query (format: MM-YYYY)
        "interest_month": "06-2026",

        # Accepted date range
        "expected_start_date": "2026-06-01",
        "expected_end_date": "2026-06-30",

        # Accepted labels: '' is a standard slot, 'pta' is a Prime Time slot
        "target_labels": [""],
    }


def main():
    """Log in, look for slots, try to book one, then clean up."""
    bot = TlsAutoBot(config=load_config_from_env())

    try:
        # 1. Log in
        bot.login()

        # 2. Look for slots
        slots = bot.query_slots()

        if slots:
            bot._log(f"Found {len(slots)} total available slots in this month.")
            # 3. Try to book
            success = bot.book(slots)
            if success:
                print("\n🎉 Congratulations! Slot booked successfully!")
            else:
                print("\n⚠️ Failed to book the slot.")
        else:
            bot._log("No available slots found for the requested criteria.")
        # NOTE(review): presumably keeps the browser open for manual
        # inspection before cleanup; confirm this one-hour hold is wanted.
        time.sleep(3600)
    except Exception as e:
        bot._log(f"Error occurred during execution: {str(e)}")
    finally:
        bot.cleanup()


if __name__ == "__main__":
    main()
|