main.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # main.py
  2. import time
  3. import os
  4. import json
  5. import logging
  6. # 导入必要模块
  7. from vs_types import GroupConfig, QueryWaitMode, PluginConfig, QueryWaitConfig
  8. from gco import GCO
  9. from vs_log_macros import VSC_INFO, VSC_ERROR
  10. def vfs_test():
  11. # 0. 检查目录结构
  12. if not os.path.exists("plugins/vfs_plugin.py"):
  13. print("[ERROR] 找不到插件文件 'plugins/vfs_plugin.py'")
  14. print("请将上一条回答中的代码保存为该文件。")
  15. return
  16. # 1. 准备 VFS 业务配置 (free_config)
  17. # 这部分 JSON 对应 VfsPlugin 中读取的配置
  18. vfs_config = {
  19. "verbose": 0,
  20. "missionCode": "fra",
  21. "missionName": "France",
  22. "countryCode": "sgp",
  23. "countryName": "Singapore",
  24. "cultureCode": "en-US",
  25. "language": "en",
  26. "website": "https://visa.vfsglobal.com/sgp/en/fra/login",
  27. "appointmentType": [
  28. {
  29. "id": 538,
  30. "routing_key": "slot.vfs.sin.fr.tourist",
  31. "centerName": "France Visa Application Center, Singapore",
  32. "city": "Singapore",
  33. "country": "France",
  34. "visa_type": "Tourist",
  35. "address": "79 Anson Road #15-01 Singapore 079906",
  36. "vacCode": "FRSN",
  37. "categoryName": "Short Stay",
  38. "categoryCode": "02",
  39. "subcategoryName": "Short Stay Tourist, Family Visit",
  40. "subcategoryCode": "Six",
  41. "fee": None,
  42. "currency": None
  43. }
  44. ]
  45. }
  46. # 2. 构造查询策略配置
  47. query_wait_config = QueryWaitConfig(
  48. mode=QueryWaitMode.Random,
  49. random_min=5, # 最小间隔 5秒
  50. random_max=10 # 最大间隔 10秒
  51. )
  52. # 3. 构造插件加载配置
  53. # 注意:plugin_name="vfs_plugin" 会被自动推导为类名 "VfsPlugin"
  54. plugin_config = PluginConfig(
  55. lib_path="plugins",
  56. plugin_name="vfs_plugin",
  57. plugin_bin="vfs_plugin.py",
  58. plugin_proto="IVSPlg"
  59. )
  60. # 4. 构造任务组配置
  61. group_config = GroupConfig(
  62. identifier="Singapore_France_Visas", # 组名
  63. enable=True, # 启用
  64. need_account=True, # 需要登录账号
  65. account_pool="ie_nl", # 使用 AccountManager 中的 fr_pool (在 account_manager.py 中预设)
  66. need_proxy=True, # 需要代理
  67. proxy_pool="global_proxy", # 使用 ProxyManager 中的 global_proxy
  68. need_ip_bind=False, # 不强制 IP 绑定
  69. account_login_interval=10, # 登录失败/成功后锁定账号 10 分钟
  70. target_instances=1, # 启动 2 个并发实例 (线程)
  71. query_wait=query_wait_config,
  72. plugin_config=plugin_config,
  73. free_config=json.dumps(vfs_config) # 将业务配置转为 JSON 字符串
  74. )
  75. # 5. 创建协调器
  76. gco = GCO(group_config)
  77. # 7. 启动
  78. try:
  79. VSC_INFO("main", "========================================")
  80. VSC_INFO("main", " VFS Python Plugin Tester ")
  81. VSC_INFO("main", "========================================")
  82. gco.start()
  83. time.sleep(3600)
  84. except KeyboardInterrupt:
  85. VSC_INFO("main", "Ctrl+C detected. Stopping...")
  86. except Exception as e:
  87. VSC_ERROR("main", "Unexpected Error: %s", str(e))
  88. finally:
  89. # 8. 停止
  90. gco.stop()
  91. VSC_INFO("main", "Program finished.")
  92. def tls_test():
  93. # 0. 检查目录结构
  94. if not os.path.exists("plugins/tls_plugin.py"):
  95. print("[ERROR] 找不到插件文件 'plugins/tls_plugin.py'")
  96. print("请将上一条回答中的代码保存为该文件。")
  97. return
  98. # 1. 准备 VFS 业务配置 (free_config)
  99. # 这部分 JSON 对应 TlsPlugin 中读取的配置
  100. tls_config = {
  101. "verbose": 0,
  102. "embassy_code": "gbLON2fr",
  103. "country_code": "gb",
  104. "mission_code": "fr",
  105. "city": "london",
  106. "capsolver_key": "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A",
  107. "interest_month": "01-2026",
  108. "target_labels": ["", "pta"],
  109. 'website': 'https://visas-fr.tlscontact.com/country/gb/vac/gbLON2fr/'
  110. }
  111. # 2. 构造查询策略配置
  112. query_wait_config = QueryWaitConfig(
  113. mode=QueryWaitMode.Random,
  114. random_min=60, # 最小间隔 5秒
  115. random_max=300 # 最大间隔 10秒
  116. )
  117. # 3. 构造插件加载配置
  118. # 注意:plugin_name="tls_plugin" 会被自动推导为类名 "TlsPlugin"
  119. plugin_config = PluginConfig(
  120. lib_path="plugins",
  121. plugin_name="tls_plugin",
  122. plugin_bin="tls_plugin.py",
  123. plugin_proto="IVSPlg"
  124. )
  125. # 4. 构造任务组配置
  126. group_config = GroupConfig(
  127. identifier="London_France_Visas", # 组名
  128. enable=True, # 启用
  129. need_account=True, # 需要登录账号
  130. account_pool="gb_fr", # 使用 AccountManager 中的 uk_pool (在 account_manager.py 中预设)
  131. need_proxy=True, # 需要代理
  132. proxy_pool="global_proxy", # 使用 ProxyManager 中的 global_proxy
  133. need_ip_bind=False, # 不强制 IP 绑定
  134. account_login_interval=10, # 登录失败/成功后锁定账号 10 分钟
  135. target_instances=1, # 启动 2 个并发实例 (线程)
  136. query_wait=query_wait_config,
  137. plugin_config=plugin_config,
  138. free_config=json.dumps(tls_config) # 将业务配置转为 JSON 字符串
  139. )
  140. # 5. 创建协调器
  141. gco = GCO(group_config)
  142. # 7. 启动
  143. try:
  144. VSC_INFO("main", "========================================")
  145. VSC_INFO("main", " TLS Python Plugin Tester ")
  146. VSC_INFO("main", "========================================")
  147. gco.start()
  148. time.sleep(3600)
  149. except KeyboardInterrupt:
  150. VSC_INFO("main", "Ctrl+C detected. Stopping...")
  151. except Exception as e:
  152. VSC_ERROR("main", "Unexpected Error: %s", str(e))
  153. finally:
  154. # 8. 停止
  155. gco.stop()
  156. VSC_INFO("main", "Program finished.")
  157. def bls_test():
  158. # 0. 环境检查
  159. if not os.path.exists("plugins/bls_plugin.py"):
  160. print("[ERROR] 找不到插件文件 'plugins/bls_plugin.py'")
  161. return
  162. # 2. 准备 BLS 业务配置 (Free Config)
  163. # 这些字段会被 BlsPlugin 读取并用于填表和逻辑判断
  164. bls_config = {
  165. "domain": "ireland.blsspainglobal.com", # 目标域名
  166. "ocr_service_url": "http://127.0.0.1:8085/predict/vfcode?model=pytorch", # OCR 服务地址
  167. # 签证参数 (根据实际网站下拉框的值填写)
  168. "location": "Dublin",
  169. "jurisdiction": None,
  170. "visaType": "Schengen Visa/ Short Term Visa",
  171. "visaSubType": "Tourist Visa",
  172. "appointmentCategory": "Normal",
  173. # 申请人信息 (用于 book 阶段)
  174. "user_info": {
  175. "first_name": "John",
  176. "last_name": "Doe",
  177. "passport_no": "E12345678",
  178. "passport_issue_date": "2020-01-01",
  179. "passport_expiry_date": "2030-01-01",
  180. "birthday": "1990-01-01",
  181. "phone_no": "447700900000",
  182. "nationality": "India",
  183. "gender": "Male",
  184. "passport_image_url": "https://via.placeholder.com/600x400.jpg" # 模拟图片URL
  185. }
  186. }
  187. # 3. 构造查询策略
  188. query_wait_config = QueryWaitConfig(
  189. mode=QueryWaitMode.Fixed,
  190. fixed_wait=10 # 每次查询间隔 10 秒
  191. )
  192. # 4. 构造插件配置
  193. # plugin_name="bls_plugin" -> 自动推导类名 "BlsPlugin"
  194. plugin_config = PluginConfig(
  195. lib_path="plugins",
  196. plugin_name="bls_plugin",
  197. plugin_bin="bls_plugin.py",
  198. plugin_proto="IVSPlg"
  199. )
  200. # 5. 构造任务组配置
  201. group_config = GroupConfig(
  202. identifier="Dublin_Spain_Visas", # 任务组 ID
  203. enable=True,
  204. need_account=True,
  205. account_pool="ie_es", # 使用刚才注入的池子
  206. need_proxy=True,
  207. proxy_pool="local", # 使用默认代理池
  208. target_instances=1, # 启动 1 个实例
  209. account_login_interval=5, # 登录间隔/锁定时间
  210. query_wait=query_wait_config,
  211. plugin_config=plugin_config,
  212. free_config=json.dumps(bls_config)
  213. )
  214. # 6. 创建协调器
  215. gco = GCO(group_config)
  216. # 8. 启动运行
  217. try:
  218. VSC_INFO("main", ">>> Starting BLS Plugin Test...")
  219. gco.start()
  220. time.sleep(3600)
  221. except KeyboardInterrupt:
  222. VSC_INFO("main", "Ctrl+C detected.")
  223. except Exception as e:
  224. VSC_ERROR("main", f"Unexpected Error: {e}")
  225. finally:
  226. gco.stop()
  227. VSC_INFO("main", "Test Finished.")
  228. if __name__ == "__main__":
  229. vfs_test()