| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- import time
- import json
- import random
- import re
- import os
- import uuid
- import socket
- import shutil
- import requests
- import threading
- import select
- import base64
- from datetime import datetime
- from urllib.parse import urlencode
- # DrissionPage 核心
- from DrissionPage import ChromiumPage, ChromiumOptions
class ProxyTunnel:
    """Local, credential-free proxy endpoint in front of an authenticated HTTP proxy.

    The tunnel listens on an ephemeral ``127.0.0.1`` port. For every accepted
    connection it reads the first packet, inserts a ``Proxy-Authorization:
    Basic ...`` header right after the request line, forwards it to the
    upstream proxy and then relays bytes in both directions.

    Socket tuning applied to both legs (best effort):
      1. ``TCP_NODELAY`` so small packets are sent immediately.
      2. ``SO_KEEPALIVE`` so idle links are probed instead of silently dropped.
      3. Blocking ``sendall`` for forwarding, so slow peers stall the relay
         thread instead of losing data.

    Only the first packet of a connection is rewritten; later requests sent
    on the same connection are relayed unchanged.
    """

    BUFFER_SIZE = 32 * 1024  # bytes per recv() and requested kernel buffer size
    IDLE_TIMEOUT = 120       # seconds without traffic before a relay is dropped

    def __init__(self, upstream_ip, upstream_port, username, password):
        """Store upstream coordinates and pre-compute the auth header.

        Raises:
            TypeError / ValueError: if ``upstream_port`` is not int-convertible.
        """
        # Lifecycle attributes come first so stop()/__del__ stay safe even if
        # the port conversion below raises.
        self.server_socket = None
        self.local_port = 0
        self.running = False
        self.listen_thread = None

        self.upstream_ip = upstream_ip
        self.upstream_port = int(upstream_port)
        self.username = username
        self.password = password

        # Pre-compute the Proxy-Authorization header once.
        credentials = f"{username}:{password}".encode()
        b64_auth = base64.b64encode(credentials).decode()
        self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n"

    def start(self):
        """Bind the listener, start the accept thread, return ``"127.0.0.1:<port>"``.

        Calling ``start()`` on a tunnel that is already running returns the
        existing address instead of leaking the first listener.

        Raises:
            RuntimeError: if the listener cannot be created (original error chained).
        """
        if self.running and self.server_socket is not None:
            return f"127.0.0.1:{self.local_port}"
        try:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket = server
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
            self.local_port = server.getsockname()[1]
            server.listen(128)

            self.running = True
            self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True)
            self.listen_thread.start()
            return f"127.0.0.1:{self.local_port}"
        except Exception as e:
            self.stop()
            raise RuntimeError(f"Failed to start tunnel: {e}") from e

    def stop(self):
        """Stop accepting connections and close the listener. Safe to call repeatedly."""
        self.running = False
        # getattr: __del__ may run on an instance whose __init__ never finished.
        server = getattr(self, "server_socket", None)
        self.server_socket = None
        if server is not None:
            try:
                server.close()
            except OSError:
                pass

    def _accept_loop(self):
        """Accept clients until stopped; each client gets its own daemon thread."""
        while self.running:
            # Snapshot: stop() may set self.server_socket to None concurrently.
            server = self.server_socket
            if server is None:
                break
            try:
                # select() with a 1s timeout lets the loop notice stop() promptly.
                readable, _, _ = select.select([server], [], [], 1.0)
            except (OSError, ValueError):
                break  # listener was closed underneath us
            if not readable:
                continue
            try:
                client_sock, _ = server.accept()
            except OSError:
                break
            try:
                threading.Thread(
                    target=self._handle_client, args=(client_sock,), daemon=True
                ).start()
            except RuntimeError:
                # Could not spawn a worker; do not leak the accepted socket.
                self._close_socket(client_sock)

    def _optimize_socket(self, sock):
        """Apply latency/keep-alive/buffer options; failures are ignored (best effort)."""
        try:
            # Disable Nagle's algorithm so data is sent without waiting to coalesce.
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # Keep-alive probes for idle connections.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Request 32 KiB kernel buffers in both directions.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.BUFFER_SIZE)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.BUFFER_SIZE)
        except OSError:
            pass

    def _inject_auth(self, first_packet):
        """Return ``first_packet`` with the auth header inserted after the first CRLF.

        A packet that contains no CRLF is returned unchanged.
        """
        idx = first_packet.find(b'\r\n')
        if idx == -1:
            return first_packet
        head, tail = first_packet[:idx + 2], first_packet[idx + 2:]
        return head + self.auth_header.encode() + tail

    def _handle_client(self, client_sock):
        """Serve one client: read first packet, connect upstream, inject auth, relay."""
        upstream_sock = None
        try:
            self._optimize_socket(client_sock)
            client_sock.settimeout(30)

            # 1. First packet from the client.
            try:
                first_packet = client_sock.recv(self.BUFFER_SIZE)
            except socket.timeout:
                return  # client connected but never sent anything
            if not first_packet:
                return

            # 2. Connect to the upstream proxy (10s connect timeout).
            upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._optimize_socket(upstream_sock)
            upstream_sock.settimeout(10)
            upstream_sock.connect((self.upstream_ip, self.upstream_port))

            # From here on both sockets are blocking; select() drives the relay.
            upstream_sock.settimeout(None)
            client_sock.settimeout(None)

            # 3./4. Inject the header and send the first packet in full.
            upstream_sock.sendall(self._inject_auth(first_packet))

            # 5. Relay in both directions until either side closes.
            self._pipe_sockets(client_sock, upstream_sock)
        except Exception:
            # Per-connection boundary: one failed connection must not affect
            # the others; the sockets are closed in ``finally``.
            pass
        finally:
            self._close_socket(client_sock)
            self._close_socket(upstream_sock)

    def _pipe_sockets(self, sock1, sock2):
        """Relay bytes between two blocking sockets until close, error, idle timeout or stop()."""
        pair = [sock1, sock2]
        # Monotonic clock: idle detection must not be affected by wall-clock jumps.
        last_activity = time.monotonic()

        while self.running:
            try:
                readable, _, errored = select.select(pair, [], pair, 1.0)
            except (OSError, ValueError):
                return
            if errored:
                return
            if not readable:
                if time.monotonic() - last_activity > self.IDLE_TIMEOUT:
                    return
                continue

            for src in readable:
                dst = sock2 if src is sock1 else sock1
                try:
                    data = src.recv(self.BUFFER_SIZE)
                    if not data:
                        return  # peer closed the connection
                    # Blocking sendall: waits on a slow peer rather than dropping data.
                    dst.sendall(data)
                except OSError:
                    return  # reset / broken pipe / closed socket
                last_activity = time.monotonic()

    def _close_socket(self, sock):
        """Shut down and close ``sock``; ``None`` and already-closed sockets are ignored."""
        if not sock:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass
        try:
            sock.close()
        except OSError:
            pass

    def __del__(self):
        self.stop()
