"""Automated TLSContact account registration, activation and applicant setup.

The module drives a real Chromium instance through DrissionPage, using the
project's human-like mouse/keyboard helpers, and optionally CapSolver for
captcha tokens.
"""

import json
import os
import random
import re
import shutil
import socket
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from urllib.parse import urlencode

import requests
from DrissionPage.common import Keys
from DrissionPage import ChromiumPage, ChromiumOptions

from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
from toolkit.vs_cloud_api import VSCloudApi
from vs_types import NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
from utils.mouse import HumanMouse
from utils.keyboard import HumanKeyboard
from utils.scroll import HumanScroll


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def generate_random_account_detail() -> Dict:
    """Build a random account payload, using randomuser.me for the identity.

    Travel dates, e-mail suffix, application form id and passport number are
    generated locally. Name, gender, birthday, city, state, country and phone
    number are taken from ``https://randomuser.me/api/?nat=gb`` when that call
    succeeds.

    Returns:
        A dict with the account fields consumed by ``TlsRegistrator``. If the
        remote call fails for any reason, or returns no result, a payload with
        fixed fallback identity values is returned instead.
    """
    base_date = datetime.now(timezone.utc).date() + timedelta(days=random.randint(20, 90))
    arrival_schengen_area_date = base_date.strftime("%Y-%m-%d")
    departure_origin_date = (base_date - timedelta(days=random.randint(0, 2))).strftime("%Y-%m-%d")
    departure_schengen_area_date = (base_date + timedelta(days=random.randint(2, 15))).strftime("%Y-%m-%d")

    default_payload = {
        "email": f"user{random.randint(100000, 999999)}@gmail-app.com",
        "pwd": "Visafly@111",
        "location": "London",
        "visa_type": "Short stay (<90 days) - Tourism",
        "travel_purpose": "Tourism / Private visit",
        "application_form_id": "FRA" + "".join(str(random.randint(0, 9)) for _ in range(14)),
        "last_name": "Smith",
        "first_name": "James",
        "gender": "Male",
        "birthday": "1998-11-20",
        "nationality": "United Kingdom",
        "province_residence": "London",
        "passport_type": "Ordinary passport",
        "passport_no": "".join(random.choices("ABCDEFGHIJKLMNOPQRSTUVWXYZ", k=2))
        + "".join(random.choices("0123456789", k=7)),
        "phone_country_code": "44",
        "phone_number": "7400000000",
        "departure_origin_date": departure_origin_date,
        "arrival_schengen_area_date": arrival_schengen_area_date,
        "departure_schengen_area_date": departure_schengen_area_date,
    }

    try:
        # No seed is sent, so randomuser returns a different user on each call.
        resp = requests.get("https://randomuser.me/api/?nat=gb", timeout=15)
        resp.raise_for_status()
        result = (resp.json().get("results") or [None])[0]
        if not result:
            return default_payload

        # ``or {}`` keeps going when the API returns an explicit null
        # sub-object instead of discarding the whole result.
        name = result.get("name") or {}
        place = result.get("location") or {}
        dob = result.get("dob") or {}

        first_name = name.get("first") or default_payload["first_name"]
        last_name = name.get("last") or default_payload["last_name"]

        gender_raw = (result.get("gender") or "").strip().lower()
        gender = "Male" if gender_raw == "male" else "Female"

        birthday_raw = dob.get("date", "")
        # Only the leading ``YYYY-MM-DD`` part of the timestamp is kept.
        birthday = birthday_raw[:10] if birthday_raw else default_payload["birthday"]

        city = place.get("city") or default_payload["location"]
        state = place.get("state") or default_payload["province_residence"]
        country = place.get("country") or default_payload["nationality"]

        phone_raw = result.get("cell") or result.get("phone") or default_payload["phone_number"]
        phone_number = re.sub(r"\D", "", phone_raw) or default_payload["phone_number"]

        email_prefix = re.sub(r"[^a-z0-9]", "", f"{first_name}{last_name}".lower())
        if not email_prefix:
            email_prefix = f"user{random.randint(100000, 999999)}"
        email = f"{email_prefix}{random.randint(1000, 9999)}@gmail-app.com"

        # Overlay the fetched identity on the defaults; the fixed fields and
        # the locally generated ids/dates are reused unchanged.
        return {
            **default_payload,
            "email": email,
            "location": city,
            "last_name": last_name,
            "first_name": first_name,
            "gender": gender,
            "birthday": birthday,
            "nationality": country,
            "province_residence": state,
            "phone_number": phone_number,
        }
    except Exception:
        # Deliberate best effort: any network/parsing problem falls back to
        # the fixed default identity rather than aborting the run.
        return default_payload


