| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
- import re
- import base64
- import time
- import json
- import random
- import string
- from urllib.parse import urlparse, parse_qs, urlencode
- from typing import Dict, List, Optional, Any
- try:
- from curl_cffi import requests, const
- from bs4 import BeautifulSoup
- except ImportError:
- raise ImportError("Missing dependencies. Run: pip install curl-cffi beautifulsoup4")
- # 框架依赖
- from vs_plg import IVSPlg, VSError # type: ignore
- from vs_types import VSPlgConfig, VSQueryResult, VSBookResult, AvailabilityStatus # type: ignore
- from vs_log_macros import VSC_INFO, VSC_ERROR, VSC_DEBUG, VSC_WARN # type: ignore
- from toolkit.vs_cloud_api import VSCloudApi # type: ignore
class BlsPlugin(IVSPlg):
    """BLS visa-appointment plugin (trimmed-down edition).

    The plugin drives the BLS web flow in three stages:

    * ``create_session`` fetches the login page, solves its captcha and
      submits the credentials found in ``config.account``.
    * ``query`` walks the visa-type pages and reads the ``availDates``
      JavaScript variable from the appointment calendar page.
    * ``book`` verifies the e-mail OTP, picks a time slot on the earliest
      date and submits the appointment.

    Captchas are solved by sending every captcha tile to an external OCR
    HTTP service (``ocr_service_url``).
    """

    def __init__(self, group_id: str):
        """Create an unconfigured plugin instance for *group_id*."""
        self.group_id = group_id
        self.config: Optional[VSPlgConfig] = None
        self.free_config: Dict[str, Any] = {}
        self.session: Optional[requests.Session] = None

        # Runtime state
        self.book_params: Dict = {}
        self.last_error = VSError(0, "OK")
        self.is_healthy = True

        # Default OCR service endpoint; set_config() may override it.
        self.ocr_service_url = "http://127.0.0.1:8085/predict/vfcode"

    def get_group_id(self) -> str:
        """Return the group id given at construction time."""
        return self.group_id

    def set_config(self, config: VSPlgConfig):
        """Store *config* and decode its JSON ``free_config`` string.

        ``free_config`` falls back to an empty dict when the string is empty,
        is not valid JSON, or does not decode to a JSON object. A non-empty
        ``ocr_service_url`` entry replaces the current OCR endpoint.
        """
        self.config = config
        parsed: Any = None
        if config.free_config:
            try:
                parsed = json.loads(config.free_config)
            except (TypeError, ValueError) as exc:
                VSC_WARN("bls_plg", "[%s] Invalid free_config JSON: %s", self.group_id, str(exc))
        # Everything below uses dict access, so reject lists/scalars as well.
        self.free_config = parsed if isinstance(parsed, dict) else {}

        if self.free_config.get("ocr_service_url"):
            self.ocr_service_url = self.free_config["ocr_service_url"]

    def health_check(self) -> bool:
        """Return False once a 401/403 error has been recorded."""
        return self.is_healthy

    def get_last_error(self) -> VSError:
        """Return the most recent error recorded by ``_set_error``."""
        return self.last_error

    def _set_error(self, code: int, message: str):
        """Record and log an error; codes 401/403 mark the plugin unhealthy."""
        self.last_error = VSError(code, message)
        VSC_ERROR("bls_plg", "[%s] Error %d: %s", self.group_id, code, message)
        if code in (401, 403):
            self.is_healthy = False

    # =========================================================================
    # 1. Login
    # =========================================================================
    def create_session(self) -> bool:
        """Open a fresh HTTP session and log in.

        Returns:
            True when the login endpoint answers with ``success``; False when
            the plugin is not configured, ``domain`` is missing, a page or
            the captcha cannot be obtained, or the login is rejected (error
            2000 is recorded in the last case).
        """
        VSC_INFO("bls_plg", "[%s] Creating session...", self.group_id)
        self.is_healthy = True

        if self.config is None:
            VSC_WARN("bls_plg", "[%s] create_session called before set_config", self.group_id)
            return False
        domain = self.free_config.get("domain")
        if not domain:
            VSC_WARN("bls_plg", "[%s] free_config has no 'domain'", self.group_id)
            return False

        self.session = requests.Session(
            proxy=self._get_proxy_url(),
            impersonate="chrome131",
            curl_options={const.CurlOpt.MAXAGE_CONN: 1800, const.CurlOpt.VERBOSE: False},
        )

        # 1.1 Fetch the login page and collect its form fields.
        login_url = f"https://{domain}/Global/account/login"
        resp = self._request("GET", login_url)
        if not resp:
            return False

        soup = BeautifulSoup(resp.text, 'html.parser')
        payload = self._extract_hidden_fields(soup)

        # The credential inputs carry dynamic ids (UserId1, Password1, ...).
        # Track them in locals so these bookkeeping values are never posted.
        user_field = None
        password_field = None
        for inp in soup.find_all('input'):
            input_id = inp.get('id', '')
            digits = re.search(r'\d+', input_id)
            if not digits:
                continue
            if 'UserId' in input_id:
                user_field = input_id
                payload["UserId"] = digits.group(0)
            if 'Password' in input_id:
                password_field = input_id
                payload["Password"] = digits.group(0)

        # The `data` query value identifies the captcha for this page.
        data_val = self._extract_js_var(resp.text, "iframeOpenUrl", r"data=([^&]+)")

        # 1.2 Solve the captcha.
        captcha_token = self._solve_bls_captcha(data_val, 'Global/account/login')
        if not captcha_token:
            return False

        # 1.3 Submit the login form.
        submit_url = f"https://{domain}/Global/account/loginsubmit"
        payload["X-Requested-With"] = "XMLHttpRequest"
        payload["CaptchaData"] = captcha_token
        account = self.config.account
        if user_field:
            payload[user_field] = account.username
        if password_field:
            payload[password_field] = account.password

        login_res = self._request("POST", submit_url, data=payload, headers={"Referer": login_url})
        login_json = self._json_dict(login_res)
        if login_json and login_json.get('success'):
            VSC_INFO("bls_plg", "[%s] Login Successful", self.group_id)
            return True

        self._set_error(2000, "Login Failed")
        return False

    # =========================================================================
    # 2. Query
    # =========================================================================
    def query(self) -> VSQueryResult:
        """Look up available appointment dates.

        Also stores the booking query parameters in ``self.book_params`` for
        a later ``book`` call.

        Returns:
            A ``VSQueryResult``. ``success`` stays at its default whenever a
            step cannot be completed; otherwise the status is ``Available``
            (with ``earliest_date`` and one placeholder ``00:00`` slot per
            date) or ``NoneAvailable``.
        """
        res = VSQueryResult()
        domain = self.free_config.get("domain")
        if not self.session or not domain:
            return res

        # 2.1 Visa type verification (captcha-protected).
        url_vtv = f"https://{domain}/Global/bls/visatypeverification"
        resp = self._request("GET", url_vtv)
        if not resp:
            return res

        form_vtv = self._extract_hidden_fields(BeautifulSoup(resp.text, 'html.parser'))
        captcha_token = self._solve_bls_captcha(referer='Global/bls/visatypeverification')
        if not captcha_token:
            return res

        form_vtv['CaptchaData'] = captcha_token
        form_vtv["X-Requested-With"] = "XMLHttpRequest"

        vtv_res = self._request(
            "POST",
            f"https://{domain}/Global/bls/VisaTypeVerification",
            data=form_vtv,
            headers={"Referer": url_vtv},
        )
        vtv_json = self._json_dict(vtv_res)
        if not vtv_json or not vtv_json.get('success'):
            return res

        # 2.2 Visa type selection; returnUrl is expected to carry data=xxx.
        data_match = re.search(r"data=([^&]+)", str(vtv_json.get('returnUrl') or ''))
        if not data_match:
            VSC_WARN("bls_plg", "[%s] VisaTypeVerification returnUrl has no data value", self.group_id)
            return res

        url_vt = f"https://{domain}/Global/bls/visatype?data={data_match.group(1)}"
        resp_vt = self._request("GET", url_vt)
        if not resp_vt:
            return res

        vt_payload = self._construct_visatype_payload(
            resp_vt.text, BeautifulSoup(resp_vt.text, 'html.parser')
        )
        if not vt_payload:
            return res

        vt_res = self._request(
            "POST",
            f"https://{domain}/Global/bls/VisaType",
            data=vt_payload,
            headers={"Referer": url_vt},
        )
        vt_json = self._json_dict(vt_res)
        if not vt_json or not vt_json.get('success'):
            # A decoded answer without `available` means there is nothing to book.
            if vt_json is not None and not vt_json.get('available'):
                res.success = True
                res.availability_status = AvailabilityStatus.NoneAvailable
            return res

        # 2.3 Remember the booking parameters carried by the final URL.
        final_url = vt_json.get('returnUrl')
        if not final_url:
            VSC_WARN("bls_plg", "[%s] VisaType response has no returnUrl", self.group_id)
            return res
        q_params = parse_qs(urlparse(final_url).query)
        self.book_params = {k: v[0] for k, v in q_params.items()}

        # 2.4 Read the calendar (ManageAppointment).
        url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
        resp_ma = self._request("GET", url_ma)
        if not resp_ma:
            return res

        avail_str = self._extract_js_var(resp_ma.text, "var availDates", r"var availDates =(.*?);")
        if not avail_str:
            return res
        try:
            avail_json = json.loads(avail_str)
            dates = [x['DateText'] for x in avail_json['ad'] if x['SingleSlotAvailable']]
        except (ValueError, KeyError, TypeError) as exc:
            VSC_WARN("bls_plg", "[%s] Cannot parse availDates: %s", self.group_id, str(exc))
            return res

        res.success = True
        if not dates:
            res.availability_status = AvailabilityStatus.NoneAvailable
            return res

        res.availability_status = AvailabilityStatus.Available
        res.earliest_date = dates[0]
        for date_text in dates:
            day = VSQueryResult.DateAvailability(date=date_text)
            day.times.append(VSQueryResult.DateAvailability.TimeSlot(time="00:00", label="Available"))
            res.availability.append(day)
        return res

    # =========================================================================
    # 3. Book
    # =========================================================================
    def book(self, slot_info: VSQueryResult) -> VSBookResult:
        """Book an appointment on ``slot_info.earliest_date``.

        Requires a prior successful ``query`` (for ``self.book_params``).

        Returns:
            A ``VSBookResult``; ``success`` is set only after the final form
            was submitted, in which case ``session_id``/``order_id`` and the
            liveness ``payment_link`` are filled in and the session cookies
            are pushed to the cloud API.
        """
        res = VSBookResult()
        domain = self.free_config.get("domain")
        if not self.book_params or not self.session or not domain:
            return res

        uinfo = self.free_config.get("user_info") or {}

        # 3.1 Load the manage page for its anti-forgery token and form fields.
        url_ma = f"https://{domain}/Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}"
        resp_ma = self._request("GET", url_ma)
        if not resp_ma:
            return res
        ma_soup = BeautifulSoup(resp_ma.text, 'html.parser')
        ma_form = self._extract_hidden_fields(ma_soup)
        req_token = ma_form.get('__RequestVerificationToken')

        # 3.2 Upload the applicant photo (best effort: a failure is logged
        # and booking continues without ApplicantPhotoId).
        if 'passport_image_url' in uinfo:
            photo_id = self._upload_profile_photo(domain, uinfo['passport_image_url'], req_token, url_ma)
            if photo_id:
                ma_form['ApplicantPhotoId'] = photo_id

        # 3.3 E-mail OTP: request the code, read it, verify it.
        data_val = self._extract_js_var(resp_ma.text, "win.iframeOpenUrl", r"data=([^&]+)")
        self._request(
            "GET",
            f"https://{domain}/Global/blsappointment/SendAppointmentVerificationCode?code={data_val}",
            headers={"Referer": url_ma, "X-Requested-With": "XMLHttpRequest"},
        )

        otp_code = self._read_otp_email(wait_sec=30)
        if not otp_code:
            self._set_error(3004, "OTP timeout")
            return res

        verify_payload = {"Code": otp_code, "Value": ma_form.get('EmailCode'), "Id": ma_form.get('Id')}
        v_res = self._request(
            "POST",
            f"https://{domain}/Global/blsappointment/VerifyEmail",
            data=verify_payload,
            headers={"Referer": url_ma, "requestverificationtoken": req_token},
        )
        v_json = self._json_dict(v_res)
        if not v_json or not v_json.get('success'):
            return res

        ma_form['EmailVerified'] = 'True'
        ma_form['EmailVerificationCode'] = otp_code

        # 3.4 Pick the time slot with the most remaining capacity.
        target_date = slot_info.earliest_date
        slot_url = f"https://{domain}/Global/blsappointment/GetAvailableSlotsByDate"
        slot_params = {
            "appointmentDate": target_date,
            "locationId": ma_form.get("LocationId"),
            "categoryId": ma_form.get("AppointmentCategoryId"),
            "visaType": ma_form.get("VisaType"),
            "visaSubType": ma_form.get("VisaSubTypeId"),
            "applicantCount": 1,
            "dataSource": ma_form.get("DataSource"),
            "missionId": ma_form.get("MissionId"),
        }
        slots_res = self._request(
            "POST",
            slot_url,
            params=slot_params,
            headers={"Referer": url_ma, "requestverificationtoken": req_token},
        )
        if not slots_res:
            return res

        try:
            # max() keeps the first slot among equals, like a stable sort
            # would; it raises ValueError on an empty list.
            best_slot = max(slots_res.json(), key=lambda slot: slot["Count"])
            if best_slot['Count'] <= 0:
                return res
            target_time = best_slot['Name']
        except (ValueError, TypeError, KeyError) as exc:
            VSC_WARN("bls_plg", "[%s] No usable slot data: %s", self.group_id, str(exc))
            return res

        ma_form['ServerAppointmentDate'] = target_date
        ma_form['AppointmentDetailsList'] = '[]'
        # The date/slot field names carry dynamic numeric suffixes.
        page_html = str(ma_soup)
        date_match = re.search(r'AppointmentDate(\d+)', page_html)
        slot_match = re.search(r'AppointmentSlot(\d+)', page_html)
        if not date_match or not slot_match:
            VSC_WARN("bls_plg", "[%s] Dynamic appointment field ids not found", self.group_id)
            return res
        ma_form[f'AppointmentDate{date_match.group(1)}'] = target_date
        ma_form[f'AppointmentSlot{slot_match.group(1)}'] = target_time

        # 3.5 Solve another captcha and submit ManageAppointment.
        captcha_token = self._solve_bls_captcha(
            data_val, f'Global/blsAppointment/ManageAppointment?{urlencode(self.book_params)}'
        )
        if not captcha_token:
            return res
        ma_form['CaptchaData'] = captcha_token

        final_ma_res = self._request(
            "POST",
            f"https://{domain}/Global/BLSAppointment/ManageAppointment",
            data=ma_form,
            headers={"Referer": url_ma},
        )
        final_json = self._json_dict(final_ma_res)
        if not final_json:
            return res

        model = final_json.get('model')
        appt_model_id = model.get('Id') if isinstance(model, dict) else None
        if not appt_model_id:
            return res

        # 3.6 Fill in the applicant form (delegated to _submit_final_form).
        if self._submit_final_form(appt_model_id, uinfo, self.book_params, req_token):
            res.success = True
            res.session_id = self._generate_id()
            res.order_id = res.session_id
            res.payment_link = f"https://{domain}/Global/BlsAppointment/livenessView?id={appt_model_id}"

            # Store the session in the cloud so the front end can take over.
            self._save_session_to_cloud(res.session_id, res.payment_link)
            VSC_INFO("bls_plg", "[%s] Book Success. Liveness URL: %s", self.group_id, res.payment_link)

        return res

    # =========================================================================
    # Helpers
    # =========================================================================
    def _request(self, method, url, **kwargs):
        """Send a request through the plugin session.

        Default ``User-Agent``/``Accept-Language`` headers are merged with any
        caller-supplied ``headers``; ``timeout`` defaults to 60 seconds but
        may be overridden by the caller.

        Returns:
            The response when its status code is 200, otherwise None. Status
            401, 403 and 429 are recorded through ``_set_error``; transport
            exceptions are logged and swallowed.
        """
        if self.session is None:
            VSC_WARN("bls_plg", "[%s] Request without a session: %s %s", self.group_id, method, url)
            return None

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
        }
        headers.update(kwargs.pop('headers', None) or {})
        kwargs['headers'] = headers
        kwargs.setdefault('timeout', 60)

        VSC_DEBUG("bls_plg", "[%s] _request %s %s", self.group_id, method, url)
        try:
            resp = self.session.request(method, url, **kwargs)
        except Exception as exc:  # network boundary: report and signal failure
            VSC_WARN("bls_plg", "Request Error: %s", str(exc))
            return None

        if resp.status_code == 401:
            self._set_error(401, "Unauthorized")
        elif resp.status_code in (403, 429):
            self._set_error(resp.status_code, "Blocked")
        return resp if resp.status_code == 200 else None

    @staticmethod
    def _json_dict(resp) -> Optional[Dict[str, Any]]:
        """Return the JSON object carried by *resp*.

        Returns None when *resp* is falsy, its body is not valid JSON, or the
        decoded value is not a JSON object.
        """
        if not resp:
            return None
        try:
            payload = resp.json()
        except (ValueError, TypeError) as exc:
            VSC_WARN("bls_plg", "Invalid JSON response: %s", str(exc))
            return None
        return payload if isinstance(payload, dict) else None

    def _ocr_tile(self, b64_payload: str) -> str:
        """Return the OCR text for one base64-encoded captcha tile.

        The decoded bytes are posted as the raw request body. ``$`` characters
        are stripped from the service's ``data`` field. Returns ``""`` on any
        failure (best effort: one bad tile must not abort the captcha).
        """
        try:
            img_bytes = base64.b64decode(b64_payload)
            ocr_resp = requests.post(
                self.ocr_service_url,
                data=img_bytes,
                headers={"Content-Type": "application/octet-stream"},
                timeout=5,
            )
            if ocr_resp.status_code != 200:
                return ""
            return str(ocr_resp.json().get('data') or '').replace('$', '')
        except Exception as exc:
            VSC_WARN("bls_plg", "OCR Service Failed: %s", str(exc))
            return ""

    def _solve_bls_captcha(self, data='', referer='') -> Optional[str]:
        """Solve a BLS image captcha and return its token.

        Flow: fetch the captcha page, read the target number from its
        ``Select <n>`` text, OCR every ``captcha-img`` tile, then submit the
        ids of the tiles whose text equals the target.

        Args:
            data: When non-empty, the ``CaptchaPublic`` endpoints are used
                with this ``data`` query value; otherwise ``NewCaptcha``.
            referer: Path (relative to the domain) sent as ``Referer``.

        Returns:
            The ``captcha`` value of the submit response, or None when any
            step fails or no tile matches.
        """
        domain = self.free_config.get("domain")
        if data:
            url = f"https://{domain}/Global/CaptchaPublic/GenerateCaptcha?data={data}"
        else:
            url = f"https://{domain}/Global/NewCaptcha/GenerateCaptcha"

        resp = self._request("GET", url, headers={"Referer": f"https://{domain}/{referer}"})
        if not resp:
            return None

        soup = BeautifulSoup(resp.text, 'html.parser')

        # 1. Extract the target number.
        target_match = re.search(r'Select\s*(\d+)', soup.get_text())
        if not target_match:
            return None
        target_num = target_match.group(1)

        # 2. OCR every tile and keep the ids of the matching ones.
        selected_ids = []
        for img in soup.find_all('img', class_='captcha-img'):
            src = img.get('src', '')
            img_id = img.get('id')
            # Tiles without an id cannot be submitted, so skip them.
            if 'base64,' not in src or not img_id:
                continue
            ocr_text = self._ocr_tile(src.split('base64,')[1])
            VSC_DEBUG("bls_plg", "OCR: %s (Target: %s)", ocr_text, target_num)
            if ocr_text == target_num:
                selected_ids.append(img_id)

        if not selected_ids:
            return None

        # 3. Submit the selection.
        form = self._extract_hidden_fields(soup)
        form['SelectedImages'] = ",".join(selected_ids)
        submit_url = f"https://{domain}/Global/{'CaptchaPublic' if data else 'NewCaptcha'}/SubmitCaptcha"

        submit_res = self._request(
            "POST",
            submit_url,
            data=form,
            headers={"X-Requested-With": "XMLHttpRequest", "Referer": url},
        )
        submit_json = self._json_dict(submit_res)
        if submit_json and submit_json.get('captcha'):
            return submit_json['captcha']
        return None

    def _extract_hidden_fields(self, soup) -> Dict:
        """Return ``{name: value}`` for the named inputs of the first form.

        Despite the method name, every ``<input>`` with a ``name`` attribute
        is collected, not only hidden ones. An empty dict is returned when
        the document has no ``<form>``.
        """
        params = {}
        form = soup.find("form")
        if form:
            for inp in form.find_all("input"):
                name = inp.get("name")
                if name:
                    params[name] = inp.get("value", "")
        return params

    def _extract_js_var(self, html, context, pattern):
        """Return group 1 of *pattern* in *html*, or ``""``.

        The search only runs when the literal *context* string occurs in
        *html*; the pattern itself is matched against the whole document.
        """
        if html and context in html:
            match = re.search(pattern, html)
            if match:
                return match.group(1)
        return ""

    def _construct_visatype_payload(self, html, soup):
        """Build the VisaType form payload (simplified).

        Starts from the page's form fields and, when ``free_config`` names a
        ``jurisdiction``, adds its id looked up in the ``jurisdictionData``
        JavaScript array. Location / VisaType / VisaSubType are expected to
        follow the same pattern but are not implemented here.
        """
        params = self._extract_hidden_fields(soup)

        def find_id(var_name, target_name, key="Name", val_key="Id"):
            """Look up *target_name* in the JS array assigned to *var_name*."""
            json_str = self._extract_js_var(
                html, f"var {var_name}", rf"var {re.escape(var_name)}\s*=\s*(.*?);"
            )
            if not json_str:
                return None
            try:
                for item in json.loads(json_str):
                    if item.get(key) == target_name:
                        return item.get(val_key)
            except (ValueError, TypeError, AttributeError) as exc:
                # Malformed or unexpectedly shaped data: treat as "not found".
                VSC_WARN("bls_plg", "Cannot parse JS variable %s: %s", var_name, str(exc))
            return None

        # Example: Jurisdiction (the form key itself embeds the id).
        if self.free_config.get('jurisdiction'):
            jid = find_id("jurisdictionData", self.free_config['jurisdiction'])
            if jid:
                params[f'JurisdictionId{jid}'] = jid

        params["X-Requested-With"] = "XMLHttpRequest"
        params["ResponseData"] = "[]"  # required field
        return params

    def _submit_final_form(self, model_id, uinfo, book_params, token):
        """Placeholder for the applicant-form submission; always returns True.

        TODO: fetch the form HTML, parse its JS data, map *uinfo* onto it and
        post the result.
        """
        return True

    def _read_otp_email(self, wait_sec=30):
        """Mock OTP reader: sleeps in 5-second steps, then returns "123456".

        TODO: poll the cloud mail API inside the loop and return the real
        code as soon as it arrives.
        """
        for _ in range(wait_sec // 5):
            time.sleep(5)
        return "123456"  # Mock

    def _save_session_to_cloud(self, sid, url):
        """Push the session cookies and *url* to the cloud API under *sid*.

        Cookies are read straight from the session's cookie jar rather than
        through ``requests.utils.dict_from_cookiejar``, which is a helper of
        the ``requests`` package; ``requests`` here is ``curl_cffi.requests``.
        """
        cookie_map = {cookie.name: cookie.value for cookie in self.session.cookies.jar}
        VSCloudApi.Instance().create_http_session(sid, json.dumps(cookie_map), "", "", "", url, {})

    def _get_proxy_url(self):
        """Return the proxy URL from the config, or ``""`` when none is set."""
        proxy = getattr(self.config, "proxy", None)
        if not proxy or not proxy.ip:
            return ""
        if proxy.username:
            return f"{proxy.scheme}://{proxy.username}:{proxy.password}@{proxy.ip}:{proxy.port}"
        return f"{proxy.scheme}://{proxy.ip}:{proxy.port}"

    def _generate_id(self):
        """Return a random 8-character alphanumeric id.

        The id names a stored cookie session, so it is drawn from the
        ``secrets`` generator instead of the predictable ``random`` one.
        """
        import secrets

        alphabet = string.ascii_letters + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(8))
|