class BrowserResponse:
    """Lightweight stand-in for ``requests.Response`` built from a JS ``fetch`` result.

    The expected input is a dict with optional ``status``, ``body``,
    ``headers`` and ``url`` keys. ``None``, a non-dict value, or keys whose
    value is ``None`` all fall back to the defaults
    (``0``, ``''``, ``{}``, ``''``), so callers can always slice ``text``
    and iterate ``headers``.
    """

    def __init__(self, result_dict):
        result = result_dict if isinstance(result_dict, dict) else {}
        self.status_code = result.get('status') or 0
        self.text = result.get('body') or ''
        self.headers = result.get('headers') or {}
        self.url = result.get('url') or ''
class TlsAutoBot:
    """Browser-driven bot: log in to TLScontact, query appointment slots, book one.

    All HTTP traffic to the site is issued from inside the controlled browser
    (``fetch`` via ``run_js``) so it shares the browser session.
    """

    SITE_ROOT = "https://visas-fr.tlscontact.com/en-us"

    def __init__(self, config: dict):
        """Store the configuration and allocate a per-instance workspace path.

        ``config`` keys read by this class: ``proxy``, ``account``,
        ``capsolver_key``, ``apt_config`` (``code``, ``country``, ``city``),
        ``interest_month``, ``target_labels``, ``expected_start_date``,
        ``expected_end_date`` and the optional ``waiting_room_timeout``
        (seconds, default 3600).
        """
        self.config = config
        self.instance_id = uuid.uuid4().hex[:8]
        self.workspace = os.path.abspath(os.path.join("data", f"tls_session_{self.instance_id}"))
        self.page = None
        self.travel_group = None
        self.tunnel = None

    def _log(self, msg):
        """Print ``msg`` prefixed with this instance's id."""
        print(f"[TLS-Bot-{self.instance_id}] {msg}")

    def _get_free_port(self):
        """Return a TCP port that was free at the time of the call."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    @staticmethod
    def _direct_proxy_url(proxy_cfg):
        """Return ``scheme://ip:port`` for a password-less proxy, or ``None`` if no ip is set.

        The scheme is read from ``scheme`` (the key used by the config and by
        ``solve_captcha``); the legacy misspelling ``schema`` is still accepted.
        """
        if not proxy_cfg or not proxy_cfg.get('ip'):
            return None
        scheme = proxy_cfg.get('scheme') or proxy_cfg.get('schema') or 'http'
        return f"{scheme}://{proxy_cfg.get('ip')}:{proxy_cfg.get('port')}"

    def init_browser(self):
        """Create the Chromium page with an isolated port/profile and proxy settings."""
        self._log("Initializing browser...")
        co = ChromiumOptions()

        # 1. Port and profile isolation per instance.
        co.set_local_port(self._get_free_port())
        co.set_user_data_path(self.workspace)

        # 2. Proxy configuration.
        proxy_cfg = self.config.get('proxy') or {}
        if proxy_cfg.get("username") and proxy_cfg.get("password"):
            self._log(f"Starting Proxy Tunnel for {proxy_cfg.get('ip')}...")
            # Authenticated proxy: run a local tunnel that injects the
            # credentials, and point Chrome at the credential-free local port.
            self.tunnel = ProxyTunnel(
                proxy_cfg.get('ip'),
                proxy_cfg.get('port'),
                proxy_cfg.get('username'),
                proxy_cfg.get('password'),
            )
            local_proxy = self.tunnel.start()
            self._log(f"Tunnel started at {local_proxy}")
            co.set_argument(f'--proxy-server={local_proxy}')
        else:
            # Password-less proxy is used directly; no proxy configured means
            # no --proxy-server argument at all.
            proxy_str = self._direct_proxy_url(proxy_cfg)
            if proxy_str:
                co.set_argument(f'--proxy-server={proxy_str}')

        # 3. Browser flags.
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')
        self.page = ChromiumPage(co)

    def solve_captcha(self, page_url: str, task_type: str, site_key: str, use_proxy = False, action: str = None) -> str:
        """Solve a captcha through the Capsolver HTTP API and return the token.

        Args:
            page_url: URL of the page that shows the captcha.
            task_type: Capsolver task type string.
            site_key: site key of the captcha widget.
            use_proxy: when true, pass ``config['proxy']`` to the solver.
            action: optional ``pageAction`` value.

        Raises:
            ValueError: if ``capsolver_key`` is missing from the config.
            RuntimeError: if the task cannot be created, fails, returns no
                token, or is not ready after 30 polls (3s apart).
        """
        capsolver_key = self.config.get('capsolver_key')
        if not capsolver_key:
            raise ValueError("Capsolver API key missing")

        task = {
            "type": task_type,
            "websiteURL": page_url,
            "websiteKey": site_key,
        }
        if use_proxy:
            proxy = self.config['proxy']
            task["proxyType"] = proxy.get('scheme', 'http')
            task["proxyAddress"] = proxy.get('ip')
            task["proxyPort"] = int(proxy.get('port'))
            if proxy.get('username'):
                task["proxyLogin"] = proxy.get('username')
                task["proxyPassword"] = proxy.get('password')
        if action:
            task["pageAction"] = action

        res = requests.post(
            "https://api.capsolver.com/createTask",
            json={"clientKey": capsolver_key, "task": task},
            timeout=20,
        )
        try:
            created = res.json()
        except ValueError:
            created = {}  # non-JSON body: reported through the error below
        if res.status_code != 200 or created.get("errorId") != 0:
            raise RuntimeError(f"Failed to create capsolver task: {res.text}")

        task_id = created.get("taskId")
        self._log(f"Task created: {task_id}. Waiting for solution...")

        for _ in range(30):
            r = requests.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": capsolver_key, "taskId": task_id},
                timeout=20,
            )
            data = r.json()
            status = data.get("status")
            if status == "ready":
                solution = data.get("solution") or {}
                token = solution.get("gRecaptchaResponse") or solution.get("token")
                if not token:
                    raise RuntimeError(f"Capsolver returned no token: {data}")
                self._log("Captcha solved successfully!")
                return token
            # Fail fast instead of polling a task that has already failed.
            if data.get("errorId") or status == "failed":
                raise RuntimeError(
                    f"Capsolver task failed: {data.get('errorDescription') or data}"
                )
            time.sleep(3)
        raise RuntimeError("Capsolver task timeout")

    @staticmethod
    def _extract_escaped_json(pattern, text):
        """Return the JSON array captured by ``pattern`` in ``text``, or ``[]`` if absent.

        The captured group is expected to use ``\\"`` for quotes (JSON embedded
        in a string); those are unescaped before parsing.
        """
        match = re.search(pattern, text, re.DOTALL)
        if not match:
            return []
        return json.loads(match.group(1).replace(r'\"', '"'))

    def _require_group_id(self):
        """Return the selected travel group's ``formGroupId`` or raise if not logged in."""
        if not self.travel_group:
            raise RuntimeError("No travel group selected; call login() first.")
        return self.travel_group['formGroupId']

    def _booking_url(self, group_num, month):
        """Build the appointment-booking URL for ``group_num`` and ``month`` (MM-YYYY)."""
        query = urlencode({'location': self.config['apt_config']["code"], 'month': month})
        return f'{self.SITE_ROOT}/{group_num}/workflow/appointment-booking?{query}'

    def login(self):
        """Start the browser, log in, and select the travel group for the target city.

        Raises:
            Exception: if the login does not leave the auth pages, or no
                travel group matches ``apt_config['city']``.
        """
        self.init_browser()

        apt_config = self.config['apt_config']
        params = {
            "issuerId": apt_config["code"],
            "country": apt_config["country"],
            "vac": apt_config["code"],
            "redirect": f"/en-us/country/{apt_config['country']}/vac/{apt_config['code']}",
        }
        full_login_url = f"{self.SITE_ROOT}/login?{urlencode(params)}"

        self._log(f"Navigating to login: {full_login_url}")
        self.page.get(full_login_url)
        time.sleep(3)

        # Waiting room: the page refreshes itself, so only poll the HTML.
        # The limit defaults to one hour, matching the original comment and log text.
        waiting_limit = self.config.get('waiting_room_timeout', 60 * 60)
        wait_start = time.time()
        while True:
            html = self.page.html.lower()
            # French or English waiting-room marker.
            if "file d'attente" not in html and "waiting room" not in html:
                break
            if time.time() - wait_start > waiting_limit:
                self._log(f"Waiting room timeout ({int(waiting_limit)}s).")
                break
            self._log("In Waiting Room... Waiting for auto-refresh.")
            time.sleep(30)

        if not self.page.ele('#email-input-field'):
            self._log("Form not found, reloading...")
            self.page.get(full_login_url)
            self.page.wait.ele_displayed('#email-input-field', timeout=15)

        self._log("Waiting 3 seconds for Captcha scripts to load...")
        time.sleep(3)

        g_token = ""
        if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
            self._log("Login ReCaptcha detected, solving...")
            g_token = self.solve_captcha(
                page_url=self.page.url,
                task_type="ReCaptchaV2TaskProxyLess",
                site_key="6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
            )

        account = self.config['account']
        # json.dumps yields valid JS string literals, so quotes or backslashes
        # in the credentials cannot break (or inject into) the script.
        js_user = json.dumps(account['username'])
        js_pass = json.dumps(account['password'])
        js_token = json.dumps(g_token)
        js_login = f"""
            var u = document.getElementById('email-input-field');
            if(u) {{ u.value = {js_user}; u.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var p = document.getElementById('password-input-field');
            if(p) {{ p.value = {js_pass}; p.dispatchEvent(new Event('input', {{bubbles:true}})); }}

            var g = document.getElementById('g-recaptcha-response');
            if(g) {{ g.value = {js_token}; }}

            var btn = document.getElementById('btn-login');
            if(btn) {{ btn.click(); return true; }} else {{ return false; }}
        """

        self._log("Submitting Login via JS...")
        clicked = self.page.run_js(js_login)
        if clicked is False:
            self._log("Login button not found on page.")

        self._log("Waiting for dashboard redirect...")
        self.page.wait.url_change('login-actions', exclude=True, timeout=45)
        time.sleep(4)
        if "login-actions" in self.page.url or "auth" in self.page.url:
            raise Exception("Login Failed! Invalid credentials or Captcha rejected.")

        self._log("Waiting for dashboard...")
        self.page.wait.load_start()
        time.sleep(5)

        # Parse the dashboard HTML for the travel groups embedded as escaped JSON.
        self._log("Parsing Dashboard for Travel Group...")
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        groups = self._extract_escaped_json(js_pattern, self.page.html)

        target_city = apt_config['city'].lower()
        for g in groups:
            if (g.get('vacName') or '').lower() == target_city:
                self.travel_group = g
                break

        if not self.travel_group:
            raise Exception(f"Travel Group not found for city: {target_city}")
        self._log(f"Login Success! Target Group ID: {self.travel_group['formGroupId']}")

    @staticmethod
    def _parse_slots(body):
        """Extract ``[{'date', 'time', 'label'}, ...]`` from a booking-page response body.

        Slots whose ``labels`` list is empty are skipped. The label is
        ``'pta'`` or ``'ptaw'`` when present (in that priority), else ``''``.
        """
        pattern = r'"availableAppointments\\":\s*(\[.*?\]),\\"showFlexiAppointment'
        slots = []
        for day in TlsAutoBot._extract_escaped_json(pattern, body):
            d_str = day.get('day')
            for s in day.get('slots', []):
                labels = s.get('labels', [])
                if not labels:
                    continue  # empty list: slot not available
                lbl = next((known for known in ('pta', 'ptaw') if known in labels), '')
                slots.append({'date': d_str, 'time': s.get('time'), 'label': lbl})
        return slots

    def query_slots(self) -> list:
        """Fetch the booking page for ``interest_month`` in-browser and return parsed slots.

        Raises:
            RuntimeError: if ``login()`` has not selected a travel group.
            Exception: if the fetch does not return HTTP 200.
        """
        self._log("Querying available slots...")
        group_num = self._require_group_id()
        interest_month = self.config.get("interest_month", time.strftime("%m-%Y"))
        query_url = self._booking_url(group_num, interest_month)

        js_script = f"""
            return fetch({json.dumps(query_url)}, {{ credentials: "include" }})
                .then(async r => {{ return {{ status: r.status, body: await r.text() }}; }})
                .catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
        """
        resp = BrowserResponse(self.page.run_js(js_script))
        if resp.status_code != 200:
            raise Exception(f"Query Failed: {resp.status_code} - {resp.text[:100]}")
        return self._parse_slots(resp.text)

    def _filter_dates(self, available_dates: list, start_str: str, end_str: str) -> list:
        """Return the dates (``YYYY-MM-DD`` strings) within ``[start_str, end_str]``.

        When either bound is empty the input list is returned unchanged.
        Only the first 10 characters of each bound are parsed. Entries that
        are not valid ``YYYY-MM-DD`` strings are skipped.
        """
        if not start_str or not end_str:
            return available_dates
        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
        valid = []
        for d in available_dates:
            try:
                curr = datetime.strptime(d, "%Y-%m-%d")
            except (TypeError, ValueError):
                continue  # missing or malformed date
            if s_date <= curr <= e_date:
                valid.append(d)
        return valid

    def book(self, all_slots: list) -> bool:
        """Pick a random slot matching the configured dates/labels and try to book it.

        Returns:
            ``True`` when the response indicates a redirect to the
            confirmation page, ``False`` otherwise (including "nothing to book").
        """
        if not all_slots:
            self._log("No slots provided to book.")
            return False

        # 1. Filter by date window and label.
        target_labels = self.config.get('target_labels', [''])
        exp_start = self.config.get('expected_start_date', '')
        exp_end = self.config.get('expected_end_date', '')
        unique_dates = list({s['date'] for s in all_slots})
        valid_dates = set(self._filter_dates(unique_dates, exp_start, exp_end))

        possible_slots = [
            s for s in all_slots
            if s['date'] in valid_dates and s['label'] in target_labels
        ]
        if not possible_slots:
            self._log("No slots match target dates and labels.")
            return False

        # 2. Random choice among the candidates.
        selected = random.choice(possible_slots)
        sel_date = selected['date']
        sel_time = selected['time']
        sel_label = selected['label']
        self._log(f"Selected Slot -> Date: {sel_date}, Time: {sel_time}, Label: {sel_label or 'standard'}")

        group_num = self._require_group_id()
        location_code = self.config['apt_config']["code"]

        # The Next.js action must be posted to the URL carrying the slot's month.
        year, month, _day = sel_date.split('-')
        full_url = self._booking_url(group_num, f"{month}-{year}")

        router_state = f'%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22{group_num}%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'

        # Every Python value below is embedded as a JSON-encoded JS literal.
        js_url = json.dumps(full_url)
        js_router_state = json.dumps(router_state)

        # 3. Basket cost.
        self._log("Fetching basket cost...")
        cost_payload = [{"groupId": str(group_num), "lang": "en-us", "labels": [sel_label]}]
        js_cost_body = json.dumps(json.dumps(cost_payload))  # JS string holding JSON text
        js_cost = f"""
            return fetch({js_url}, {{
                method: 'POST',
                headers: {{
                    'Next-Action': '40124cc90acef520d4fd2daf60ad3c8e21fc2c11d8',
                    'Next-Router-State-Tree': {js_router_state},
                    'Accept': 'text/x-component',
                    'Content-Type': 'text/plain;charset=UTF-8'
                }},
                body: {js_cost_body}
            }}).then(async r => {{ return {{ status: r.status, body: await r.text() }}; }})
              .catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
        """
        cost_res = BrowserResponse(self.page.run_js(js_cost))
        if cost_res.status_code != 200:
            self._log(f"Basket cost check failed: {cost_res.status_code}")
            return False

        # 4. ReCaptcha V3 for the booking action.
        self._log("Solving Booking ReCaptcha V3...")
        g_token = self.solve_captcha(
            page_url=full_url,
            task_type="ReCaptchaV3Task",
            site_key="6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
            use_proxy=True,
            action="book",
        )

        # 5. Submit the booking form.
        self._log("Submitting final booking request...")
        form_fields = {
            '1_formGroupId': str(group_num),
            '1_lang': 'en-us',
            '1_process': 'APPOINTMENT',
            '1_location': str(location_code),
            '1_date': str(sel_date),
            '1_time': str(sel_time),
            '1_appointmentLabel': str(sel_label),
            '1_captchaToken': str(g_token),
            '0': '[{"status":"IDLE"},"$K1"]',
        }
        js_appends = "\n".join(
            f"            formData.append({json.dumps(k)}, {json.dumps(v)});"
            for k, v in form_fields.items()
        )
        js_book = f"""
            const formData = new FormData();
{js_appends}

            return fetch({js_url}, {{
                method: 'POST',
                headers: {{
                    'Next-Action': '6043cfd107081bc817cbb11a8c0db17d3a063401be',
                    'Next-Router-State-Tree': {js_router_state},
                    'Accept': 'text/x-component'
                }},
                body: formData
            }}).then(async r => {{
                const hdrs = {{}};
                r.headers.forEach((v, k) => hdrs[k] = v);
                return {{ status: r.status, body: await r.text(), headers: hdrs, url: r.url }};
            }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
        """
        book_resp = BrowserResponse(self.page.run_js(js_book))

        # 6. Success = 303, or 200 pointing at the confirmation page.
        headers_lower = {str(k).lower(): v for k, v in book_resp.headers.items()}
        action_redirect = headers_lower.get('x-action-redirect', '')
        confirmed = (
            "appointment-confirmation" in action_redirect
            or "appointment-confirmation" in book_resp.url
        )
        if book_resp.status_code == 303 or (book_resp.status_code == 200 and confirmed):
            self._log(f"✅ BOOKING SUCCESS! Redirected to: {action_redirect or book_resp.url}")
            return True

        self._log(f"❌ BOOKING FAILED! Status: {book_resp.status_code}")
        if "APPOINTMENT_LIMIT_REACHED" in book_resp.text:
            self._log("-> Reason: Appointment Limit Reached")
        else:
            self._log(f"-> Response Body: {book_resp.text[:300]}")
        return False

    def cleanup(self):
        """Quit the browser, stop the proxy tunnel and delete the workspace directory."""
        self._log("Cleaning up resources...")
        if self.page:
            try:
                self.page.quit()
            except Exception:
                pass  # best effort: the browser may already be gone
            self.page = None
        if self.tunnel:
            # The tunnel owns a listening socket and a thread; release them.
            try:
                self.tunnel.stop()
            except Exception:
                pass
            self.tunnel = None
        if os.path.exists(self.workspace):
            time.sleep(1)  # give the browser a moment to release profile files
            shutil.rmtree(self.workspace, ignore_errors=True)