class TlsRegistrator:
    """Drive one browser session through TLSContact registration.

    Typical order of calls: ``init_browser`` -> ``register`` -> ``activate``
    -> ``make_account_useful`` (and ``cleanup`` when the profile directory is
    no longer needed).
    """

    # Fixed id the original code used for the browser profile directory.
    # NOTE(review): a fixed id means concurrent instances share one profile;
    # pass ``instance_id=uuid.uuid4().hex[:8]`` for real isolation.
    _DEFAULT_INSTANCE_ID = '18d389e9'

    # CapSolver polling: attempts x interval seconds = overall wait budget.
    _CAPTCHA_POLL_ATTEMPTS = 30
    _CAPTCHA_POLL_INTERVAL = 3

    def __init__(self, tls_url, proxy_config: Optional[Dict] = None, capsolver_key: Optional[str] = None,
                 account_detail: Optional[Dict] = None, instance_id: Optional[str] = None):
        """Store configuration; no browser is started here.

        Args:
            tls_url: Page opened by ``init_browser``.
            proxy_config: Optional dict read with keys ``scheme``, ``ip``,
                ``port``, ``username`` and ``password``.
            capsolver_key: CapSolver client key, required by ``solve_captcha``.
            account_detail: Account payload, e.g. the dict returned by
                ``generate_random_account_detail``.
            instance_id: Suffix for the profile directory and log prefix.
                Defaults to the historical fixed value.
        """
        self.proxy_config = proxy_config
        self.capsolver_key = capsolver_key
        self.account_detail = account_detail
        self.tls_url = tls_url

        # User-data directory for this instance.
        self.instance_id = instance_id or self._DEFAULT_INSTANCE_ID
        self.workspace = os.path.abspath(os.path.join("data", f"reg_session_{self.instance_id}"))

        self.page = None
        self.mouse = None
        self.keyboard = None

    def _log(self, msg):
        """Print ``msg`` prefixed with this instance's id."""
        print(f"[TLS-Reg-{self.instance_id}] {msg}")

    def _get_free_port(self):
        """Return a currently free TCP port chosen by the OS.

        The original comment states this avoids a DrissionPage log-parsing
        error. The socket is closed before returning, so the port is only
        free at the moment of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def _build_proxy_url(self) -> Optional[str]:
        """Return the proxy URL for ``proxy_config``, or None if no proxy is set."""
        p = self.proxy_config
        if not p or not p.get("ip"):
            return None
        scheme = p.get('scheme', 'http')
        if p.get("username") and p.get("password"):
            # The scheme prefix was missing from this branch originally.
            # NOTE(review): Chromium may ignore inline proxy credentials;
            # confirm against the DrissionPage ``set_proxy`` documentation.
            return f"{scheme}://{p['username']}:{p['password']}@{p['ip']}:{p['port']}"
        return f"{scheme}://{p['ip']}:{p['port']}"

    def init_browser(self):
        """Start Chromium, open ``tls_url``, pass Cloudflare and set up input helpers."""
        self._log("Initializing browser...")
        co = ChromiumOptions()

        # 1. Port and profile-path isolation.
        co.set_local_port(self._get_free_port())
        co.set_user_data_path(self.workspace)

        # 2. Proxy (optionally with username/password).
        proxy_url = self._build_proxy_url()
        if proxy_url:
            p = self.proxy_config
            # Credentials are intentionally left out of the log line.
            self._log(f"Setting proxy: {p['ip']}:{p['port']}")
            co.set_proxy(proxy_url)

        # 3. Anti-bot and stability flags.
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        self.page = ChromiumPage(co)
        self.page.get(self.tls_url)

        cf_bypasser = CloudflareBypasser(self.page, log=True)
        cf_bypasser.bypass()
        time.sleep(3)
        cf_bypasser.handle_waiting_room()

        self._log("正在初始化拟人化工具...")
        self.mouse = HumanMouse(self.page, debug=True)
        self.keyboard = HumanKeyboard(self.page)

        # Start the pointer at a random spot inside the viewport.
        self._log("随机化鼠标开始位置...")
        viewport_width = self.page.rect.viewport_size[0]
        viewport_height = self.page.rect.viewport_size[1]
        init_x = random.randint(10, viewport_width - 10)
        init_y = random.randint(10, viewport_height - 10)
        self.mouse.move(init_x, init_y)

    def solve_captcha(self, page_url: str, task_type: str, site_key: str, use_proxy=False,
                      action: Optional[str] = None, api_domain: Optional[str] = None) -> str:
        """Solve a captcha through the CapSolver HTTP API and return its token.

        Args:
            page_url: URL of the page hosting the captcha (``websiteURL``).
            task_type: CapSolver task type (``type``).
            site_key: Captcha site key (``websiteKey``).
            use_proxy: When true, forward ``proxy_config`` to CapSolver.
            action: Optional ``pageAction`` value.
            api_domain: Optional ``apiDomain`` value.

        Returns:
            ``solution["gRecaptchaResponse"]`` if present, otherwise
            ``solution["token"]``.

        Raises:
            ValueError: If the CapSolver key is missing, or ``use_proxy`` is
                set without a usable ``proxy_config``.
            Exception: If task creation fails, CapSolver reports an error, or
                no solution is ready within the polling budget.
        """
        if not self.capsolver_key:
            raise ValueError("Capsolver API key missing")

        task = {
            "type": task_type,
            "websiteURL": page_url,
            "websiteKey": site_key,
        }
        if api_domain:
            task["apiDomain"] = api_domain
        if use_proxy:
            proxy = self.proxy_config
            # Fail with a clear message instead of AttributeError/TypeError.
            if not proxy or not proxy.get('ip') or not proxy.get('port'):
                raise ValueError("use_proxy=True requires proxy_config with 'ip' and 'port'")
            task["proxyType"] = proxy.get('scheme', 'http')
            task["proxyAddress"] = proxy.get('ip')
            task["proxyPort"] = int(proxy.get('port'))
            if proxy.get('username'):
                task["proxyLogin"] = proxy.get('username')
                task["proxyPassword"] = proxy.get('password')
        if action:
            task["pageAction"] = action

        payload = {"clientKey": self.capsolver_key, "task": task}
        res = requests.post("https://api.capsolver.com/createTask", json=payload, timeout=20)
        # Parse the body once, and only when the status makes JSON likely.
        created = res.json() if res.status_code == 200 else None
        if created is None or created.get("errorId") != 0:
            raise Exception(f"Failed to create capsolver task: {res.text}")

        task_id = created.get("taskId")
        self._log(f"Task created: {task_id}. Waiting for solution...")

        for _ in range(self._CAPTCHA_POLL_ATTEMPTS):
            r = requests.post(
                "https://api.capsolver.com/getTaskResult",
                json={"clientKey": self.capsolver_key, "taskId": task_id},
                timeout=20,
            )
            data = r.json()
            status = data.get("status")
            if status == "ready":
                self._log("Captcha solved successfully!")
                solution = data["solution"]
                return solution.get("gRecaptchaResponse") or solution.get("token")
            # Stop early on a reported failure instead of polling to timeout.
            if data.get("errorId") or status == "failed":
                raise Exception(f"Capsolver task failed: {r.text}")
            time.sleep(self._CAPTCHA_POLL_INTERVAL)

        raise Exception("Capsolver task timeout")

    def register(self):
        """Fill in and submit the registration form.

        Returns:
            True once the "Check your email inbox" heading is displayed.

        Raises:
            BizLogicError: If the submit button never appears or the success
                heading is not shown within 10 seconds of submitting.
        """
        email = self.account_detail.get('email')
        password = self.account_detail.get('pwd')

        submit_selector = '#submit'
        if not self.page.wait.ele_displayed(submit_selector, timeout=3):
            # Not on the form yet: follow the registration link first.
            register_link = self.page.ele("tag:a@@href:registration")
            self.mouse.human_click_ele(register_link)
            time.sleep(3)
            if not self.page.wait.ele_displayed(submit_selector, timeout=10):
                raise BizLogicError(message=f"Can't find selector={submit_selector}")

        time.sleep(random.uniform(0.5, 1))
        self._log("正在填写邮箱和密码...")
        # The password is typed twice: once per password field.
        for field_selector, text in (
            ('#email', email),
            ('#password', password),
            ('#confirm-password', password),
        ):
            self.mouse.human_click_ele(self.page.ele(field_selector))
            self.keyboard.type_text(text, humanize=True)
            time.sleep(random.uniform(0.2, 0.5))

        self._log("正在勾选必选条款...")
        for checkbox_id in ['#terms-and-conditions', '#biometric-data', '#privacy-notice']:
            # The click goes to the element following the checkbox input.
            self.mouse.human_click_ele(self.page.ele(checkbox_id).next())
            time.sleep(random.uniform(0.3, 0.6))

        self._log("提交注册...")
        submit_btn = self.page.ele(submit_selector)
        submit_btn.scroll.to_see(center=True)
        time.sleep(random.uniform(0.3, 0.6))

        # Move the pointer over the button, then click through JS.
        mid_x, mid_y = int(submit_btn.rect.midpoint[0]), int(submit_btn.rect.midpoint[1])
        self.mouse.move(mid_x, mid_y, humanize=True)
        time.sleep(0.5)
        submit_btn.click(by_js=True)

        self._log("正在等待验证结果 (最多10秒)...")
        if not self.page.wait.ele_displayed('tag:h1@text():Check your email inbox', timeout=10):
            self.page.get_screenshot("failed_submit.png")
            raise BizLogicError(message='Failed to submit account registration')

        self._log("✅ 操作成功!已弹出提示:Check your email inbox")
        return True

    def activate(self, sent_at=None):
        """Open the activation link from the registration e-mail and confirm it.

        Args:
            sent_at: ``YYYY-MM-DD HH:MM:SS`` UTC string passed to the mail
                lookup as ``sent_date``. Defaults to the current UTC time.

        Raises:
            BizLogicError: If no ``https://`` link is found in the fetched
                mail, or the activation button does not appear in 10 seconds.
        """
        email = self.account_detail.get('email')
        if not sent_at:
            sent_at = _utc_timestamp()

        content_out = VSCloudApi.Instance().fetch_mail_content(
            email='visafly666@gmail.com',
            sender='TLSContact',
            recipient=email,
            subject_keywords='TLSContact',
            body_keywords='',
            sent_date=sent_at,
            expiry=600,
        )
        self._log(f'activate email content={content_out}')

        # Stop the URL at whitespace, quotes or angle brackets so HTML mail
        # (href="...">) does not leak markup into the link.
        match = re.search(r'https://[^\s"\'<>]+', content_out or '')
        if not match:
            # Previously this fell through to ``page.get(None)``.
            raise BizLogicError(message=f"Activation link not found in mail for {email}")
        self.page.get(match.group(0))

        btn_selector = "#activation-pending-button"
        if not self.page.wait.ele_displayed(btn_selector, timeout=10):
            raise BizLogicError(message=f"Wait ele={btn_selector} timeout")
        self.page.ele(btn_selector).click()
        time.sleep(3)

    def _fill_date_field(self, selector, date_str):
        """Type a ``YYYY-MM-DD`` date into the input at ``selector``; skip if empty."""
        if not date_str:
            return
        ele = self.page.ele(selector)
        ele.scroll.to_see(center=True)
        ele.click()
        time.sleep(0.2)

        year, month, day = date_str.split('-')
        self.page.actions.type(year)
        self.page.actions.type(Keys.TAB)
        time.sleep(0.1)
        self.page.actions.type(month)
        time.sleep(0.1)
        # NOTE(review): no TAB is sent between month and day; presumably the
        # widget advances on its own after two digits. Confirm on the page.
        self.page.actions.type(day)

    def _fill_text_input(self, test_id, value):
        """Scroll to the ``<input data-testid=...>`` and enter ``value``; skip if empty."""
        if not value:
            return
        ele = self.page.ele(f'tag:input@@data-testid={test_id}')
        ele.scroll.to_see(center=True)
        ele.input(value)

    def _pick_button_dropdown(self, test_id, option_text):
        """Open the ``<button data-testid=...>`` dropdown and click the matching ``<span>``."""
        if not option_text:
            return
        btn = self.page.ele(f'tag:button@@data-testid={test_id}')
        btn.scroll.to_see(center=True)
        btn.click()
        time.sleep(0.5)
        self.page.ele(f'tag:span@@text():{option_text}').click(by_js=True)
        time.sleep(0.5)

    def _pick_listbox_option(self, label_for, option_text):
        """Open the control after ``<label for=...>`` and click the matching option."""
        btn = self.page.ele(f'tag:label@@for={label_for}').next()
        btn.scroll.to_see(center=True)
        btn.click()
        time.sleep(0.5)
        self.page.ele(f'tag:li@@role=option@@text():{option_text}').click(by_js=True)
        time.sleep(0.5)

    def _login(self):
        """Log in with the account e-mail/password and wait for the redirect.

        Raises:
            BizLogicError: If the login button cannot be found, or the URL
                still contains ``login-actions`` or ``auth`` afterwards.
        """
        email = self.account_detail.get('email')
        password = self.account_detail.get('pwd')

        btn_selector = 'tag:button@@text():Login'
        if not self.page.wait.ele_displayed(btn_selector, timeout=3):
            # Not on the login form yet: follow the login link first.
            login_link = self.page.ele("tag:a@@href:login")
            self.mouse.human_click_ele(login_link)
            time.sleep(3)
            if not self.page.wait.ele_displayed(btn_selector, timeout=10):
                raise BizLogicError(message=f"Can't find selector={btn_selector}")

        # NOTE: ReCaptcha handling is disabled; kept for reference.
        # recpatchav2_token = ""
        # if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
        #     self._log("Solving ReCaptcha...")
        #     recpatchav2_token = self.solve_captcha(self.page.url, "ReCaptchaV2TaskProxyLess", "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0")

        email_input = self.page.ele('tag:label@@text():Email').next()
        self.mouse.human_click_ele(email_input)
        time.sleep(random.uniform(0.2, 0.6))
        self.keyboard.type_text(email, humanize=True)
        time.sleep(random.uniform(0.5, 1.2))

        password_input = self.page.ele('tag:label@@text():Password').next()
        self.mouse.human_click_ele(password_input)
        time.sleep(random.uniform(0.2, 0.6))
        self.keyboard.type_text(password, humanize=True)

        # if recpatchav2_token:
        #     inject_recpatchav2_token_js = f"""
        #         var g = document.getElementById('g-recaptcha-response');
        #         if(g) {{ g.value = "{recpatchav2_token}"; }}
        #     """
        #     self._log("Inject ReCaptchaV2 Token via JS...")
        #     self.page.run_js(inject_recpatchav2_token_js)
        #     time.sleep(random.uniform(0.5, 1.0))

        self._log("Submitting Login...")
        time.sleep(random.uniform(0.3, 0.8))
        self.mouse.human_click_ele(self.page.ele('tag:button@@text():Login'))

        self._log("Waiting for dashboard redirect...")
        self.page.wait.url_change('login-actions', exclude=True, timeout=45)
        time.sleep(4)
        if "login-actions" in self.page.url or "auth" in self.page.url:
            raise BizLogicError(message="Login Failed! Invalid credentials or Captcha rejected.")

        self._log("Waiting for dashboard...")
        self.page.wait.load_start()
        time.sleep(5)

    def make_account_useful(self):
        """Log in, add an applicant and fill the applicant form from ``account_detail``.

        Fields whose value is missing or empty in ``account_detail`` are
        skipped. A failure while selecting the province of residence is
        logged and ignored; every other failure propagates.

        Raises:
            BizLogicError: If login fails or no add-applicant button exists.
        """
        detail = self.account_detail
        self._login()

        # NOTE: travel-group selection from the dashboard is disabled; kept
        # for reference.
        # location = self.account_detail.get('location')
        # self._log("Parsing Dashboard for Travel Group...")
        # html = self.page.html
        # js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
        # js_match = re.search(js_pattern, html, re.DOTALL)
        # groups = []
        # if js_match:
        #     json_str = js_match.group(1).replace(r'\"', '"')
        #     groups = json.loads(json_str)
        # travel_group = None
        # for g in groups:
        #     if g.get('vacName', '').lower() == location.lower():
        #         travel_group = g
        #         break
        # if not travel_group:
        #     raise BizLogicError(message=f"Travel Group not found for {location}")
        # formgroup_id = travel_group.get('formGroupId')
        # self._log(f"Waiting for group button to render: {formgroup_id}")
        # btn_selector = f'tag:a@@data-testid=btn-select-group'
        # self._log(f"Select group_id={formgroup_id}...")
        # self.mouse.human_click_ele(self.page.ele(btn_selector))
        # self._log("Waiting for url redirect...")
        # self.page.wait.url_change('travel-groups', exclude=True, timeout=45)
        # time.sleep(2)
        # if "travel-groups" in self.page.url or "auth" in self.page.url:
        #     raise BizLogicError(message="Redirect to service-level Failed!")

        btn_selector = 'tag:button@@data-testid=btn-add-applicant'
        if not self.page.wait.ele_displayed(btn_selector, timeout=10):
            # Fall back to the alternative button variant.
            btn_selector = 'tag:button@@data-testid=btn-max-number-of-applicants'
            if not self.page.ele(btn_selector):
                raise BizLogicError(message=f"Can't find selector={btn_selector}")
        add_btn = self.page.ele(btn_selector)
        add_btn.scroll.to_see(center=True)
        time.sleep(random.uniform(0.6, 0.8))
        add_btn.click()
        time.sleep(6)

        self._pick_button_dropdown('input-visa-type', detail.get("visa_type"))
        self._pick_button_dropdown('input-travel-purpose', detail.get("travel_purpose"))

        self._fill_text_input('f_cai', detail.get('application_form_id'))

        # Names and passport number are entered upper-cased.
        last_name = detail.get('last_name')
        self._fill_text_input('f_pers_surnames', last_name.upper() if last_name else None)
        first_name = detail.get('first_name')
        self._fill_text_input('f_pers_givennames', first_name.upper() if first_name else None)

        gender = detail.get('gender')
        if gender:
            gender_label = self.page.ele(f'tag:label@@text():{gender.capitalize()}')
            gender_label.scroll.to_see(center=True)
            gender_label.click()

        self._fill_date_field('tag:input@@data-testid=f_pers_birth_date', detail.get('birthday'))

        nationality = detail.get('nationality')
        if nationality:
            self._pick_listbox_option('f_pers_nationality', nationality.title())

        province_residence = detail.get('province_residence')
        if province_residence:
            try:
                self._pick_listbox_option('f_pers_province', province_residence.title())
            except Exception as e:
                # Best effort: a missing province option must not abort the
                # whole form.
                self._log(e)

        passport_type = detail.get('passport_type')
        if passport_type:
            self._pick_listbox_option('f_identity_type', passport_type)

        passport_no = detail.get('passport_no')
        self._fill_text_input('f_pass_num', passport_no.upper() if passport_no else None)

        phone_country_code = detail.get('phone_country_code')
        phone_number = detail.get('phone_number')
        if phone_country_code:
            phone_box = self.page.ele('tag:label@@for=f_pers_mobile_phone').next()
            code_btn = phone_box.ele('tag:button')
            code_btn.scroll.to_see(center=True)
            code_btn.click()
            time.sleep(0.5)
            self.page.ele(f'tag:li@@role=option@@text():+{phone_country_code}').click(by_js=True)
            time.sleep(0.5)
            phone_box.ele('tag:input@@type:tel').input(phone_number)

        # Travel dates.
        # NOTE(review): the Schengen *arrival* date is typed into
        # ``f_trav_departure_date`` and the Schengen *departure* date into
        # ``f_trav_arrival_date``. This mirrors the original code; confirm
        # against the live form that the mapping is not swapped.
        self._fill_date_field('tag:input@@data-testid=fi_trav_origin_departure_date',
                              detail.get('departure_origin_date'))
        self._fill_date_field('tag:input@@data-testid=f_trav_departure_date',
                              detail.get('arrival_schengen_area_date'))
        self._fill_date_field('tag:input@@data-testid=f_trav_arrival_date',
                              detail.get('departure_schengen_area_date'))

        submit_btn = self.page.ele('tag:button@@data-testid=btn-submit')
        submit_btn.scroll.to_see(center=True)
        time.sleep(1)
        submit_btn.click()
        time.sleep(6)

        confirm_btn = self.page.ele('tag:button@@text():Confirm')
        confirm_btn.scroll.to_see(center=True)
        confirm_btn.click()

    def cleanup(self):
        """Quit the browser and delete this instance's profile directory."""
        self._log("Cleaning up resources...")
        if self.page:
            try:
                self.page.quit()
            except Exception as e:
                # Still best effort, but no longer silent and no longer
                # swallowing KeyboardInterrupt/SystemExit.
                self._log(f"Failed to quit browser: {e}")

        if os.path.exists(self.workspace):
            time.sleep(1)  # Give the browser time to release file locks.
            shutil.rmtree(self.workspace, ignore_errors=True)


