import os import random import socket import json import time import string import logging import base64 import requests from datetime import datetime, timezone from urllib.parse import urlparse, urlencode from bs4 import BeautifulSoup from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.backends import default_backend from DrissionPage import ChromiumPage, ChromiumOptions from vs_types import RateLimiteddError, BizLogicError from utils.cloudflare_bypass_for_scraping import CloudflareBypasser # --- 配置日志 --- logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%H:%M:%S' ) logger = logging.getLogger("VFSRegistrar") # --- 常量 --- VFS_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6 1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0 5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2 GQIDAQAB -----END PUBLIC KEY-----""" def upload_account_to_server(account): """ 将注册成功的账号上报到中心服务器 """ api_url = 'https://visafly.top/api/account/add' api_token = 'tok_e946329a60ff45ba807f3f41b0e8b7fc' # 你的 Bearer Token # 构造请求头 headers = { 'accept': 'application/json', 'Authorization': f'Bearer {api_token}', 'Content-Type': 'application/json' } # 构造 extra_data (存放 VFS 特有的国家、领区、手机号信息) extra_payload = { "country_code": account.get("country_code"), "mission_code": account.get("mission_code"), "phone_country_code": account.get("phone_country_code"), "phone_number": account.get("phone_number"), "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # 构造主 Payload payload = { "pool_name": account.get("pool_name", "default_pool"), "username": account.get("username"), "password": account.get("password"), "extra_data": extra_payload } try: logger.info(f"Uploading account {account['username']} to server...") resp = requests.post(api_url, json=payload, headers=headers, timeout=10) if resp.status_code == 200: logger.info(f"✅ [API Upload Success] Server responded: {resp.text}") return True else: logger.error(f"❌ [API Upload Failed] Status: {resp.status_code}, Body: {resp.text}") return False except Exception as e: logger.error(f"❌ [API Upload Error]: {e}") return False class VFSHelper: """工具方法的静态类""" @staticmethod def generate_mobile_number(country_code=353, e164_format=False): if country_code == 353: # Ireland prefix = random.choice(['83', '85', '86', '87', '89']) number = f"{prefix}{''.join([str(random.randint(0, 9)) for _ in range(7)])}" return f"+353{number}" if e164_format else number elif country_code == 44: # UK prefix_second = random.choice(['1', '2', '3', '4', '5', '7', '8', '9']) number = f"7{prefix_second}{''.join([str(random.randint(0, 9)) for _ in range(8)])}" return f"+44{number}" if e164_format else number elif country_code == 86: # China prefixes = ["130", "131", "132", "133", "135", "136", "138", "139", "150", "158", "159", "186"] prefix = random.choice(prefixes) number = f"{prefix}{''.join([str(random.randint(0, 9)) for _ in range(8)])}" return f"+86{number}" if e164_format else number return "".join([str(random.randint(0, 9)) for _ in range(10)]) @staticmethod def generate_password(length=12): chars = string.ascii_letters + string.digits + "@#$%" while True: pwd = ''.join(random.choices(chars, k=length)) if (any(c.islower() for c in pwd) and any(c.isupper() for c in pwd) and any(c.isdigit() for c in pwd) and any(c in "@#$%" for c in pwd)): return pwd @staticmethod def encrypt_password(password: str) -> str: public_key = serialization.load_pem_public_key( VFS_PUBLIC_KEY_PEM.encode(), backend=default_backend() ) ciphertext = public_key.encrypt( password.encode(), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None ) ) return base64.b64encode(ciphertext).decode() @staticmethod def get_client_source() -> str: timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") payload = f"GA;{timestamp}Z" return VFSHelper.encrypt_password(payload) class BrowserResponse: """标准化浏览器响应""" def __init__(self, result_dict): result_dict = result_dict or {} self.status_code = result_dict.get('status', 0) self.text = result_dict.get('body', '') self.headers = result_dict.get('headers', {}) self.url = result_dict.get('url', '') self._json = None def json(self): if self._json is None: try: self._json = json.loads(self.text) if self.text else {} except json.JSONDecodeError: self._json = {} return self._json class VFSRegistrationBot: def __init__(self, config): self.config = config self.page = None self.proxy_url = config.get("proxy_url") def _init_browser(self): """初始化浏览器配置""" co = ChromiumOptions() # 查找可用端口 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) port = s.getsockname()[1] co.set_local_port(port) chrome_path = os.getenv("CHROME_BIN") if chrome_path: co.set_paths(browser_path=chrome_path) if self.proxy_url: co.set_argument(f'--proxy-server={self.proxy_url}') co.headless(False) # VFS 验证码通常需要有头模式 co.set_argument('--no-sandbox') co.set_argument('--disable-gpu') co.set_argument('--disable-dev-shm-usage') co.set_argument('--window-size=1920,1080') co.set_argument('--disable-blink-features=AutomationControlled') # 创建页面对象 self.page = ChromiumPage(co) # 设置超时 self.page.set.timeouts(15) def _perform_js_fetch(self, method, url, headers=None, data=None, json_data=None, retry_count=0): """注入JS执行Fetch请求,绕过部分指纹检测""" if not self.page: raise BizLogicError("Browser not initialized") if retry_count > 3: raise BizLogicError("Max retries exceeded for request") fetch_options = { "method": method.upper(), "headers": headers or {}, "credentials": "include" } if json_data: fetch_options['body'] = json.dumps(json_data) fetch_options['headers']['Content-Type'] = 'application/json' elif data: fetch_options['body'] = urlencode(data) if isinstance(data, dict) else str(data) fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded' logger.debug(f"Request: {method} {url}") js_script = f""" const url = "{url}"; const options = {json.dumps(fetch_options)}; return fetch(url, options) .then(async response => {{ const text = await response.text(); const headers = {{}}; response.headers.forEach((value, key) => headers[key] = value); return {{ status: response.status, body: text, headers: headers, url: response.url }}; }}) .catch(err => ({{ status: 0, body: err.toString() }})); """ try: res_dict = self.page.run_js(js_script, timeout=60) resp = BrowserResponse(res_dict) if resp.status_code == 200: return resp # 处理 Cloudflare 403 拦截 if resp.status_code == 403 and ("cloudflare" in resp.text.lower() or "Just a moment" in resp.text): logger.warning(f"Cloudflare 403 detected. Retrying ({retry_count+1})...") new_token = self._refresh_turnstile() if new_token and json_data and "captcha_api_key" in json_data: json_data["captcha_api_key"] = new_token return self._perform_js_fetch(method, url, headers, data, json_data, retry_count + 1) if resp.status_code == 429: raise RateLimiteddError(f"Rate Limit: {resp.text[:100]}") return resp # 返回其他状态码供调用者处理 (如 400 业务错误) except Exception as e: logger.error(f"JS Execution Error: {e}") raise BizLogicError(f"Fetch failed: {e}") def _handle_cookie_banner(self): """处理 Cookie 弹窗""" try: js = """ var btn = document.getElementById('onetrust-accept-btn-handler'); if(btn) { btn.click(); return true; } var banner = document.getElementById('onetrust-banner-sdk'); if(banner) { banner.remove(); return true; } """ self.page.run_js(js) except: pass def _refresh_turnstile(self): """刷新并获取 Cloudflare Token""" logger.info("Attempting to refresh Turnstile token...") try: self.page.run_js('try{window.turnstile.reset()}catch(e){}') cf_bypasser = CloudflareBypasser(self.page, log=True) for i in range(30): time.sleep(1) try: ele = self.page.ele('@name=cf-turnstile-response') if ele and ele.value: logger.info("Turnstile token obtained.") return ele.value except: pass if i > 5: try: cf_bypasser.click_verification_button(is_dfs=False) except: pass except Exception as e: logger.error(f"Turnstile refresh failed: {e}") return None def _wait_for_activation_link(self, username, max_wait_sec=60): """轮询获取激活链接""" logger.info(f"Waiting for email to {username}...") start_time = time.time() master_email = self.config.get("master_email", "visafly666@gmail.com") # 配置代理 proxies = { "http": self.proxy_url, "https": self.proxy_url, } if self.proxy_url else None while time.time() - start_time < max_wait_sec: try: utc_now_str = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') params = { "email": master_email, "sender": 'donotreply at vfsglobal.com', "recipient": username, "subjectKeywords": 'Welcome', "bodyKeywords": 'ActivateAccount', "sentDate": utc_now_str, "expiry": str(60) } url = f"https://visafly.top/api/email-authorizations/fetch" headers = { "Authorization": "Bearer tok_e946329a60ff45ba807f3f41b0e8b7fc", "Content-Type": "application/json", "Accept": "application/json, text/plain, */*" } # 发送请求,添加代理和超时 # 注意:如果 body 为空,建议传 json={} 而不是 data="" resp = requests.post(url, headers=headers, params=params, json={}, proxies=proxies, timeout=60) # --- 关键改进:先检查状态码 --- if resp.status_code != 200: logger.warning(f"Email API returned status {resp.status_code}. Body: {resp.text[:100]}") time.sleep(15) continue # --- 关键改进:安全解析 JSON --- try: result = resp.json() except json.JSONDecodeError: logger.error(f"Failed to decode JSON. Response was: {resp.text[:200]}") time.sleep(15) continue if result.get('code') != 0: # 这里的错误通常是业务逻辑错误(如:邮件还没到) logger.debug(f"API Message: {result.get('message')}") else: data = result.get('data', {}) content = data.get('body', "") if content: soup = BeautifulSoup(content, "html.parser") link = soup.find("a", string="ActivateAccount") if link: raw_link = link["href"] clean_url = raw_link.replace(" ", "").replace("\n", "").replace("\r", "").strip() return clean_url except Exception as e: logger.warning(f"Error fetching email: {e}") time.sleep(15) logger.info("Checking email again...") return None def register(self, account): """执行单个账号注册""" website = self.config['website'] try: self._init_browser() logger.info(f"Opening {website}") # 1. 设置超时 self.page.set.timeouts(page_load=30, script=30) # 2. 尝试打开页面 try: self.page.get(website, retry=0, timeout=30) except Exception: # 超时强制停止,防止卡死 logger.warning(f"Page load timed out (Stopped manually). Checking URL...") self.page.stop_loading() # 1. 过盾 cf_token = None cf_bypasser = CloudflareBypasser(self.page, log=True) for _ in range(40): time.sleep(1) current_url = self.page.url if "page-not-found" in current_url: logger.error(f"❌ [BLOCKED] Redirected to 'Page Not Found' during check. Aborting.") return False # 如果页面标题变成 403 Forbidden if "403" in self.page.title and "Just a moment" not in self.page.title: logger.error(f"❌ [BLOCKED] 403 Forbidden detected. Aborting.") return False self._handle_cookie_banner() # 尝试获取 Token try: ele = self.page.ele('@name=cf-turnstile-response') if ele and ele.value: cf_token = ele.value if cf_token: break except: pass # 尝试点击 try: cf_bypasser.click_verification_button(is_dfs=False) except: pass if not cf_token: raise BizLogicError("Failed to obtain initial Cloudflare token") # 2. 构造注册请求 post_data = { 'emailid': account['username'], 'password': VFSHelper.encrypt_password(account['password']), 'confirmPassword': VFSHelper.encrypt_password(account['password']), 'processPerDataAgreed': True, 'intTransPerDataAgreed': True, 'termAndConditionAgreed': True, 'missioncode': account['mission_code'], 'countrycode': account['country_code'], 'languageCode': 'en', 'dialcode': str(account['phone_country_code']), 'contact': account['phone_number'], 'captcha_version': 'cloudflare-v1', 'captcha_api_key': cf_token, 'cultureCode': 'en-US', 'IsSpecialUser': False, } headers = { 'content-type': 'application/json;charset=utf-8', 'accept': 'application/json, text/plain, */*', 'route': f"{account['country_code']}/en/{account['mission_code']}", 'clientsource': VFSHelper.get_client_source(), } logger.info(f"Submitting registration for {account['username']}") resp = self._perform_js_fetch( 'POST', 'https://lift-api.vfsglobal.com/user/registration', headers=headers, json_data=post_data ) logger.info(f"Registration response: {resp.text}") resp_data = resp.json() if resp_data.get("code") == "200": logger.info("Registration API success. Waiting for email...") activate_link = self._wait_for_activation_link(account['username']) if activate_link: logger.info(f"Activating account: {activate_link}") # 在当前浏览器上下文中打开链接,保持环境一致性 # === 关键步骤:打开新标签页并验证结果 === # 打开新标签页 activate_tab = self.page.new_tab(activate_link) try: # 等待页面加载并查找 "Activation Successful" 文本 # timeout=30 表示最多等待 30 秒 logger.info("Waiting for 'Activation Successful' message on page...") # DrissionPage 查找包含特定文本的元素 success_ele = activate_tab.ele('Activation Successful', timeout=30) if success_ele: logger.info(f"✅ Account {account['username']} activated successfully (Verified).") return True else: # 如果没找到成功提示,尝试读取页面内容找错误原因 body_text = activate_tab.ele('tag:body').text[:200] logger.error(f"Activation verification failed. Page text: {body_text}") return False except Exception as e: logger.error(f"Error checking activation status: {e}") return False finally: # 无论成功失败,关闭激活标签页,切回主标签 activate_tab.close() else: logger.error("Timeout waiting for activation email.") else: logger.error(f"Registration failed: {resp_data}") except Exception as e: logger.error(f"Registration process exception: {e}", exc_info=True) finally: if self.page: try: self.page.quit() except: pass return False # --- 主流程 --- def generate_account_details(config): """生成账号数据字典""" account_prefix = config.get('account_prefix', 'vfs') pool_name = config.get('pool_name', 'vfs') rand_suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6)) username = f"{account_prefix}_{rand_suffix}@{config['email_domain']}.com" phone = VFSHelper.generate_mobile_number(config['phone_country_code']) return { 'pool_name': pool_name, 'country_code': config['country_code'], 'mission_code': config['mission_code'], 'phone_country_code': config['phone_country_code'], 'phone_number': phone, 'username': username, 'password': VFSHelper.generate_password(), } def main(): # 配置 config = { "pool_name": "ie.hu.booker", "account_prefix": "ie_hu", "email_domain": "gmail-app", "master_email": "visafly666@gmail.com", "proxy_url": "http://127.0.0.1:7890", "target_count": 10, "phone_country_code": 353, "country_code": "irl", "mission_code": "hun", "website": "https://visa.vfsglobal.com/irl/en/hun/register", } bot = VFSRegistrationBot(config) success_accounts = [] print(">>> Starting Registration Bot <<<") while len(success_accounts) < config['target_count']: account = generate_account_details(config) logger.info(f"Processing Account: {account['username']}") is_success = bot.register(account) if is_success: success_accounts.append(account) logger.info(f"Progress: {len(success_accounts)}/{config['target_count']}") upload_account_to_server(account) # 保存结果到文件,防止中途退出丢失 with open("registered_accounts.json", "w", encoding='utf-8') as f: json.dump(success_accounts, f, indent=4, ensure_ascii=False) else: logger.warning("Retrying with new account details...") # 稍微暂停,避免请求过于频繁 time.sleep(5) print(">>> All tasks completed <<<") if __name__ == "__main__": main()