"""Automated France-Visas account registration driven through DrissionPage.

The flow implemented by :class:`FranceVisaRegistrator` is: open the portal in
an isolated Chromium profile, create an account, follow the activation link
received by e-mail, then create a first application so that a ``FRA1...``
reference number exists for the account.
"""

import argparse
import base64
import concurrent.futures
import json
import os
import random
import re
import shutil
import socket
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from urllib.parse import urlencode

import requests
from DrissionPage.common import Keys
from DrissionPage import ChromiumPage, ChromiumOptions

import configure
from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
from toolkit.vs_cloud_api import VSCloudApi
from toolkit.proxy_tunnel import ProxyTunnel
from vs_types import NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
from utils.mouse import HumanMouse
from utils.keyboard import HumanKeyboard
from utils.scroll import HumanScroll
from utils.fingerprint_utils import FingerprintGenerator
from toolkit.captcha_breaker import recognize_captcha_with_qwen

# Password used for new accounts unless user_inputs supplies its own 'password'.
_DEFAULT_ACCOUNT_PASSWORD = 'Visafly@1234'

# NOTE(security): this credential was committed in source. It should be rotated
# and supplied through the QWEN_API_KEY environment variable; the literal is
# kept only as a fallback so existing deployments keep working.
_QWEN_API_KEY_FALLBACK = "sk-893e895724c6403d81374e515ffaf427"

_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def _utc_now_str() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT)


def load_proxies(pool_name):
    """Read the proxy pool named *pool_name* from ``config/proxies.json``.

    The file is looked up in a ``config`` directory next to this module.

    Returns:
        The non-empty value stored under *pool_name*.

    Raises:
        SystemExit: with exit code 1 when the file cannot be read or parsed,
            or when the pool is missing or empty (the reason is printed first).
    """
    config_path = os.path.join(os.path.dirname(__file__), 'config', 'proxies.json')
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        proxies = data.get(pool_name, [])
        if not proxies:
            raise ValueError(f"代理池 '{pool_name}' 为空或不存在!")
        return proxies
    except Exception as e:
        # Deliberately broad: any failure here is fatal for the script.
        print(f"读取代理配置文件失败: {e}")
        raise SystemExit(1) from e


