"""Batch TLScontact account registration driven by DrissionPage.

The module registers accounts in isolated Chromium sessions (one per worker
thread), activates them through the link received by e-mail, fills in the first
applicant form and finally reports the account to a central server.
"""

import time
import json
import os
import re
import uuid
import socket
import shutil
import random
import argparse
import concurrent.futures
from urllib.parse import urlencode
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict

import requests
from DrissionPage.common import Keys
from DrissionPage import ChromiumPage, ChromiumOptions

import configure
from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
from toolkit.vs_cloud_api import VSCloudApi
from toolkit.mihomo_tunnel import MihomoTunnel
from vs_types import NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
from utils.mouse import HumanMouse
from utils.keyboard import HumanKeyboard
from utils.scroll import HumanScroll
from utils.fingerprint_utils import FingerprintGenerator
from utils.fake_utils import generate_random_account_detail


def load_proxies(pool_name):
    """Read the proxy pool named *pool_name* from ``config/proxies.json``.

    The file is looked up in the ``config`` directory next to this module.

    Args:
        pool_name: Key of the pool inside the JSON document.

    Returns:
        The non-empty list stored under *pool_name*.

    Raises:
        SystemExit: With exit code 1 when the file cannot be read or parsed, or
            when the pool is missing or empty (the reason is printed first).
    """
    config_path = os.path.join(os.path.dirname(__file__), 'config', 'proxies.json')
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        proxies = data.get(pool_name, [])
        if not proxies:
            raise ValueError(f"代理池 '{pool_name}' 为空或不存在!")
        return proxies
    except Exception as e:
        print(f"读取代理配置文件失败: {e}")
        # `exit()` is injected by the `site` module and is missing under
        # `python -S` or in frozen builds; raising SystemExit always works.
        raise SystemExit(1)


