| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- import os
- import random
- import socket
- import json
- import time
- import string
- import logging
- import base64
- import requests
- from datetime import datetime, timezone
- from urllib.parse import urlparse, urlencode
- from bs4 import BeautifulSoup
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.backends import default_backend
- from DrissionPage import ChromiumPage, ChromiumOptions
- from vs_types import RateLimiteddError, BizLogicError
- from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
# --- Logging configuration ---
# Compact, time-only log lines: "HH:MM:SS [LEVEL] message".
_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s'
_LOG_DATE_FORMAT = '%H:%M:%S'

logging.basicConfig(
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
    level=logging.INFO,
)

# Shared logger used by every function and class in this script.
logger = logging.getLogger("VFSRegistrar")
# --- Constants ---
# PEM-encoded RSA public key used by VFSHelper.encrypt_password to encrypt
# the password and the "clientsource" header value. Lines are joined with
# "\n" and there is no trailing newline.
VFS_PUBLIC_KEY_PEM = "\n".join((
    "-----BEGIN PUBLIC KEY-----",
    "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuupFgB+lYIOtSxrRoHzc",
    "LmCZKJ6+oSbgqgOPzFMM0TasOeLw0NXEn1XfIzXdx75+tegNKwyIZumoh0yhubKs",
    "t59GV321kN0iquYRHrdh3ygfDDHlS9rROQeBqRga0ncSADtbLMrBPqXJjPCoV76y",
    "t92towriKoH75BhiazY0mghm4LjmAWrV0u/GNpV3tk9bxbtHEXGaFmxCJqjg+7x6",
    "1e5wXLfvpj9w1QsiSWOSJxLOyICz/9ByxXycQQFdNmjnnnwco9Gt/Mi33NYH71j0",
    "5oXIjklFC4lvJqaqSY5lS7Vwb9oCt9zX9J0Yz4z4e/3V+0jgRnWOFGofyks4FKe2",
    "GQIDAQAB",
    "-----END PUBLIC KEY-----",
))
def upload_account_to_server(account):
    """Report a successfully registered account to the central server.

    Args:
        account: Mapping describing the account. The keys read are
            ``pool_name`` (defaults to ``"default_pool"``), ``username``,
            ``password``, ``country_code``, ``mission_code``,
            ``phone_country_code`` and ``phone_number``. Missing keys are
            sent as ``null``.

    Returns:
        ``True`` when the server answers with HTTP 200, ``False`` for any
        other status or when the request could not be sent. This function
        never raises for transport or serialisation failures.
    """
    api_url = 'https://visafly.top/api/account/add'
    # SECURITY: a bearer token committed to source control is exposed to
    # everyone who can read the repository. Prefer the VISAFLY_API_TOKEN
    # environment variable; the literal is kept only as a fallback so
    # existing deployments keep working.
    api_token = os.getenv('VISAFLY_API_TOKEN', 'tok_e946329a60ff45ba807f3f41b0e8b7fc')

    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }

    # extra_data carries the VFS-specific fields (country, mission, phone).
    # NOTE(review): created_at is naive local time, as before — confirm
    # which timezone the server expects before changing it.
    extra_payload = {
        "country_code": account.get("country_code"),
        "mission_code": account.get("mission_code"),
        "phone_country_code": account.get("phone_country_code"),
        "phone_number": account.get("phone_number"),
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    payload = {
        "pool_name": account.get("pool_name", "default_pool"),
        "username": account.get("username"),
        "password": account.get("password"),
        "extra_data": extra_payload
    }

    # Use .get like every other field so a missing username is not reported
    # as an "API Upload Error" caused by a KeyError in the log call.
    username = account.get("username")

    try:
        logger.info(f"Uploading account {username} to server...")
        resp = requests.post(api_url, json=payload, headers=headers, timeout=10)
    except (requests.RequestException, TypeError, ValueError) as e:
        # RequestException: network/timeout/TLS problems.
        # TypeError/ValueError: payload could not be serialised to JSON.
        logger.error(f"❌ [API Upload Error]: {e}")
        return False

    if resp.status_code == 200:
        logger.info(f"✅ [API Upload Success] Server responded: {resp.text}")
        return True

    logger.error(f"❌ [API Upload Failed] Status: {resp.status_code}, Body: {resp.text}")
    return False
class VFSHelper:
    """Stateless helpers for generating and encrypting registration data."""

    @staticmethod
    def generate_mobile_number(country_code=353, e164_format=False):
        """Return a random, plausibly formatted mobile number.

        Args:
            country_code: Numeric dialling code. 353 (Ireland), 44 (UK) and
                86 (China) get country-specific prefixes and lengths; any
                other value yields 10 random digits.
            e164_format: When true, prefix the number with ``+<country_code>``.

        Returns:
            The national number as a digit string, or the ``+``-prefixed
            form when ``e164_format`` is set.
        """
        def random_digits(count):
            return ''.join(random.choices(string.digits, k=count))

        if country_code == 353:  # Ireland
            number = random.choice(['83', '85', '86', '87', '89']) + random_digits(7)
        elif country_code == 44:  # UK
            second = random.choice(['1', '2', '3', '4', '5', '7', '8', '9'])
            number = f"7{second}{random_digits(8)}"
        elif country_code == 86:  # China
            prefixes = ["130", "131", "132", "133", "135", "136", "138", "139", "150", "158", "159", "186"]
            number = random.choice(prefixes) + random_digits(8)
        else:
            number = random_digits(10)

        # Previously the fallback branch silently ignored e164_format; the
        # prefix is now applied uniformly for every country code.
        return f"+{country_code}{number}" if e164_format else number

    @staticmethod
    def generate_password(length=12):
        """Return a random password with lower, upper, digit and special chars.

        One character is drawn from each required class and the remainder
        from the combined alphabet, then the result is shuffled. This always
        terminates, unlike retry-until-valid sampling, which never finishes
        when ``length`` is below the number of required classes.

        Args:
            length: Total password length; must be at least 4.

        Raises:
            ValueError: If ``length`` is too short to hold one character of
                each required class.
        """
        pools = (string.ascii_lowercase, string.ascii_uppercase, string.digits, "@#$%")
        if length < len(pools):
            raise ValueError(
                f"length must be at least {len(pools)} to include every character class"
            )

        # SystemRandom draws from the OS CSPRNG, which is appropriate for
        # credentials (the module-level functions use a Mersenne Twister).
        rng = random.SystemRandom()
        chars = [rng.choice(pool) for pool in pools]
        chars += rng.choices(''.join(pools), k=length - len(pools))
        rng.shuffle(chars)
        return ''.join(chars)

    @staticmethod
    def encrypt_password(password: str) -> str:
        """RSA-OAEP(SHA-256) encrypt ``password`` with VFS_PUBLIC_KEY_PEM.

        Returns:
            The ciphertext, base64-encoded as a ``str``.
        """
        public_key = serialization.load_pem_public_key(
            VFS_PUBLIC_KEY_PEM.encode(), backend=default_backend()
        )
        ciphertext = public_key.encrypt(
            password.encode(),
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return base64.b64encode(ciphertext).decode()

    @staticmethod
    def get_client_source() -> str:
        """Return the encrypted ``GA;<UTC timestamp>Z`` marker.

        The plaintext is ``GA;`` followed by the current UTC time formatted
        as ``YYYY-MM-DDTHH:MM:SS`` and a literal ``Z``; it is encrypted the
        same way as a password.
        """
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        payload = f"GA;{timestamp}Z"
        return VFSHelper.encrypt_password(payload)
class BrowserResponse:
    """Response-like wrapper around the dict returned by the in-page fetch.

    Exposes ``status_code``, ``text``, ``headers``, ``url`` and ``json()``.
    """

    def __init__(self, result_dict):
        """Copy the fetch result into attributes, defaulting missing fields.

        Args:
            result_dict: Mapping with optional ``status``, ``body``,
                ``headers`` and ``url`` keys. A falsy value is treated as
                an empty mapping.
        """
        fields = result_dict if result_dict else {}
        self.url = fields.get('url', '')
        self.headers = fields.get('headers', {})
        self.text = fields.get('body', '')
        self.status_code = fields.get('status', 0)
        # Lazily filled by json(); None means "not parsed yet".
        self._json = None

    def json(self):
        """Return the body parsed as JSON, or ``{}`` if empty or invalid.

        The parsed value is cached after the first call.
        """
        if self._json is not None:
            return self._json

        parsed = {}
        if self.text:
            try:
                parsed = json.loads(self.text)
            except json.JSONDecodeError:
                parsed = {}
        self._json = parsed
        return self._json
class VFSRegistrationBot:
    """Registers VFS Global accounts through a real Chromium session.

    Each call to ``register`` starts a fresh browser, obtains a Cloudflare
    Turnstile token from the registration page, submits the registration
    request via an in-page ``fetch``, waits for the activation email and
    opens the activation link in the same browser.

    Config keys read by this class: ``website`` (required), ``proxy_url``,
    ``master_email`` and ``api_token`` (all optional).
    """

    # SECURITY: a token committed to source control is exposed to everyone
    # who can read the repository. Prefer config["api_token"] or the
    # VISAFLY_API_TOKEN environment variable; this literal remains only as a
    # fallback so existing deployments keep working.
    _DEFAULT_API_TOKEN = "tok_e946329a60ff45ba807f3f41b0e8b7fc"
    _EMAIL_FETCH_URL = "https://visafly.top/api/email-authorizations/fetch"
    _TURNSTILE_SELECTOR = '@name=cf-turnstile-response'

    def __init__(self, config):
        """Store the configuration; the browser is created lazily.

        Args:
            config: Mapping with the bot settings (see class docstring).
        """
        self.config = config
        self.page = None
        self.proxy_url = config.get("proxy_url")

    def _init_browser(self):
        """Create a headed Chromium page on a free local debugging port."""
        co = ChromiumOptions()

        # Ask the OS for a free port, then release it for Chromium to use.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            port = s.getsockname()[1]

        co.set_local_port(port)

        chrome_path = os.getenv("CHROME_BIN")
        if chrome_path:
            co.set_paths(browser_path=chrome_path)
        if self.proxy_url:
            co.set_argument(f'--proxy-server={self.proxy_url}')

        co.headless(False)  # The VFS captcha usually requires headed mode.
        co.set_argument('--no-sandbox')
        co.set_argument('--disable-gpu')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')

        self.page = ChromiumPage(co)
        self.page.set.timeouts(15)

    @staticmethod
    def _build_fetch_options(method, headers, data, json_data):
        """Build the ``fetch`` init object without mutating ``headers``.

        A ``Content-Type`` header is only added when the caller did not
        already provide one (in any letter case). Adding a second,
        differently-cased key would make the browser send both values
        joined with a comma.
        """
        merged_headers = dict(headers or {})
        has_content_type = any(name.lower() == 'content-type' for name in merged_headers)

        options = {
            "method": method.upper(),
            "headers": merged_headers,
            "credentials": "include"
        }

        if json_data:
            options['body'] = json.dumps(json_data)
            if not has_content_type:
                merged_headers['Content-Type'] = 'application/json'
        elif data:
            options['body'] = urlencode(data) if isinstance(data, dict) else str(data)
            if not has_content_type:
                merged_headers['Content-Type'] = 'application/x-www-form-urlencoded'

        return options

    @staticmethod
    def _build_fetch_script(url, fetch_options):
        """Return the JS snippet that performs the fetch and reports back.

        Both the URL and the options are embedded as JSON literals, so
        quotes or backslashes in the URL cannot break out of the string.
        """
        return f"""
        const url = {json.dumps(url)};
        const options = {json.dumps(fetch_options)};

        return fetch(url, options)
            .then(async response => {{
                const text = await response.text();
                const headers = {{}};
                response.headers.forEach((value, key) => headers[key] = value);
                return {{
                    status: response.status,
                    body: text,
                    headers: headers,
                    url: response.url
                }};
            }})
            .catch(err => ({{ status: 0, body: err.toString() }}));
        """

    def _perform_js_fetch(self, method, url, headers=None, data=None, json_data=None, retry_count=0):
        """Run a ``fetch`` inside the page so the request uses the browser context.

        Args:
            method: HTTP method name (upper-cased before use).
            url: Absolute request URL.
            headers: Optional request headers; not modified.
            data: Optional form body (dict is URL-encoded, else ``str()``).
            json_data: Optional JSON body; takes precedence over ``data``.
                On a Cloudflare retry its ``captcha_api_key`` entry, if
                present, is replaced in place with the refreshed token.
            retry_count: Internal counter for Cloudflare retries.

        Returns:
            A ``BrowserResponse``. Non-200 statuses other than a Cloudflare
            403 and 429 are returned for the caller to interpret.

        Raises:
            BizLogicError: Browser not initialised, more than 3 retries, or
                the script itself failed to run.
            RateLimiteddError: The server answered with HTTP 429.
        """
        if not self.page:
            raise BizLogicError("Browser not initialized")
        if retry_count > 3:
            raise BizLogicError("Max retries exceeded for request")

        fetch_options = self._build_fetch_options(method, headers, data, json_data)
        logger.debug(f"Request: {method} {url}")
        js_script = self._build_fetch_script(url, fetch_options)

        # Only the script execution is guarded, so the deliberate errors
        # raised below (rate limit, max retries) reach the caller with their
        # own type instead of being re-wrapped as "Fetch failed".
        try:
            res_dict = self.page.run_js(js_script, timeout=60)
            resp = BrowserResponse(res_dict)
        except Exception as e:
            logger.error(f"JS Execution Error: {e}")
            raise BizLogicError(f"Fetch failed: {e}") from e

        if resp.status_code == 200:
            return resp

        # Cloudflare challenge page: refresh the token and try again.
        if resp.status_code == 403 and ("cloudflare" in resp.text.lower() or "Just a moment" in resp.text):
            logger.warning(f"Cloudflare 403 detected. Retrying ({retry_count+1})...")
            new_token = self._refresh_turnstile()
            if new_token and json_data and "captcha_api_key" in json_data:
                json_data["captcha_api_key"] = new_token
            return self._perform_js_fetch(method, url, headers, data, json_data, retry_count + 1)

        if resp.status_code == 429:
            raise RateLimiteddError(f"Rate Limit: {resp.text[:100]}")

        # Any other status (e.g. a 400 business error) is the caller's job.
        return resp

    def _handle_cookie_banner(self):
        """Best-effort dismissal of the OneTrust cookie banner."""
        js = """
        var btn = document.getElementById('onetrust-accept-btn-handler');
        if(btn) { btn.click(); return true; }
        var banner = document.getElementById('onetrust-banner-sdk');
        if(banner) { banner.remove(); return true; }
        """
        try:
            self.page.run_js(js)
        except Exception as e:
            # The banner is cosmetic; never let it abort the flow.
            logger.debug(f"Cookie banner handling skipped: {e}")

    def _read_turnstile_token(self):
        """Return the current Turnstile response value, or ``None``."""
        try:
            ele = self.page.ele(self._TURNSTILE_SELECTOR)
            if ele and ele.value:
                return ele.value
        except Exception:
            # The element may not exist yet while the widget is loading.
            pass
        return None

    def _refresh_turnstile(self):
        """Reset the Turnstile widget and poll up to 30 times for a new token.

        Returns:
            The new token, or ``None`` if none appeared or the refresh failed.
        """
        logger.info("Attempting to refresh Turnstile token...")
        try:
            self.page.run_js('try{window.turnstile.reset()}catch(e){}')
            cf_bypasser = CloudflareBypasser(self.page, log=True)

            for i in range(30):
                time.sleep(1)
                token = self._read_turnstile_token()
                if token:
                    logger.info("Turnstile token obtained.")
                    return token

                # Give the widget a few polls to solve itself before clicking.
                if i > 5:
                    try:
                        cf_bypasser.click_verification_button(is_dfs=False)
                    except Exception:
                        pass
        except Exception as e:
            logger.error(f"Turnstile refresh failed: {e}")
        return None

    @staticmethod
    def _extract_activation_link(content):
        """Return the cleaned href of the "ActivateAccount" anchor, or ``None``."""
        soup = BeautifulSoup(content, "html.parser")
        link = soup.find("a", string="ActivateAccount")
        if not link:
            return None
        raw_link = link.get("href")
        if not raw_link:
            return None
        # Mail clients may wrap long URLs; drop any embedded whitespace.
        return raw_link.replace(" ", "").replace("\n", "").replace("\r", "").strip()

    def _wait_for_activation_link(self, username, max_wait_sec=60):
        """Poll the mail-fetch API until the activation link arrives.

        Args:
            username: Recipient address of the account being registered.
            max_wait_sec: Stop starting new polls after this many seconds.

        Returns:
            The activation URL, or ``None`` if no matching email was found
            in time.
        """
        logger.info(f"Waiting for email to {username}...")
        # monotonic() is immune to wall-clock adjustments during the wait.
        start_time = time.monotonic()

        master_email = self.config.get("master_email", "visafly666@gmail.com")
        api_token = self.config.get("api_token") or os.getenv(
            "VISAFLY_API_TOKEN", self._DEFAULT_API_TOKEN
        )

        proxies = {
            "http": self.proxy_url,
            "https": self.proxy_url,
        } if self.proxy_url else None

        headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "Accept": "application/json, text/plain, */*"
        }

        while time.monotonic() - start_time < max_wait_sec:
            try:
                utc_now_str = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

                params = {
                    "email": master_email,
                    "sender": 'donotreply at vfsglobal.com',
                    "recipient": username,
                    "subjectKeywords": 'Welcome',
                    "bodyKeywords": 'ActivateAccount',
                    "sentDate": utc_now_str,
                    "expiry": str(60)
                }

                # The filters travel in the query string; the body is an
                # empty JSON object.
                resp = requests.post(
                    self._EMAIL_FETCH_URL, headers=headers, params=params,
                    json={}, proxies=proxies, timeout=60
                )

                if resp.status_code != 200:
                    logger.warning(f"Email API returned status {resp.status_code}. Body: {resp.text[:100]}")
                    time.sleep(15)
                    continue

                try:
                    result = resp.json()
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError, which is
                    # also what older requests versions raise here.
                    logger.error(f"Failed to decode JSON. Response was: {resp.text[:200]}")
                    time.sleep(15)
                    continue

                if result.get('code') != 0:
                    # Usually a business-level message such as "no mail yet".
                    logger.debug(f"API Message: {result.get('message')}")
                else:
                    data = result.get('data') or {}
                    content = data.get('body', "")
                    if content:
                        activation_url = self._extract_activation_link(content)
                        if activation_url:
                            return activation_url

            except Exception as e:
                logger.warning(f"Error fetching email: {e}")

            time.sleep(15)
            logger.info("Checking email again...")

        return None

    def register(self, account):
        """Register and activate a single account.

        Args:
            account: Mapping with ``username``, ``password``,
                ``mission_code``, ``country_code``, ``phone_country_code``
                and ``phone_number``.

        Returns:
            ``True`` only when the activation page shows
            "Activation Successful"; ``False`` for every failure. Exceptions
            are logged and converted to ``False``. The browser is always
            closed before returning.
        """
        website = self.config['website']

        try:
            self._init_browser()
            logger.info(f"Opening {website}")

            self.page.set.timeouts(page_load=30, script=30)

            try:
                self.page.get(website, retry=0, timeout=30)
            except Exception:
                # Stop a hung load so the checks below can still run.
                logger.warning(f"Page load timed out (Stopped manually). Checking URL...")
                self.page.stop_loading()

            # 1. Get past the Cloudflare challenge and collect the token.
            cf_token = None
            cf_bypasser = CloudflareBypasser(self.page, log=True)

            for _ in range(40):
                time.sleep(1)

                current_url = self.page.url
                if "page-not-found" in current_url:
                    logger.error(f"❌ [BLOCKED] Redirected to 'Page Not Found' during check. Aborting.")
                    return False

                # A 403 title that is not the interstitial means a hard block.
                if "403" in self.page.title and "Just a moment" not in self.page.title:
                    logger.error(f"❌ [BLOCKED] 403 Forbidden detected. Aborting.")
                    return False

                self._handle_cookie_banner()

                cf_token = self._read_turnstile_token()
                if cf_token:
                    break

                try:
                    cf_bypasser.click_verification_button(is_dfs=False)
                except Exception:
                    pass

            if not cf_token:
                raise BizLogicError("Failed to obtain initial Cloudflare token")

            # 2. Build and submit the registration request.
            post_data = {
                'emailid': account['username'],
                'password': VFSHelper.encrypt_password(account['password']),
                'confirmPassword': VFSHelper.encrypt_password(account['password']),
                'processPerDataAgreed': True,
                'intTransPerDataAgreed': True,
                'termAndConditionAgreed': True,
                'missioncode': account['mission_code'],
                'countrycode': account['country_code'],
                'languageCode': 'en',
                'dialcode': str(account['phone_country_code']),
                'contact': account['phone_number'],
                'captcha_version': 'cloudflare-v1',
                'captcha_api_key': cf_token,
                'cultureCode': 'en-US',
                'IsSpecialUser': False,
            }

            headers = {
                'content-type': 'application/json;charset=utf-8',
                'accept': 'application/json, text/plain, */*',
                'route': f"{account['country_code']}/en/{account['mission_code']}",
                'clientsource': VFSHelper.get_client_source(),
            }

            logger.info(f"Submitting registration for {account['username']}")
            resp = self._perform_js_fetch(
                'POST',
                'https://lift-api.vfsglobal.com/user/registration',
                headers=headers,
                json_data=post_data
            )
            logger.info(f"Registration response: {resp.text}")

            resp_data = resp.json()
            if resp_data.get("code") == "200":
                logger.info("Registration API success. Waiting for email...")

                activate_link = self._wait_for_activation_link(account['username'])
                if activate_link:
                    logger.info(f"Activating account: {activate_link}")
                    # Open the link in the same browser so the activation
                    # runs in the same environment as the registration.
                    activate_tab = self.page.new_tab(activate_link)

                    try:
                        logger.info("Waiting for 'Activation Successful' message on page...")
                        success_ele = activate_tab.ele('Activation Successful', timeout=30)

                        if success_ele:
                            logger.info(f"✅ Account {account['username']} activated successfully (Verified).")
                            return True

                        # No success marker: log the page text for diagnosis.
                        body_text = activate_tab.ele('tag:body').text[:200]
                        logger.error(f"Activation verification failed. Page text: {body_text}")
                        return False

                    except Exception as e:
                        logger.error(f"Error checking activation status: {e}")
                        return False
                    finally:
                        # Close the activation tab whatever the outcome.
                        activate_tab.close()
                else:
                    logger.error("Timeout waiting for activation email.")
            else:
                logger.error(f"Registration failed: {resp_data}")

        except Exception as e:
            logger.error(f"Registration process exception: {e}", exc_info=True)
        finally:
            if self.page:
                try:
                    self.page.quit()
                except Exception as e:
                    logger.debug(f"Browser quit failed: {e}")
                # Drop the stale handle so later calls never reuse a page
                # whose browser has already been shut down.
                self.page = None

        return False
# --- Main flow ---
def generate_account_details(config):
    """Build the data dictionary for one new random account.

    Reads ``email_domain``, ``phone_country_code``, ``country_code`` and
    ``mission_code`` from ``config`` (all required), plus the optional
    ``account_prefix`` and ``pool_name`` (both default to ``"vfs"``).

    Returns:
        A dict with ``pool_name``, ``country_code``, ``mission_code``,
        ``phone_country_code``, ``phone_number``, ``username`` and
        ``password``.
    """
    prefix = config.get('account_prefix', 'vfs')
    pool = config.get('pool_name', 'vfs')

    # Six random lowercase/digit characters make the address unique.
    suffix_alphabet = string.ascii_lowercase + string.digits
    suffix = ''.join(random.choices(suffix_alphabet, k=6))
    email = f"{prefix}_{suffix}@{config['email_domain']}.com"

    mobile = VFSHelper.generate_mobile_number(config['phone_country_code'])

    details = {'pool_name': pool}
    details['country_code'] = config['country_code']
    details['mission_code'] = config['mission_code']
    details['phone_country_code'] = config['phone_country_code']
    details['phone_number'] = mobile
    details['username'] = email
    details['password'] = VFSHelper.generate_password()
    return details
def main():
    """Register accounts until the configured target count is reached.

    Every successful account is uploaded to the central server and the full
    list of successes is rewritten to ``registered_accounts.json`` so that
    progress survives an interrupted run.
    """
    # Run configuration.
    config = {
        "pool_name": "ie.hu.booker",
        "account_prefix": "ie_hu",
        "email_domain": "gmail-app",
        "master_email": "visafly666@gmail.com",
        "proxy_url": "http://127.0.0.1:7890",
        "target_count": 10,
        "phone_country_code": 353,
        "country_code": "irl",
        "mission_code": "hun",
        "website": "https://visa.vfsglobal.com/irl/en/hun/register",
    }

    target = config['target_count']
    registrar = VFSRegistrationBot(config)
    registered = []

    print(">>> Starting Registration Bot <<<")

    while len(registered) < target:
        candidate = generate_account_details(config)
        logger.info(f"Processing Account: {candidate['username']}")

        if not registrar.register(candidate):
            logger.warning("Retrying with new account details...")
        else:
            registered.append(candidate)
            logger.info(f"Progress: {len(registered)}/{target}")

            upload_account_to_server(candidate)

            # Persist after every success so an abort loses nothing.
            with open("registered_accounts.json", "w", encoding='utf-8') as out_file:
                json.dump(registered, out_file, indent=4, ensure_ascii=False)

        # Short pause between attempts to avoid hammering the site.
        time.sleep(5)

    print(">>> All tasks completed <<<")


if __name__ == "__main__":
    main()
|