class FranceVisaRegistrator:
    """Drive one France-Visas registration in an isolated browser profile.

    Each instance gets a random ``instance_id`` which names its Chromium user
    data directory, its log prefix and the files it writes under ``data/``.
    """

    def __init__(self, france_visa_url, proxy_config: Optional[Dict] = None,
                 capsolver_key: Optional[str] = None, user_inputs: Optional[Dict] = None):
        """Store configuration; no browser is started until ``init_browser``.

        Args:
            france_visa_url: URL opened by ``init_browser``.
            proxy_config: optional dict read for the keys ``ip``, ``port``,
                ``username``, ``password`` and ``proto``.
            capsolver_key: stored on the instance; not used by the methods
                in this file.
            user_inputs: applicant data read by ``register``, ``activate`` and
                ``make_account_useful``. ``None`` is treated as an empty dict.
        """
        self.proxy_config = proxy_config
        self.capsolver_key = capsolver_key
        # Normalise None so the .get() calls in later steps do not crash.
        self.user_inputs = user_inputs if user_inputs is not None else {}

        # Per-instance id used to isolate the browser user-data directory.
        self.instance_id = uuid.uuid4().hex[:8]
        self.france_visa_url = france_visa_url
        self.workspace = os.path.abspath(
            os.path.join("data/temp_browser_data", f"reg_session_{self.instance_id}")
        )

        self.page = None
        self.mouse = None
        self.keyboard = None

        # Holds the ProxyTunnel instance when an authenticated proxy is used.
        self.tunnel = None

    def _log(self, msg):
        """Print *msg* prefixed with a local timestamp and the instance id."""
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{now}][TLS-Reg-{self.instance_id}] {msg}")

    def _get_free_port(self):
        """Return a port number the OS reports as free at the time of the call.

        Binding to port 0 lets the OS choose; the socket is closed again before
        returning, so the port is only free at the moment of the check.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def save_screenshot(self, name_prefix):
        """Save a viewport screenshot under ``data/``; failures are only logged."""
        try:
            timestamp = int(time.time())
            filename = f"{self.instance_id}_{name_prefix}_{timestamp}.jpg"
            save_path = os.path.join("data", filename)
            os.makedirs("data", exist_ok=True)
            self.page.get_screenshot(path=save_path, full_page=False)
            self._log(f"Screenshot saved to {save_path}")
        except Exception as e:
            # Best effort: a failed screenshot must not abort the flow.
            self._log(f"Failed to save screenshot: {e}")

    def init_browser(self):
        """Start an isolated, proxy-configured browser and open the portal.

        Sets ``self.page``, ``self.mouse`` and ``self.keyboard`` (and
        ``self.tunnel`` when the proxy needs credentials), runs the Cloudflare
        bypass helper, then moves the mouse to a random viewport position.
        """
        self._log("Initializing browser...")
        co = ChromiumOptions()

        # 1. Port and profile-path isolation.
        port = self._get_free_port()
        co.set_local_port(port)
        co.set_user_data_path(self.workspace)

        chrome_path = configure.CHROME_PATH
        if not chrome_path:
            chrome_path = os.getenv("CHROME_BIN")
        if chrome_path and os.path.exists(chrome_path):
            co.set_paths(browser_path=chrome_path)

        # 2. Proxy configuration (with optional username/password).
        if self.proxy_config and self.proxy_config.get("ip"):
            p = self.proxy_config
            if p.get("username") and p.get("password"):
                # Credentials go through a local tunnel; the browser is pointed
                # at the address returned by ProxyTunnel.start().
                self.tunnel = ProxyTunnel(p['ip'], p['port'], p['username'], p['password'])
                local_proxy = self.tunnel.start()
                self._log(f"Tunnel started at {local_proxy}")
                co.set_argument(f'--proxy-server={local_proxy}')
            else:
                proxy_str = f"{p.get('proto', 'http')}://{p['ip']}:{p['port']}"
                co.set_argument(f'--proxy-server={proxy_str}')
        else:
            self._log("[WARN] No proxy configured!")

        fingerprint_gen = FingerprintGenerator()
        specific_fp = fingerprint_gen.generate(self.instance_id)
        self._log(f'browser fingerprint={specific_fp}')

        # 3. Anti-bot and stability settings.
        co.headless(False)
        co.set_argument('--no-sandbox')
        co.set_argument('--lang=en-us')
        co.set_argument('--accept-lang=en-us')
        co.set_argument('--disable-dev-shm-usage')
        co.set_argument('--window-size=1920,1080')
        co.set_argument('--disable-blink-features=AutomationControlled')
        co.set_argument(f"--fingerprint={specific_fp.get('seed')}")
        co.set_argument(f"--fingerprint-platform={specific_fp.get('platform')}")
        co.set_argument(f"--fingerprint-brand={specific_fp.get('brand')}")

        self.page = ChromiumPage(co)
        self.page.get(self.france_visa_url)
        time.sleep(5)

        cf_bypasser = CloudflareBypasser(self.page, log=True)
        cf_bypasser.bypass(max_retry=8)
        time.sleep(3)
        cf_bypasser.handle_waiting_room()

        self._log("正在初始化拟人化工具...")
        self.mouse = HumanMouse(self.page, debug=True)
        self.keyboard = HumanKeyboard(self.page)

        self._log("随机化鼠标开始位置...")
        viewport_width = self.page.rect.viewport_size[0]
        viewport_height = self.page.rect.viewport_size[1]
        # Clamp the upper bound so a very small viewport cannot make
        # randint() raise on an empty range.
        init_x = random.randint(10, max(10, viewport_width - 10))
        init_y = random.randint(10, max(10, viewport_height - 10))
        self.mouse.move(init_x, init_y)

    def register(self):
        """Fill in and submit the "Create an account" form.

        Reads ``username``, ``first_name``, ``last_name`` and, optionally,
        ``password`` from ``self.user_inputs``. The captcha image is decoded
        from its inline ``src``, written under ``data/`` and passed to
        ``recognize_captcha_with_qwen``.

        Raises:
            BizLogicError: when the captcha ``src`` has no inline payload.
        """
        username = self.user_inputs.get('username')
        first_name = self.user_inputs.get('first_name')
        last_name = self.user_inputs.get('last_name')
        password = self.user_inputs.get('password') or _DEFAULT_ACCOUNT_PASSWORD

        self.page.wait.ele_deleted('tag:h2@@text():Log in to France-Visas', timeout=5)
        self.page.ele('tag:button@@text():Create an account').click()
        time.sleep(5)

        self.page.ele('tag:input@@name=lastName').input(last_name)
        self.page.ele('tag:input@@name=firstName').input(first_name)
        self.page.ele('tag:input@@name=email').input(username)
        self.page.ele('tag:input@@name=emailVerif').input(username)
        self.page.ele('tag:input@@name=password').input(password)
        self.page.ele('tag:input@@name=password-confirm').input(password)
        self.page.ele('tag:select@@name=ddeLanguage').select('English')

        captcha = self.page.ele('#captchaComponent').ele('tag:img')
        src = captcha.attr("src")
        print(src)
        # The payload is whatever follows the first comma of the src value.
        _, sep, base64_data = (src or "").partition(",")
        if not sep or not base64_data:
            raise BizLogicError(message='captcha image src has no inline payload')

        # Per-instance file name so concurrent registrations do not overwrite
        # each other's captcha image.
        os.makedirs("data", exist_ok=True)
        captcha_path = os.path.join("data", f"{self.instance_id}_captcha.png")
        with open(captcha_path, "wb") as f:
            f.write(base64.b64decode(base64_data))

        # NOTE(review): the second positional argument is presumably the API
        # key for the recognition service - confirm against captcha_breaker.
        api_key = os.getenv("QWEN_API_KEY") or _QWEN_API_KEY_FALLBACK
        result = recognize_captcha_with_qwen(captcha_path, api_key)
        print(f'captcha result={result}')

        self.page.ele('tag:input@@name=captchaFormulaireExtInput').input(result)
        self.page.ele("tag:button@@text():Create an account").click()

    def activate(self, sent_at=None):
        """Fetch the activation e-mail and open the first link it contains.

        Args:
            sent_at: ``YYYY-MM-DD HH:MM:SS`` string passed as ``sent_date`` to
                the mail lookup; defaults to the current UTC time.

        Raises:
            BizLogicError: when no mail content is returned or the content
                contains no ``https://`` link.
        """
        username = self.user_inputs.get('username')
        email_box = 'visafly666@gmail.com'
        sender = 'noreply at interieur.gouv.fr'
        recipient = username
        subject_keywords = 'Create your France-Visas account'
        body_keywords = ''
        if not sent_at:
            sent_at = _utc_now_str()

        content_out = VSCloudApi.Instance().fetch_mail_content(
            email=email_box,
            sender=sender,
            recipient=recipient,
            subject_keywords=subject_keywords,
            body_keywords=body_keywords,
            sent_date=sent_at,
            expiry=600
        )
        self._log(f'activate email content={content_out}')
        if not content_out:
            raise BizLogicError(message='activation email not found')

        match = re.search(r'https://\S+', content_out)
        if not match:
            # Previously None was handed to page.get() in this case.
            raise BizLogicError(message='activation link not found in email')
        activate_link = match.group(0)
        self.page.get(activate_link)
        time.sleep(3)

    def _fill_date_field(self, selector, date_str):
        """Type *date_str* (``YYYY-MM-DD``) into a segmented date input.

        The order of the year/month/day segments is detected in the page via
        ``Intl.DateTimeFormat`` and the parts are typed in that order. Does
        nothing when *date_str* is empty.
        """
        if not date_str:
            return
        page = self.page
        ele = page.ele(selector)
        ele.scroll.to_see(center=True)

        js_detect_format = """
        const parts = new Intl.DateTimeFormat().formatToParts(new Date(2023, 11, 31));
        let format = [];
        for (let part of parts) {
            if (part.type === 'year') format.push('Y');
            if (part.type === 'month') format.push('M');
            if (part.type === 'day') format.push('D');
        }
        return format;
        """
        date_format = page.run_js(js_detect_format)

        year, month, day = date_str.split('-')
        date_dict = {
            'Y': year,
            'M': month.zfill(2),
            'D': day.zfill(2)
        }

        ele.click()
        time.sleep(0.1)
        # Step left so typing starts at the first segment.
        page.actions.type(Keys.LEFT * 3)
        time.sleep(0.1)

        last_index = len(date_format) - 1
        for i, char in enumerate(date_format):
            page.actions.type(date_dict[char])
            time.sleep(0.1)
            # Only the year segment gets an explicit RIGHT key press, and only
            # when it is not the last segment.
            if char == 'Y' and i < last_index:
                page.actions.type(Keys.RIGHT)
                time.sleep(0.1)

    def make_account_useful(self):
        """Create a first application and return its ``FRA1...`` number.

        Reads ``passport_no``, ``passport_issue_date``,
        ``passport_expiry_date``, ``nationality`` and ``passport_issue_from``
        from ``self.user_inputs``; the remaining form values are fixed below.

        Returns:
            The first ``FRA1[A-Z0-9]+`` match in the "My applications" page.

        Raises:
            BizLogicError: when no such reference is present in the page HTML.
        """
        passport_no = self.user_inputs.get('passport_no')
        passport_issue_date = self.user_inputs.get('passport_issue_date')
        passport_expiry_date = self.user_inputs.get('passport_expiry_date')
        nationality = self.user_inputs.get('nationality')
        passport_issue_from = self.user_inputs.get('passport_issue_from')

        self.page.ele('#formHeader:navigationLanguage_input').select('English')
        time.sleep(3)
        self.page.ele('#formAccueilUsager:ajouterGroupe').click()
        time.sleep(5)

        self.page.ele('#formStep1:visas-selected-nationality_input').select(nationality)
        self.page.ele('#formStep1:Visas-selected-deposit-country_input').select('Ireland')
        self.page.ele('#formStep1:Visas-selected-stayDuration_input').select('Short-stay (≤ 90 days)')
        self.page.ele('#formStep1:Visas-selected-destination_input').select('France')
        self.page.ele('#formStep1:Visas-selected-deposit-town_input').select('Dublin')
        self.page.ele('#formStep1:Visas-selected-authority_input').select(passport_issue_from)
        self.page.ele('#formStep1:Visas-dde-travel-document_input').select('Ordinary passport')
        self.page.ele('#formStep1:Visas-dde-travel-document-number').input(passport_no)
        self._fill_date_field('#formStep1:Visas-dde-release_date_real_input', passport_issue_date)
        self._fill_date_field('#formStep1:Visas-dde-expiration_date_input', passport_expiry_date)
        self.page.ele('#formStep1:Visas-selected-purposeCategory_input').select('Tourism')
        self.page.ele('#formStep1:Visas-selected-purpose_input').select('Tourism / Private visit')

        self.page.ele('#formStep1:btnVerifier').click()
        time.sleep(3)
        self.page.ele('#formStep1:btnSuivant').click()
        time.sleep(3)
        self.page.ele('#formStep1:btnValiderModal').click()
        time.sleep(3)

        self.page.ele('.iconeDDEIdPanel').click()
        time.sleep(0.5)
        self.page.ele('text():My applications').click()
        time.sleep(3)

        html_content = self.page.html
        match = re.search(r'FRA1[A-Z0-9]+', html_content)
        if not match:
            raise BizLogicError(message='FRA1 not found')
        fra_number = match.group(0)
        print(fra_number)
        return fra_number

    def cleanup(self):
        """Quit the browser, release the tunnel and delete the profile folder.

        Every step is best effort: failures are logged and the remaining
        steps still run. Calling it more than once is harmless.
        """
        self._log("Cleaning up resources...")
        if self.page:
            try:
                self.page.quit()
            except Exception as e:
                self._log(f"Failed to quit browser: {e}")
            self.page = None

        if self.tunnel:
            # NOTE(review): ProxyTunnel's shutdown API is not visible from this
            # file; stop() is called only if the object actually provides it.
            stop = getattr(self.tunnel, "stop", None)
            if callable(stop):
                try:
                    stop()
                except Exception as e:
                    self._log(f"Failed to stop proxy tunnel: {e}")
            self.tunnel = None

        if os.path.exists(self.workspace):
            time.sleep(1)  # give the browser a moment to release file locks
            shutil.rmtree(self.workspace, ignore_errors=True)


def main():
    """Run one registration end to end with the sample data below."""
    france_visa_url = 'https://application-form.france-visas.gouv.fr/fv-fo-dde/'
    proxy_config = {
        'ip': '127.0.0.1',
        'port': 7890,
        'username': '',
        'password': ''
    }
    capsolver_key = ''
    user_inputs = {
        "username": "ManaliAshokGaikwad26@gmail-app.com",
        "first_name": "Manali Ashok",
        "last_name": "Gaikwad",
        "nationality": "Indian",
        "passport_issue_from": "India",
        "passport_no": "Z4413123",
        "passport_issue_date": "2018-01-15",
        "passport_expiry_date": "2028-01-14",
    }
    bot = FranceVisaRegistrator(
        france_visa_url,
        proxy_config=proxy_config,
        capsolver_key=capsolver_key,
        user_inputs=user_inputs
    )
    try:
        bot.init_browser()
        # Captured before submitting the form so it is not later than the
        # moment the activation mail is sent.
        sent_at = _utc_now_str()
        bot.register()
        bot.activate(sent_at=sent_at)
        bot.make_account_useful()
    finally:
        # Always release the browser, tunnel and temp profile, even on error.
        bot.cleanup()


if __name__ == "__main__":
    main()