class TlsRegistrator:
    """One registration session: its own browser profile, proxy and account."""

    def __init__(self, tls_url, proxy_config: Optional[Dict] = None,
                 capsolver_key: Optional[str] = None,
                 account_detail: Optional[Dict] = None):
        """Store the session configuration; no browser is started here.

        Args:
            tls_url: Page opened by :meth:`init_browser`.
            proxy_config: Proxy description; the keys read by this class are
                ``proto``, ``ip``, ``port``, ``username`` and ``password``.
            capsolver_key: Capsolver API key used by :meth:`solve_captcha`.
            account_detail: Account data (``email``, ``pwd``, form fields...).
        """
        self.proxy_config = proxy_config
        self.capsolver_key = capsolver_key
        self.account_detail = account_detail

        # Isolated user-data directory, unique per instance.
        self.instance_id = uuid.uuid4().hex[:8]
        self.tls_url = tls_url
        self.workspace = os.path.abspath(
            os.path.join("data/temp_browser_data", f"reg_session_{self.instance_id}")
        )
        self.page = None
        self.mouse = None
        self.keyboard = None

        # Holds the proxy tunnel instance once started.
        self.tunnel = None

    def _log(self, msg):
        """Print *msg* prefixed with a local timestamp and the instance id."""
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{now}][TLS-Reg-{self.instance_id}] {msg}")

    def _get_free_port(self):
        """Return a currently free TCP port chosen by the operating system.

        The socket is closed before returning, so the port is only free at the
        moment of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def save_screenshot(self, name_prefix):
        """Save a viewport screenshot under ``data/``; never raises.

        Args:
            name_prefix: Middle part of the file name
                ``<instance_id>_<name_prefix>_<unix_ts>.jpg``.
        """
        try:
            timestamp = int(time.time())
            filename = f"{self.instance_id}_{name_prefix}_{timestamp}.jpg"
            save_path = os.path.join("data", filename)
            os.makedirs("data", exist_ok=True)
            self.page.get_screenshot(path=save_path, full_page=False)
            self._log(f"Screenshot saved to {save_path}")
        except Exception as e:
            # Best effort: also covers the case where the browser never started.
            self._log(f"Failed to save screenshot: {e}")

    def _configure_proxy(self, co):
        """Add the ``--proxy-server`` argument to *co* based on ``proxy_config``.

        Proxies with credentials are reached through a local Mihomo tunnel
        (stored in ``self.tunnel``); proxies without credentials are passed to
        the browser directly.

        Raises:
            BizLogicError: When a tunnel is needed but no Mihomo binary path is
                configured (``configure.MIHOMO_BIN_PATH`` / ``MIHOMO_BIN``).
        """
        if not (self.proxy_config and self.proxy_config.get("ip")):
            self._log("[WARN] No proxy configured!")
            return

        p = self.proxy_config
        if not (p.get('username') and p.get('password')):
            proxy_str = f"{p.get('proto')}://{p.get('ip')}:{p.get('port')}"
            co.set_argument(f'--proxy-server={proxy_str}')
            return

        self._log(f"Starting Proxy Tunnel for {p.get('ip')}...")
        exit_node = {
            "name": "ExitNode",
            "type": p.get('proto'),
            "server": p.get('ip'),
            "port": p.get('port'),
            "username": p.get('username'),
            "password": p.get('password')
        }

        relay_node = None
        if configure.MIHOMO_RELAY_NODES:
            relay_node = random.choice(configure.MIHOMO_RELAY_NODES)

        mihomo_path = configure.MIHOMO_BIN_PATH
        if not mihomo_path:
            mihomo_path = os.getenv("MIHOMO_BIN")
        if not mihomo_path:
            raise BizLogicError(message='Mihomo path is null, You need set mihomo bin path in configure or os env')

        self.tunnel = MihomoTunnel(mihomo_path, exit_node=exit_node, relay_node=relay_node)
        local_proxy = self.tunnel.start()
        self._log(f"Tunnel started at {local_proxy}")
        co.set_argument(f'--proxy-server={local_proxy}')

    def init_browser(self):
        """Start an isolated, proxied browser and open ``self.tls_url``.

        Sets ``self.page``, ``self.mouse`` and ``self.keyboard``; may also set
        ``self.tunnel``. After loading the page the Cloudflare bypass helper is
        run and the mouse is moved to a random start position.
        """
        self._log("Initializing browser...")
        co = ChromiumOptions()

        # 1. Port and profile-path isolation.
        port = self._get_free_port()
        co.set_local_port(port)
        co.set_user_data_path(self.workspace)

        chrome_path = configure.CHROME_PATH
        if not chrome_path:
            chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)

        # 2. Proxy.
        self._configure_proxy(co)

        fingerprint_gen = FingerprintGenerator()
        specific_fp = fingerprint_gen.generate(self.instance_id)
        self._log(f'browser fingerprint={specific_fp}')

        # 3. Anti-detection and stability flags.
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')
        co.set_argument(f"--fingerprint={specific_fp.get('seed')}")
        co.set_argument(f"--fingerprint-platform={specific_fp.get('platform')}")
        co.set_argument(f"--fingerprint-brand={specific_fp.get('brand')}")

        self.page = ChromiumPage(co)
        self.page.get(self.tls_url)
        time.sleep(5)

        cf_bypasser = CloudflareBypasser(self.page, log=True)
        cf_bypasser.bypass(max_retry=8)
        time.sleep(3)
        cf_bypasser.handle_waiting_room()

        self._log("正在初始化拟人化工具...")
        self.mouse = HumanMouse(self.page, debug=False)
        self.keyboard = HumanKeyboard(self.page)

        self._log("随机化鼠标开始位置...")
        viewport_width = self.page.rect.viewport_size[0]
        viewport_height = self.page.rect.viewport_size[1]
        init_x = random.randint(10, viewport_width - 10)
        init_y = random.randint(10, viewport_height - 10)
        self.mouse.move(init_x, init_y)

    def solve_captcha(self, page_url: str, task_type: str, site_key: str,
                      use_proxy=False, action: Optional[str] = None,
                      api_domain: Optional[str] = None) -> str:
        """Solve a captcha through the Capsolver HTTP API.

        Args:
            page_url: URL of the page showing the captcha.
            task_type: Capsolver task type string.
            site_key: Site key of the captcha widget.
            use_proxy: When true, ``self.proxy_config`` is sent with the task.
            action: Optional ``pageAction`` value.
            api_domain: Optional ``apiDomain`` value.

        Returns:
            ``solution["gRecaptchaResponse"]`` or, if absent/empty,
            ``solution["token"]``.

        Raises:
            ValueError: If no Capsolver key is set, or *use_proxy* is requested
                without a proxy configuration.
            Exception: If the task cannot be created, the service reports the
                task as failed, or no solution arrives within 30 polls.
        """
        if not self.capsolver_key:
            raise ValueError("Capsolver API key missing")

        task = {
            "type": task_type,
            "websiteURL": page_url,
            "websiteKey": site_key,
        }
        if api_domain:
            task["apiDomain"] = api_domain
        if use_proxy:
            proxy = self.proxy_config
            if not proxy:
                raise ValueError("use_proxy=True requires a proxy configuration")
            task["proxyType"] = proxy.get('proto', 'http')
            task["proxyAddress"] = proxy.get('ip')
            task["proxyPort"] = int(proxy.get('port'))
            if proxy.get('username'):
                task["proxyLogin"] = proxy.get('username')
                task["proxyPassword"] = proxy.get('password')
        if action:
            task["pageAction"] = action

        payload = {"clientKey": self.capsolver_key, "task": task}
        res = requests.post("https://api.capsolver.com/createTask", json=payload, timeout=20)
        if res.status_code != 200 or res.json().get("errorId") != 0:
            raise Exception(f"Failed to create capsolver task: {res.text}")

        task_id = res.json().get("taskId")
        self._log(f"Task created: {task_id}. Waiting for solution...")

        for _ in range(30):
            r = requests.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": self.capsolver_key, "taskId": task_id},
                timeout=20
            )
            data = r.json()
            if data.get("status") == "ready":
                self._log("Captcha solved successfully!")
                return data["solution"].get("gRecaptchaResponse") or data["solution"].get("token")
            # Fail fast instead of polling a task that can never become ready.
            if data.get("errorId") or data.get("status") == "failed":
                raise Exception(f"Capsolver task failed: {r.text}")
            time.sleep(3)
        raise Exception("Capsolver task timeout")

    def register(self):
        """Fill in and submit the registration form.

        Returns:
            True once the "Check your email inbox" heading is displayed.

        Raises:
            BizLogicError: If the submit button never appears or the success
                heading is not displayed within 10 seconds after submitting.
        """
        email = self.account_detail.get('email')
        password = self.account_detail.get('pwd')
        # NOTE(review): this logs the full account detail, including the password.
        self._log(f'Account Detail:{self.account_detail}')

        btn_selector = '#submit'
        if not self.page.wait.ele_displayed(btn_selector, timeout=3):
            # Not on the form yet: follow the registration link first.
            register_btn = self.page.ele("tag:a@@href:registration")
            self.mouse.human_click_ele(register_btn)
            time.sleep(3)
        if not self.page.wait.ele_displayed(btn_selector, timeout=10):
            raise BizLogicError(message=f"Can't find selector={btn_selector}")
        time.sleep(random.uniform(0.5, 1))

        self._log("正在填写邮箱和密码...")
        email_input_e = self.page.ele('#email')
        self.mouse.human_click_ele(email_input_e)
        self.keyboard.type_text(email, humanize=True)
        time.sleep(random.uniform(0.2, 0.5))

        password_e = self.page.ele('#password')
        self.mouse.human_click_ele(password_e)
        self.keyboard.type_text(password, humanize=True)
        time.sleep(random.uniform(0.2, 0.5))

        confirm_password_e = self.page.ele('#confirm-password')
        self.mouse.human_click_ele(confirm_password_e)
        self.keyboard.type_text(password, humanize=True)
        time.sleep(random.uniform(0.2, 0.5))

        self._log("正在勾选必选条款...")
        for checkbox_id in ['#terms-and-conditions', '#biometric-data', '#privacy-notice']:
            # The click goes to the element following the checkbox input.
            check_box_e = self.page.ele(checkbox_id).next()
            self.mouse.human_click_ele(check_box_e)
            time.sleep(random.uniform(0.3, 0.6))

        self._log("提交注册...")
        btn_e = self.page.ele(btn_selector)
        time.sleep(random.uniform(0.3, 0.6))
        self.mouse.human_click_ele(btn_e)

        self._log("正在等待验证结果 (最多10秒)...")
        success_dialog = self.page.wait.ele_displayed('tag:h1@text():Check your email inbox', timeout=10)
        if not success_dialog:
            self.page.get_screenshot("failed_submit.png")
            raise BizLogicError(message='Failed to submit account registration')

        self._log("✅ 操作成功!已弹出提示:Check your email inbox")
        return True

    def activate(self, sent_at=None):
        """Open the activation link from the registration e-mail and confirm.

        Args:
            sent_at: ``"%Y-%m-%d %H:%M:%S"`` UTC timestamp passed to the mail
                lookup as ``sent_date``; defaults to the current UTC time.

        Raises:
            BizLogicError: If no mail content is returned, the mail contains no
                ``https://`` link, or the activation button does not appear.
        """
        email = self.account_detail.get('email')
        # NOTE(review): hard-coded collecting mailbox; consider moving to configuration.
        email_box = 'visafly666@gmail.com'
        sender = 'TLSContact'
        recipient = email
        subject_keywords = 'TLSContact'
        body_keywords = ''
        if not sent_at:
            # Same string as the deprecated datetime.utcnow() produced.
            now_utc = datetime.now(timezone.utc)
            sent_at = now_utc.strftime("%Y-%m-%d %H:%M:%S")

        content_out = VSCloudApi.Instance().fetch_mail_content(
            email=email_box,
            sender=sender,
            recipient=recipient,
            subject_keywords=subject_keywords,
            body_keywords=body_keywords,
            sent_date=sent_at,
            expiry=600
        )
        self._log(f'activate email content={content_out}')
        if not content_out:
            raise BizLogicError(message=f"Activation email not received for {email}")

        match = re.search(r'https://\S+', content_out)
        if not match:
            # Previously this fell through to page.get(None).
            raise BizLogicError(message="No activation link found in email content")
        activate_link = match.group(0)
        self.page.get(activate_link)

        btn_selector = "#activation-pending-button"
        if not self.page.wait.ele_displayed(btn_selector, timeout=10):
            raise BizLogicError(message=f"Wait ele={btn_selector} timeout")
        self.page.ele(btn_selector).click()
        time.sleep(3)

    def make_account_useful(self):
        """Log in, select the travel group and submit the first applicant form.

        Values come from ``self.account_detail``; optional fields that are
        missing are skipped.

        Raises:
            BizLogicError: On login failure, missing travel group, failed
                redirects, or when the add-applicant step fails twice.
        """

        def fill_date_field(page, selector, date_str):
            """Type *date_str* (``YYYY-M-D``) into a segmented date input.

            The order of the year/month/day segments is detected in the browser
            through ``Intl.DateTimeFormat`` and the parts are typed in that order.
            """
            if not date_str:
                return
            ele = page.ele(selector)
            ele.scroll.to_see(center=True)

            js_detect_format = """
            const parts = new Intl.DateTimeFormat().formatToParts(new Date(2023, 11, 31));
            let format = [];
            for (let part of parts) {
                if (part.type === 'year') format.push('Y');
                if (part.type === 'month') format.push('M');
                if (part.type === 'day') format.push('D');
            }
            return format;
            """
            date_format = page.run_js(js_detect_format)

            year, month, day = date_str.split('-')
            date_dict = {
                'Y': year,
                'M': month.zfill(2),
                'D': day.zfill(2)
            }

            ele.click()
            time.sleep(0.1)
            # Presumably moves the caret to the first segment -- TODO confirm.
            page.actions.type(Keys.LEFT * 3)
            time.sleep(0.1)

            for i, char in enumerate(date_format):
                val = date_dict[char]
                page.actions.type(val)
                time.sleep(0.1)
                # Only a year that is not the last segment gets an explicit RIGHT.
                if char == 'Y' and i < 2:
                    page.actions.type(Keys.RIGHT)
                    time.sleep(0.1)

        email = self.account_detail.get('email')
        password = self.account_detail.get('pwd')
        location = self.account_detail.get('location')

        # ---- Login -------------------------------------------------------
        btn_selector = 'tag:button@@text():Login'
        if not self.page.wait.ele_displayed(btn_selector, timeout=3):
            login_btn = self.page.ele("tag:a@@href:login")
            self.mouse.human_click_ele(login_btn)
            time.sleep(3)
        if not self.page.wait.ele_displayed(btn_selector, timeout=10):
            raise BizLogicError(message=f"Can't find selector={btn_selector}")

        recpatchav2_token = ""
        if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
            self._log("Solving ReCaptcha...")
            recpatchav2_token = self.solve_captcha(
                self.page.url, "ReCaptchaV2TaskProxyLess", "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0"
            )

        input_ele = self.page.ele('tag:label@@text():Email').next()
        self.mouse.human_click_ele(input_ele)
        time.sleep(random.uniform(0.2, 0.6))
        self.keyboard.type_text(email, humanize=True)
        time.sleep(random.uniform(0.5, 1.2))

        input_ele = self.page.ele('tag:label@@text():Password').next()
        self.mouse.human_click_ele(input_ele)
        time.sleep(random.uniform(0.2, 0.6))
        self.keyboard.type_text(password, humanize=True)

        if recpatchav2_token:
            inject_recpatchav2_token_js = f"""
            var g = document.getElementById('g-recaptcha-response');
            if(g) {{ g.value = "{recpatchav2_token}"; }}
            """
            self._log("Inject ReCaptchaV2 Token via JS...")
            self.page.run_js(inject_recpatchav2_token_js)
            time.sleep(random.uniform(0.5, 1.0))

        self._log("Submitting Login...")
        time.sleep(random.uniform(0.3, 0.8))
        login_btn = self.page.ele('tag:button@@text():Login')
        self.mouse.human_click_ele(login_btn)

        self._log("Waiting for dashboard redirect...")
        self.page.wait.url_change('login-actions', exclude=True, timeout=45)
        time.sleep(4)
        if "login-actions" in self.page.url or "auth" in self.page.url:
            raise BizLogicError(message="Login Failed! Invalid credentials or Captcha rejected.")

        self._log("Waiting for dashboard...")
        self.page.wait.load_start()
        time.sleep(5)

        # ---- Parse the dashboard HTML to extract the travel group ---------
        self._log("Parsing Dashboard for Travel Group...")
        html = self.page.html
        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        js_match = re.search(js_pattern, html, re.DOTALL)
        groups = []
        if js_match:
            # The JSON is embedded with escaped quotes; unescape before parsing.
            json_str = js_match.group(1).replace(r'\"', '"')
            groups = json.loads(json_str)

        # Tolerate a missing location / vacName instead of crashing on .lower().
        wanted_location = (location or '').lower()
        travel_group = None
        for g in groups:
            if (g.get('vacName') or '').lower() == wanted_location:
                travel_group = g
                break
        if not travel_group:
            raise BizLogicError(message=f"Travel Group not found for {location}")

        formgroup_id = travel_group.get('formGroupId')
        self._log(f"Waiting for group button to render: {formgroup_id}")
        # NOTE(review): the selector does not reference formgroup_id, so the first
        # "select group" button on the page is clicked -- confirm this is intended.
        btn_selector = 'tag:a@@data-testid=btn-select-group'
        self._log(f"Select group_id={formgroup_id}...")
        self.mouse.human_click_ele(self.page.ele(btn_selector))

        self._log("Waiting for url redirect...")
        self.page.wait.url_change('travel-groups', exclude=True, timeout=45)
        time.sleep(2)
        if "travel-groups" in self.page.url or "auth" in self.page.url:
            raise BizLogicError(message="Redirect to service-level Failed!")

        # ---- Open the add-applicant form (one retry after a page refresh) --
        btn_selector_add = 'tag:button@@data-testid=btn-add-applicant'
        btn_selector_max = 'tag:button@@data-testid=btn-max-number-of-applicants'
        error_selector = 'tag:h2@text():Something went wrong'
        for attempt in range(2):
            try:
                btn_selector = btn_selector_add
                if not self.page.wait.ele_displayed(btn_selector, timeout=10):
                    btn_selector = btn_selector_max
                    if not self.page.ele(btn_selector):
                        raise BizLogicError(message=f"Can't find selector={btn_selector}")
                add_btn = self.page.ele(btn_selector)
                add_btn.scroll.to_see(center=True)
                time.sleep(random.uniform(0.6, 0.8))
                add_btn.click()
                if self.page.wait.ele_displayed(error_selector, timeout=5):
                    raise BizLogicError(message="Page shows 'Something went wrong'")
                break
            except Exception as e:
                if attempt == 0:
                    self.page.refresh()
                    time.sleep(5)
                else:
                    raise BizLogicError(message=f"Click add applicant failed after retry: {e}")

        # ---- Applicant form ------------------------------------------------
        visa_type = self.account_detail.get("visa_type")
        if visa_type:
            btn = self.page.ele('tag:button@@data-testid=input-visa-type')
            btn.scroll.to_see(center=True)
            btn.click()
            time.sleep(0.5)
            self.page.ele(f'tag:span@@text():{visa_type}').click(by_js=True)
            time.sleep(0.5)

        tavel_purpose = self.account_detail.get("travel_purpose")
        if tavel_purpose:
            btn = self.page.ele('tag:button@@data-testid=input-travel-purpose')
            btn.scroll.to_see(center=True)
            btn.click()
            time.sleep(0.5)
            self.page.ele(f'tag:span@@text():{tavel_purpose}').click(by_js=True)
            time.sleep(0.5)

        application_form_id = self.account_detail.get('application_form_id')
        if application_form_id:
            ele = self.page.ele('tag:input@@data-testid=f_cai')
            ele.scroll.to_see(center=True)
            ele.input(application_form_id)

        last_name = self.account_detail.get('last_name')
        if last_name:
            ele = self.page.ele('tag:input@@data-testid=f_pers_surnames')
            ele.scroll.to_see(center=True)
            ele.input(last_name.upper())

        first_name = self.account_detail.get('first_name')
        if first_name:
            ele = self.page.ele('tag:input@@data-testid=f_pers_givennames')
            ele.scroll.to_see(center=True)
            ele.input(first_name.upper())

        gender = self.account_detail.get('gender')
        if gender:
            try:
                gender = gender.capitalize()
                ele = self.page.ele(f'tag:label@@text():{gender}')
                ele.scroll.to_see(center=True)
                ele.click()
            except Exception as e:
                # Deliberately best effort: log and continue with the form.
                self._log(e)

        fill_date_field(self.page, 'tag:input@@data-testid=f_pers_birth_date',
                        self.account_detail.get('birthday'))

        nationality = self.account_detail.get('nationality')
        if nationality:
            nationality = nationality.title()
            btn = self.page.ele('tag:label@@for=f_pers_nationality').next()
            btn.scroll.to_see(center=True)
            btn.click()
            time.sleep(0.5)
            self.page.ele(f'tag:li@@role=option@@text():{nationality}').click(by_js=True)
            time.sleep(0.5)

        province_residence = self.account_detail.get('province_residence')
        if province_residence:
            try:
                province_residence = province_residence.title()
                btn = self.page.ele('tag:label@@for=f_pers_province').next()
                btn.scroll.to_see(center=True)
                btn.click()
                time.sleep(0.5)
                self.page.ele(f'tag:li@@role=option@@text():{province_residence}').click(by_js=True)
                time.sleep(0.5)
            except Exception as e:
                self._log(e)

        passport_type = self.account_detail.get('passport_type')
        if passport_type:
            btn = self.page.ele('tag:label@@for=f_identity_type').next()
            btn.scroll.to_see(center=True)
            btn.click()
            time.sleep(0.5)
            self.page.ele(f'tag:li@@role=option@@text():{passport_type}').click(by_js=True)
            time.sleep(0.5)

        passport_no = self.account_detail.get('passport_no')
        if passport_no:
            ele = self.page.ele('tag:input@@data-testid=f_pass_num')
            ele.scroll.to_see(center=True)
            ele.input(passport_no.upper())

        passport_issue_date = self.account_detail.get('passport_issue_date')
        if passport_issue_date:
            try:
                fill_date_field(self.page, 'tag:input@@data-testid=fi_passport_issue_date', passport_issue_date)
                time.sleep(0.5)
            except Exception as e:
                self._log(e)

        passport_expiry_date = self.account_detail.get('passport_expiry_date')
        if passport_expiry_date:
            try:
                fill_date_field(self.page, 'tag:input@@data-testid=fi_passport_expiry_date', passport_expiry_date)
                time.sleep(0.5)
            except Exception as e:
                self._log(e)

        phone_country_code = self.account_detail.get('phone_country_code')
        phone_number = self.account_detail.get('phone_number')
        if phone_country_code:
            div = self.page.ele('tag:label@@for=f_pers_mobile_phone').next()
            btn = div.ele('tag:button')
            btn.scroll.to_see(center=True)
            btn.click()
            time.sleep(0.5)
            self.page.ele(f'tag:li@@role=option@@text():+{phone_country_code}').click(by_js=True)
            time.sleep(0.5)
            if phone_number:
                div.ele('tag:input@@type:tel').input(phone_number)

        # Travel dates, entered with the shared date helper.
        # NOTE(review): "f_trav_departure_date" receives the *arrival* value and
        # "f_trav_arrival_date" the *departure* value -- confirm against the form.
        fill_date_field(self.page, 'tag:input@@data-testid=fi_trav_origin_departure_date',
                        self.account_detail.get('departure_origin_date'))
        fill_date_field(self.page, 'tag:input@@data-testid=f_trav_departure_date',
                        self.account_detail.get('arrival_schengen_area_date'))
        fill_date_field(self.page, 'tag:input@@data-testid=f_trav_arrival_date',
                        self.account_detail.get('departure_schengen_area_date'))

        submit_btn = self.page.ele('tag:button@@data-testid=btn-submit')
        submit_btn.scroll.to_see(center=True)
        time.sleep(1)
        submit_btn.click()
        time.sleep(6)

        submit_btn = self.page.ele('tag:button@@text():Confirm')
        submit_btn.scroll.to_see(center=True)
        submit_btn.click()
        time.sleep(6)

    def upload_account_to_server(self):
        """Report the registered account to the central server.

        Returns:
            True when the server answers with HTTP 200, otherwise False
            (including any request error, which is logged).
        """
        api_url = 'https://visafly.top/api/account/add'
        # NOTE(review): hard-coded bearer token; should come from configuration
        # or the environment rather than source control.
        api_token = 'tok_e946329a60ff45ba807f3f41b0e8b7fc'

        headers = {
            'accept': 'application/json',
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }

        payload = {
            "pool_name": self.account_detail.get("pool_name", "default_pool"),
            "username": self.account_detail.get("email"),
            "password": self.account_detail.get("pwd"),
            "extra_data": self.account_detail
        }

        try:
            # The original built this message but never passed it to the logger.
            self._log(f"Uploading account {self.account_detail.get('email')} to server...")
            resp = requests.post(api_url, json=payload, headers=headers, timeout=10)

            if resp.status_code == 200:
                self._log(f"✅ [API Upload Success] Server responded: {resp.text}")
                return True
            else:
                self._log(f"❌ [API Upload Failed] Status: {resp.status_code}, Body: {resp.text}")
                return False
        except Exception as e:
            self._log(f"❌ [API Upload Error]: {e}")
            return False

    def cleanup(self):
        """Quit the browser, stop the proxy tunnel and delete the profile dir.

        Every step is best effort: failures are logged and do not propagate.
        """
        self._log("Cleaning up resources...")
        if self.page:
            try:
                self.page.quit()
            except Exception as e:
                self._log(f"Failed to quit browser: {e}")

        if self.tunnel:
            # NOTE(review): MihomoTunnel's shutdown API is not visible in this
            # file; a `stop()` method is assumed and only called if present.
            stop_tunnel = getattr(self.tunnel, "stop", None)
            if callable(stop_tunnel):
                try:
                    stop_tunnel()
                except Exception as e:
                    self._log(f"Failed to stop tunnel: {e}")

        if os.path.exists(self.workspace):
            time.sleep(1)  # Give the browser time to release file locks.
            shutil.rmtree(self.workspace, ignore_errors=True)


def register_worker(proxy_config, tls_url, capsolver_key):
    """Run one complete registration in the calling worker thread.

    Args:
        proxy_config: Proxy entry (dict with at least ``ip``) for this attempt.
        tls_url: Target page URL.
        capsolver_key: Capsolver API key.

    Returns:
        True when every step finished without raising, otherwise False.
    """
    account_detail = generate_random_account_detail('CN')
    bot = None
    try:
        bot = TlsRegistrator(
            tls_url,
            proxy_config=proxy_config,
            capsolver_key=capsolver_key,
            account_detail=account_detail
        )
        # Reminder from the original author: inside Docker the browser must run headless.
        bot.init_browser()
        # Timestamp taken before registering; handed to the activation mail lookup.
        now_utc = datetime.now(timezone.utc)
        sent_at = now_utc.strftime("%Y-%m-%d %H:%M:%S")
        bot.register()
        bot.activate(sent_at=sent_at)
        bot.make_account_useful()
        bot.upload_account_to_server()
        bot.save_screenshot(f'success_{account_detail.get("email")}')
        print(f"[SUCCESS] 账号 {account_detail.get('email')} 注册成功! 使用代理: {proxy_config.get('ip')}")
        return True
    except Exception as e:
        print(f"[ERROR] 注册失败 | 代理 IP: {proxy_config.get('ip')} | 异常信息: {e}")
        # `bot` is still None when the constructor itself failed.
        if bot:
            bot.save_screenshot('tls_registration_failed')
        return False
    finally:
        if bot:
            try:
                bot.cleanup()
            except Exception as e:
                print(f"[WARN] cleanup failed: {e}")


def main():
    """Parse CLI arguments and keep N workers running until M accounts succeed."""
    # ---- Command-line arguments -------------------------------------------
    parser = argparse.ArgumentParser(description="TLS 批量注册机")
    parser.add_argument("-n", "--concurrency", type=int, default=1, help="最大并发数 (N)")
    parser.add_argument("-m", "--target", type=int, default=1, help="最大成功注册数 (M)")
    parser.add_argument("-p", "--pool", type=str, default="local", help="代理池名称")
    parser.add_argument("-u", "--url", type=str,
                        default="https://visas-fr.tlscontact.com/en-us/country/gb/vac/gbLON2fr",
                        help="TLS 目标网址")
    args = parser.parse_args()
    if args.concurrency < 1:
        # ThreadPoolExecutor rejects max_workers <= 0 with an opaque ValueError.
        parser.error("--concurrency must be >= 1")

    # ---- Environment ------------------------------------------------------
    capsolver_key = os.getenv("CAPSOLVER_KEY")
    if not capsolver_key:
        # NOTE(review): hard-coded fallback API key; should not live in source.
        capsolver_key = "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A"

    print(f"[*] 启动注册任务 | 目标数: {args.target} | 并发数: {args.concurrency} | 代理池: {args.pool} | URL: {args.url}")

    proxies = load_proxies(args.pool)
    print(f"[*] 成功加载代理数量: {len(proxies)} 个")

    success_count = 0
    active_tasks = 0

    # A thread pool keeps up to `concurrency` registrations in flight.
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
        futures = {}

        # 1. Initial fill of the task queue.
        while active_tasks < args.concurrency and (success_count + active_tasks) < args.target:
            proxy = random.choice(proxies)
            fut = executor.submit(register_worker, proxy, args.url, capsolver_key)
            futures[fut] = proxy
            active_tasks += 1

        # 2. Scheduling loop: each finished task may be replaced by a new one.
        # NOTE(review): there is no cap on failed attempts, so this runs until
        # the target is reached.
        while futures:
            done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)

            for fut in done:
                futures.pop(fut)
                active_tasks -= 1

                try:
                    if fut.result():
                        success_count += 1
                        print(f"[*] 进度更新: {success_count} / {args.target}")
                except Exception as e:
                    print(f"[FATAL] 线程未捕获异常: {e}")

                # Target not reached yet: submit a replacement task.
                if (success_count + active_tasks) < args.target:
                    new_proxy = random.choice(proxies)
                    new_fut = executor.submit(register_worker, new_proxy, args.url, capsolver_key)
                    futures[new_fut] = new_proxy
                    active_tasks += 1

    print(f"[*] 任务结束!共成功注册 {success_count} 个账号。")


if __name__ == "__main__":
    main()