def main():
    """Run one registration -> activation -> applicant-setup flow."""
    # ================= Configuration =================
    PROXY_CONFIG = {
        "scheme": "http",
        "ip": "127.0.0.1",
        "port": "7890",
        "username": "",
        "password": ""
    }
    # SECURITY NOTE(review): an API key is committed in source. It is kept as
    # the fallback so existing runs still work, but it should be rotated and
    # supplied only through the CAPSOLVER_KEY environment variable.
    CAPSOLVER_KEY = os.environ.get("CAPSOLVER_KEY", "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A")
    TLS_URL = "https://visas-fr.tlscontact.com/en-us/country/cn/vac/cnCNG2fr"
    ACCOUNT_DETAIL = generate_random_account_detail()
    # =================================================

    try:
        bot = TlsRegistrator(TLS_URL, proxy_config=PROXY_CONFIG, capsolver_key=CAPSOLVER_KEY,
                             account_detail=ACCOUNT_DETAIL)
        bot.init_browser()
        # Captured before registering and passed to the mail lookup as its
        # ``sent_date``.
        sent_at = _utc_timestamp()
        bot.register()
        bot.activate(sent_at=sent_at)
        bot.make_account_useful()
    except Exception as e:
        print(f'Exception Info={e}')

    # Keep the process (and the browser window) alive for inspection.
    # NOTE(review): the original one-line layout does not show whether this
    # sleep ran only after a failure; it is placed after the try/except here.
    time.sleep(3600)


if __name__ == "__main__":
    main()