| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- # main.py
- import time
- import os
- import json
- import logging
- # 导入必要模块
- from vs_types import GroupConfig, QueryWaitMode, PluginConfig, QueryWaitConfig
- from gco import GCO
- from vs_log_macros import VSC_INFO, VSC_ERROR
- def vfs_test():
- # 0. 检查目录结构
- if not os.path.exists("plugins/vfs_plugin.py"):
- print("[ERROR] 找不到插件文件 'plugins/vfs_plugin.py'")
- print("请将上一条回答中的代码保存为该文件。")
- return
- # 1. 准备 VFS 业务配置 (free_config)
- # 这部分 JSON 对应 VfsPlugin 中读取的配置
- vfs_config = {
- "verbose": 0,
- "missionCode": "fra",
- "missionName": "France",
- "countryCode": "sgp",
- "countryName": "Singapore",
- "cultureCode": "en-US",
- "language": "en",
- "website": "https://visa.vfsglobal.com/sgp/en/fra/login",
- "appointmentType": [
- {
- "id": 538,
- "routing_key": "slot.vfs.sin.fr.tourist",
- "centerName": "France Visa Application Center, Singapore",
- "city": "Singapore",
- "country": "France",
- "visa_type": "Tourist",
- "address": "79 Anson Road #15-01 Singapore 079906",
- "vacCode": "FRSN",
- "categoryName": "Short Stay",
- "categoryCode": "02",
- "subcategoryName": "Short Stay Tourist, Family Visit",
- "subcategoryCode": "Six",
- "fee": None,
- "currency": None
- }
- ]
- }
- # 2. 构造查询策略配置
- query_wait_config = QueryWaitConfig(
- mode=QueryWaitMode.Random,
- random_min=5, # 最小间隔 5秒
- random_max=10 # 最大间隔 10秒
- )
- # 3. 构造插件加载配置
- # 注意:plugin_name="vfs_plugin" 会被自动推导为类名 "VfsPlugin"
- plugin_config = PluginConfig(
- lib_path="plugins",
- plugin_name="vfs_plugin",
- plugin_bin="vfs_plugin.py",
- plugin_proto="IVSPlg"
- )
-
- # 4. 构造任务组配置
- group_config = GroupConfig(
- identifier="Singapore_France_Visas", # 组名
- enable=True, # 启用
- need_account=True, # 需要登录账号
- account_pool="ie_nl", # 使用 AccountManager 中的 fr_pool (在 account_manager.py 中预设)
- need_proxy=True, # 需要代理
- proxy_pool="global_proxy", # 使用 ProxyManager 中的 global_proxy
- need_ip_bind=False, # 不强制 IP 绑定
- account_login_interval=10, # 登录失败/成功后锁定账号 10 分钟
- target_instances=1, # 启动 2 个并发实例 (线程)
- query_wait=query_wait_config,
- plugin_config=plugin_config,
- free_config=json.dumps(vfs_config) # 将业务配置转为 JSON 字符串
- )
- # 5. 创建协调器
- gco = GCO(group_config)
- # 7. 启动
- try:
- VSC_INFO("main", "========================================")
- VSC_INFO("main", " VFS Python Plugin Tester ")
- VSC_INFO("main", "========================================")
-
- gco.start()
-
- time.sleep(3600)
- except KeyboardInterrupt:
- VSC_INFO("main", "Ctrl+C detected. Stopping...")
- except Exception as e:
- VSC_ERROR("main", "Unexpected Error: %s", str(e))
- finally:
- # 8. 停止
- gco.stop()
- VSC_INFO("main", "Program finished.")
-
-
- def tls_test():
- # 0. 检查目录结构
- if not os.path.exists("plugins/tls_plugin.py"):
- print("[ERROR] 找不到插件文件 'plugins/tls_plugin.py'")
- print("请将上一条回答中的代码保存为该文件。")
- return
- # 1. 准备 VFS 业务配置 (free_config)
- # 这部分 JSON 对应 TlsPlugin 中读取的配置
- tls_config = {
- "verbose": 0,
- "embassy_code": "gbLON2fr",
- "country_code": "gb",
- "mission_code": "fr",
- "city": "london",
- "capsolver_key": "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A",
- "interest_month": "01-2026",
- "target_labels": ["", "pta"],
- 'website': 'https://visas-fr.tlscontact.com/country/gb/vac/gbLON2fr/'
- }
- # 2. 构造查询策略配置
- query_wait_config = QueryWaitConfig(
- mode=QueryWaitMode.Random,
- random_min=60, # 最小间隔 5秒
- random_max=300 # 最大间隔 10秒
- )
- # 3. 构造插件加载配置
- # 注意:plugin_name="tls_plugin" 会被自动推导为类名 "TlsPlugin"
- plugin_config = PluginConfig(
- lib_path="plugins",
- plugin_name="tls_plugin",
- plugin_bin="tls_plugin.py",
- plugin_proto="IVSPlg"
- )
-
- # 4. 构造任务组配置
- group_config = GroupConfig(
- identifier="London_France_Visas", # 组名
- enable=True, # 启用
- need_account=True, # 需要登录账号
- account_pool="gb_fr", # 使用 AccountManager 中的 uk_pool (在 account_manager.py 中预设)
- need_proxy=True, # 需要代理
- proxy_pool="global_proxy", # 使用 ProxyManager 中的 global_proxy
- need_ip_bind=False, # 不强制 IP 绑定
- account_login_interval=10, # 登录失败/成功后锁定账号 10 分钟
- target_instances=1, # 启动 2 个并发实例 (线程)
- query_wait=query_wait_config,
- plugin_config=plugin_config,
- free_config=json.dumps(tls_config) # 将业务配置转为 JSON 字符串
- )
- # 5. 创建协调器
- gco = GCO(group_config)
- # 7. 启动
- try:
- VSC_INFO("main", "========================================")
- VSC_INFO("main", " TLS Python Plugin Tester ")
- VSC_INFO("main", "========================================")
-
- gco.start()
-
- time.sleep(3600)
- except KeyboardInterrupt:
- VSC_INFO("main", "Ctrl+C detected. Stopping...")
- except Exception as e:
- VSC_ERROR("main", "Unexpected Error: %s", str(e))
- finally:
- # 8. 停止
- gco.stop()
- VSC_INFO("main", "Program finished.")
-
- def bls_test():
- # 0. 环境检查
- if not os.path.exists("plugins/bls_plugin.py"):
- print("[ERROR] 找不到插件文件 'plugins/bls_plugin.py'")
- return
- # 2. 准备 BLS 业务配置 (Free Config)
- # 这些字段会被 BlsPlugin 读取并用于填表和逻辑判断
- bls_config = {
- "domain": "ireland.blsspainglobal.com", # 目标域名
- "ocr_service_url": "http://127.0.0.1:8085/predict/vfcode?model=pytorch", # OCR 服务地址
-
- # 签证参数 (根据实际网站下拉框的值填写)
- "location": "Dublin",
- "jurisdiction": None,
- "visaType": "Schengen Visa/ Short Term Visa",
- "visaSubType": "Tourist Visa",
- "appointmentCategory": "Normal",
-
- # 申请人信息 (用于 book 阶段)
- "user_info": {
- "first_name": "John",
- "last_name": "Doe",
- "passport_no": "E12345678",
- "passport_issue_date": "2020-01-01",
- "passport_expiry_date": "2030-01-01",
- "birthday": "1990-01-01",
- "phone_no": "447700900000",
- "nationality": "India",
- "gender": "Male",
- "passport_image_url": "https://via.placeholder.com/600x400.jpg" # 模拟图片URL
- }
- }
- # 3. 构造查询策略
- query_wait_config = QueryWaitConfig(
- mode=QueryWaitMode.Fixed,
- fixed_wait=10 # 每次查询间隔 10 秒
- )
- # 4. 构造插件配置
- # plugin_name="bls_plugin" -> 自动推导类名 "BlsPlugin"
- plugin_config = PluginConfig(
- lib_path="plugins",
- plugin_name="bls_plugin",
- plugin_bin="bls_plugin.py",
- plugin_proto="IVSPlg"
- )
-
- # 5. 构造任务组配置
- group_config = GroupConfig(
- identifier="Dublin_Spain_Visas", # 任务组 ID
- enable=True,
- need_account=True,
- account_pool="ie_es", # 使用刚才注入的池子
- need_proxy=True,
- proxy_pool="local", # 使用默认代理池
- target_instances=1, # 启动 1 个实例
- account_login_interval=5, # 登录间隔/锁定时间
- query_wait=query_wait_config,
- plugin_config=plugin_config,
- free_config=json.dumps(bls_config)
- )
- # 6. 创建协调器
- gco = GCO(group_config)
- # 8. 启动运行
- try:
- VSC_INFO("main", ">>> Starting BLS Plugin Test...")
- gco.start()
-
- time.sleep(3600)
- except KeyboardInterrupt:
- VSC_INFO("main", "Ctrl+C detected.")
- except Exception as e:
- VSC_ERROR("main", f"Unexpected Error: {e}")
- finally:
- gco.stop()
- VSC_INFO("main", "Test Finished.")
- if __name__ == "__main__":
- vfs_test()
|