- # =====================================================================
- # Main entry point: log in, query slots, try to book one, then clean up.
- # =====================================================================
- if __name__ == "__main__":
- 
- # Fill in your account configuration.
- # NOTE(review): the account password, proxy credentials and Capsolver API
- # key below are hard-coded in source; they should be rotated and loaded
- # from the environment or an untracked config file instead.
- MY_CONFIG = {
- # Account credentials
- "account": {
- "username": "zhangsan06@gmail-app.com",
- "password": "Visafly@111"
- },
- # Target visa application centre (the original comment cites
- # "Guangzhou TLS: cnCNG2fr" as the example).
- # NOTE(review): "city" is Chengdu while the code is the one cited for
- # Guangzhou; confirm the pair is intended.
- "apt_config": {
- "country": "cn",
- "city": "Chengdu",
- "code": "cnCNG2fr"
- },
- # Proxy configuration
- "proxy": {
- "scheme": "http",
- "ip": "95.135.130.10",
- "port": "46107",
- "username": "Iz1WuKKwt1KUzEe",
- "password": "G7syngmdyGURblY"
- },
- # Capsolver API Key
- "capsolver_key": "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A",
- 
- # Month to query (format: MM-YYYY)
- "interest_month": "06-2026",
- 
- # Desired date range (inclusive)
- "expected_start_date": "2026-06-01",
- "expected_end_date": "2026-06-30",
- 
- # Target labels: '' is a regular slot, 'pta' is a Prime Time slot
- "target_labels": ["", "pta"]
- }
- bot = TlsAutoBot(config=MY_CONFIG)
- 
- try:
- # 1. Log in
- bot.login()
- 
- # 2. Check whether any slots are available
- slots = bot.query_slots()
- 
- if slots:
- bot._log(f"Found {len(slots)} total available slots in this month.")
- # 3. Try to book
- success = bot.book(slots)
- if success:
- print("\n🎉 Congratulations! Slot booked successfully!")
- else:
- print("\n⚠️ Failed to book the slot.")
- else:
- bot._log("No available slots found for the requested criteria.")
- # NOTE(review): indentation was lost in this copy, so it is unclear whether
- # this one-hour sleep belongs to the "no slots" branch or runs after every
- # attempt; confirm against the original file.
- time.sleep(3600)
- except Exception as e:
- bot._log(f"Error occurred during execution: {str(e)}")
- finally:
- bot.cleanup()
|