| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
- import time
- import json
- import random
- import re
- import os
- from datetime import datetime, timezone, timedelta
- from typing import List, Dict, Any, Callable, Optional, Set, Tuple
- from curl_cffi import requests, const
- from vs_plg import IVSPlg
- from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError
- MODERN_BROWSERS: List[str] = [
- "chrome",
- "edge",
- "safari",
- "safari_ios",
- "chrome_android",
- "firefox",
- ]
- class GrcPlugin(IVSPlg):
- """
- https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas
- 签证预约插件
- 适配爱尔兰希腊签证 (GR) 流程
- """
- def __init__(self, group_id: str):
- self.group_id = group_id
- self.config: Optional[VSPlgConfig] = None
- self.free_config: Dict[str, Any] = {}
- self.is_healthy = True
- self.logger = None
- self.session: Optional[requests.Session] = None
- self.resource_id = '1123832'
- self.rp_id = None
- self.token = None
- self.session_create_time: float = 0
- def get_group_id(self) -> str:
- return self.group_id
-
- def set_log(self, logger: Callable[[str], None]) -> None:
- self.logger = logger
- def set_config(self, config: VSPlgConfig):
- self.config = config
- self.free_config = config.free_config or {}
-
- def keep_alive(self):
- pass
- def health_check(self) -> bool:
- if not self.is_healthy:
- return False
- if self.session is None:
- return False
- if self.config.session_max_life > 0:
- current_time = time.time()
- elapsed_time = current_time - self.session_create_time
- if elapsed_time > self.config.session_max_life:
- self._log(f"Session Life ({int(elapsed_time)}s) out of max life limit ({self.config.session_max_life * 60}s), mark as unhealth session")
- return False
- return True
- def create_session(self):
- curlopt = {
- const.CurlOpt.MAXAGE_CONN: 1800,
- const.CurlOpt.MAXLIFETIME_CONN: 1800,
- const.CurlOpt.VERBOSE: self.config.debug,
- }
-
- chosen_browser = random.choice(MODERN_BROWSERS)
- self._log(f"Using browser fingerprint: {chosen_browser}")
- self.session = requests.Session(
- proxy=self._get_proxy_url(),
- impersonate=chosen_browser,
- curl_options=curlopt,
- use_thread_local_curl=False,
- http_version=const.CurlHttpVersion.V2TLS
- )
- login_url = "https://www.supersaas.com/schedule/login/GreekEmbassyInDublin/Visas"
- self._perform_request("GET", login_url)
- headers = {
- "Referer": login_url,
- "Origin": "https://www.supersaas.com",
- "Content-Type": "application/x-www-form-urlencoded"
- }
-
- data = {
- "name": self.config.account.username,
- "password": self.config.account.password,
- "remember": "K",
- "cookie_fix": "1",
- "button": ""
- }
-
- resp = self._perform_request('POST', login_url, headers=headers, data=data)
-
- if "Sign out" in resp.text or "Signed in as" in resp.text:
- if "reached the maximum number" in resp.text or "You cannot create new reservations" in resp.text:
- self.is_healthy = False
- self._save_debug_html(resp.text, prefix='login_quota_exceeded')
- self._log(f"Login failed: Account '{self.config.account.username}' has reached the maximum number of reservations.")
- raise BizLogicError(message='Login failed: Account has reached the maximum number of reservations.')
- self.session_create_time = time.time()
- self._log(f"Session created successfully. (User: {self.config.account.username})")
-
- elif "Invalid email or password" in resp.text:
- self._save_debug_html(resp.text, prefix='login_auth_fail')
- raise BizLogicError(message='Login failed: Invalid email or password')
-
- else:
- self._save_debug_html(resp.text, prefix='login_unknown_fail')
- self._log(f"Login check failed. Current URL: {resp.url}")
- raise BizLogicError(message='Login failed: Unknown response')
-
- def _extract_int(self, pattern: str, text: str, default: int = 0) -> int:
- """辅助正则提取数字的函数"""
- match = re.search(pattern, text)
- if match:
- return int(match.group(1))
- return default
-
- def _get_daily_bounds_and_breaks(self, open_times: list, bit_prefs: int, date_ts: int, js_day_index: int) -> Tuple[Optional[int], Optional[int], Optional[Tuple[int, int]]]:
- """完美还原JS逻辑,解析每天的营业起始时间,并提取隐藏的午休时间"""
- if not open_times:
- return None, None, None
- # =========================================================
- # 【终极拦截器】:解析 bit_prefs 位掩码!
- # 最低的 7 位控制着周日(0)到周六(6)的物理营业开关。
- # 如果为 0,这天就是雷打不动的休息日,直接返回空!
- # =========================================================
- if bit_prefs > 0:
- if not (bit_prefs & (1 << js_day_index)):
- return None, None, None
- def safe_get(idx):
- if idx < len(open_times):
- return open_times[idx]
- return None
- # [0-6] 是周日到周六的每天开始时间(分), [7-13] 是结束时间(分)
- start_min = safe_get(js_day_index)
- end_min = safe_get(js_day_index + 7)
- # Null 意味着当天不营业
- if start_min is None or end_min is None or start_min >= end_min:
- return None, None, None
- # JS 中的 day_base_ts = 严格的当天 00:00:00 (根据绝对时间戳截断)
- day_base_ts = date_ts - (date_ts % 86400)
-
- start_ts = day_base_ts + start_min * 60
- end_ts = day_base_ts + end_min * 60
-
- # [14-20] 是午休开始时间, [21-27] 是午休结束时间
- break_block = None
- break_start_min = safe_get(js_day_index + 14)
- break_end_min = safe_get(js_day_index + 21)
-
- if break_start_min is not None and break_end_min is not None:
- b_start = day_base_ts + break_start_min * 60
- b_end = day_base_ts + break_end_min * 60
- break_block = (b_start, b_end)
- return start_ts, end_ts, break_block
-
- def _fetch_schedule_data(self, start_dt: datetime, days: int) -> Dict:
- """发送一次 AJAX 请求,获取指定时间范围内的所有数据"""
- afrom_str = start_dt.strftime("%Y-%m-%d")
- ato_dt = start_dt + timedelta(days=days)
- ato_str = ato_dt.strftime("%Y-%m-%d 00:00")
-
- url = f"https://www.supersaas.com/ajax/resource/{self.rp_id}"
-
- params = {
- "token": self.token,
- "afrom": afrom_str,
- "ato": ato_str,
- "ad": "r",
- "efrom": afrom_str,
- "eto": ato_dt.strftime("%Y-%m-%d"),
- }
-
- headers = {
- "Accept": "*/*",
- "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas",
- "X-Requested-With": "XMLHttpRequest"
- }
-
- self._log(f"Fetching data from {afrom_str} to {ato_dt.strftime('%Y-%m-%d')}...")
- resp = self._perform_request("GET", url, params=params, headers=headers)
- return resp.json()
- def query(self, apt_type: AppointmentType) -> VSQueryResult:
- res = VSQueryResult()
- res.success = False
-
- url = "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas"
- headers = {
- "Referer": "https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas",
- "Origin": "https://www.supersaas.com",
- }
- resp = self._perform_request("GET", url, headers=headers)
- if 'Log into Visas schedule' in resp.text:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError(message='Session expired.')
-
- # 1. 提取核心会话参数
- self.resource_id = str(self._extract_int(r'resource\[(\d+)\]\s*=', resp.text))
- self.rp_id = str(self._extract_int(r'rp_id=(\d+)', resp.text))
- self.token = str(self._extract_int(r'token=(\d+)', resp.text))
- if not self.rp_id or not self.token or self.resource_id == '0':
- self._log("Failed to extract rp_id or token from HTML")
- raise NotFoundError(message='rp_id or token not found')
- # 2. 提取并还原所有的 JS 限制变量
- default_length = self._extract_int(r'default_length\s*=\s*(\d+)', resp.text, 3600)
- rounding = self._extract_int(r'rounding\s*=\s*(\d+)', resp.text, 3600)
- buffer_time = self._extract_int(r'buffer\s*=\s*(\d+)', resp.text, 0)
- add_limit = self._extract_int(r'add_limit\s*=\s*(\d+)', resp.text, 0)
- early_limit = self._extract_int(r'early_limit\s*=\s*(\d+)', resp.text, 0)
- early_snap = self._extract_int(r'early_snap\s*=\s*(\d+)', resp.text, 0)
- bit_prefs = self._extract_int(r'bit_prefs\s*=\s*(\d+)', resp.text, 0)
- # 3. 提取 open_times
- open_times = []
- ot_match = re.search(r'open_times\s*=\s*\[(.*?)\]', resp.text)
- if ot_match:
- # 巧妙利用 json.loads 处理包含 null 的 JS 数组字符串
- open_times_str = f"[{ot_match.group(1)}]"
- open_times = json.loads(open_times_str)
-
- # 4. 提取放号排期 (Season)
- season_start_ts = 0
- season_end_ts = float('inf')
- season_match = re.search(r'season\s*=\s*\[(\d+),\s*(\d+)\]', resp.text)
- if season_match:
- season_start_ts = int(season_match.group(1))
- season_end_ts = int(season_match.group(2))
-
- # 5. 提取全局页面屏蔽的假日例外期 (ecache)
- ecache_blocks = []
- ecache_match = re.search(r'ecache\s*=\s*\{data:\s*\[(.*?)\]\}', resp.text)
- if ecache_match:
- triplets = re.findall(r'\[(\d+),\s*(\d+),\s*\d+\]', ecache_match.group(1))
- for t0, t1 in triplets:
- ecache_blocks.append((int(t0), int(t1)))
- scan_start_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
- chunk_start_ts_initial = int(scan_start_dt.timestamp())
-
- days_total_scan = 60
- chunk_size = 30
- valid_slots_map: Dict[datetime.date, List[TimeSlot]] = {}
- # Chunk 循环
- for i in range(0, days_total_scan, chunk_size):
- chunk_start_dt = scan_start_dt + timedelta(days=i)
- json_data = self._fetch_schedule_data(chunk_start_dt, chunk_size)
-
- # 把已经预定的订单、节假日、以及Gcal双向同步块统一收集为“阻挡物实体墙”
- all_blocks: List[Tuple[int, int]] = ecache_blocks.copy()
- for key in ['app', 'exc', 'gcal']:
- if key in json_data:
- for item in json_data[key]:
- all_blocks.append((int(item[0]), int(item[1])))
- for day_offset in range(chunk_size):
- # 这里必须使用纯净的绝对 UNIX 时间进行加法,避免夏令时导致时间漂移
- current_day_ts = int(chunk_start_dt.timestamp()) + day_offset * 86400
-
- # 计算出当前天对应的 JS Weekday (0=Sun, 1=Mon ... 6=Sat)
- js_day_index = int((4 + (current_day_ts // 86400)) % 7)
-
- # 这里传入了 bit_prefs,彻底过滤掉伪装成开门的休息日!
- start_ts, end_ts, break_block = self._get_daily_bounds_and_breaks(open_times, bit_prefs, current_day_ts, js_day_index)
- if start_ts is None:
- continue
- # 每天专属的围墙:加入当天的午休时间
- daily_blocks = all_blocks.copy()
- if break_block:
- daily_blocks.append(break_block)
- curr_ts = start_ts
- current_time_ts = int(time.time())
-
- # --- 完全还原 JS 的 limits 以及 bit_prefs (early_snap) 机制 ---
- min_bookable_ts = current_time_ts + add_limit + 90
- max_bookable_ts = float('inf')
-
- if early_limit > 0:
- limit_ts = current_time_ts + early_limit
- if early_snap:
- k = limit_ts % 86400
- y = int((4 + (limit_ts // 86400)) % 7)
- # 这里正是 bit_prefs 的用武之地:检测位掩码
- if (bit_prefs & (1 << y)) and open_times[y] is not None and open_times[y] <= (k / 60):
- limit_ts += (86400 - k)
- max_bookable_ts = limit_ts + default_length
-
- # ================= 核心碰撞与吸附算法 =================
- while curr_ts + default_length <= end_ts:
-
- # 1. 不能早于排期放号的绝对起始时间
- if curr_ts < season_start_ts:
- curr_ts = season_start_ts
- continue
-
- # 2. 如果撞到了排期尽头,或超过了后台设置的最远可预约天数,当天循环直接结束
- if curr_ts >= season_end_ts or curr_ts >= max_bookable_ts:
- break
-
- # 3. 拦截提前量 (比如不允许预约1小时内的票)
- if curr_ts < min_bookable_ts:
- curr_ts = min_bookable_ts
- continue
- # 4. 【核心机制 1】:完美网格吸附 (Rounding Snapping)
- # 动态适配 10分、15分、30分、60分的网格,废弃单纯整点逻辑
- rem = curr_ts % rounding
- if rem > 0:
- curr_ts += (rounding - rem)
- continue
-
- slot_end = curr_ts + default_length
-
- # 5. 【核心机制 2】:带 Buffer 的贪婪碰撞检测
- overlapping_end = 0
- for b_start, b_end in daily_blocks:
- # 障碍物向前后各膨胀 buffer 的大小
- blocked_start = b_start - buffer_time
- blocked_end = b_end + buffer_time
-
- # 区间交集判断逻辑
- if curr_ts < blocked_end and slot_end > blocked_start:
- if blocked_end > overlapping_end:
- overlapping_end = blocked_end
-
- if overlapping_end > 0:
- # 碰撞触发:指针直接跳跃到带缓冲区的“障碍物最晚结束时间点”
- curr_ts = overlapping_end
- # 进入下个 while 循环时,机制 1 会将其吸附回最近的合法网格上!
- else:
- # ========================================================
- # 完美过检:记录合法 Slot (在此处增加 Finish Time 的提取)
- # ========================================================
- dt_start = datetime.fromtimestamp(curr_ts, tz=timezone.utc)
-
- # 计算结束时间戳与时间对象
- finish_ts = curr_ts + default_length
- dt_finish = datetime.fromtimestamp(finish_ts, tz=timezone.utc)
- payload = {
- "resource_id": self.resource_id,
- "timestamp": curr_ts,
- "datetime": dt_start.strftime("%Y-%m-%d %H:%M:%S"),
- "finish_timestamp": finish_ts,
- "finish_datetime": dt_finish.strftime("%Y-%m-%d %H:%M:%S")
- }
- time_slot = TimeSlot(
- time=dt_start.strftime("%H:%M"),
- label=json.dumps(payload)
- )
-
- date_key = dt_start.date()
- if date_key not in valid_slots_map:
- valid_slots_map[date_key] = []
-
- valid_slots_map[date_key].append(time_slot)
-
- # 推进指针,寻找下一个槽位
- curr_ts += default_length
- # === 结果聚合与整理 ===
- if valid_slots_map:
- res.success = True
- res.availability_status = AvailabilityStatus.Available
- sorted_dates = sorted(valid_slots_map.keys())
- res.earliest_date = datetime.combine(sorted_dates[0], datetime.min.time())
-
- for d in sorted_dates:
- da = DateAvailability(
- date=datetime.combine(d, datetime.min.time()),
- times=valid_slots_map[d]
- )
- res.availability.append(da)
-
- self._log(f"Found slots on {len(sorted_dates)} days.")
- else:
- self._log("No slots found.")
-
- return res
- def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
- if user_inputs is None:
- user_inputs = {}
-
- res = VSBookResult()
- res.success = False
- # 重新登录一次,避免session expired
- self.create_session()
-
- exp_start = user_inputs.get('expected_start_date', '')
- exp_end = user_inputs.get('expected_end_date', '')
-
- date_map = {d.date.strftime("%Y-%m-%d"): d for d in slot_info.availability}
- all_dates = list(date_map.keys())
-
- valid_dates = self._filter_dates(all_dates, exp_start, exp_end)
- if not valid_dates:
- self._log(f"No available slots within the expected range ({exp_start} to {exp_end}).")
- return res
- candidate_slots =[]
- for date_str in valid_dates:
- if date_str in date_map:
- candidate_slots.extend(date_map[date_str].times)
-
- random.shuffle(candidate_slots)
- if not candidate_slots:
- self._log("No slots found after filtering.")
- return res
- self._log(f"Found {len(candidate_slots)} candidate slots. Starting booking attempts...")
-
- # [修复点 1]:严谨处理输入参数里的空字符串,防止 dict.get() 返回空字符串触发后端空白报错
- first_name = (user_inputs.get('first_name') or '').strip()
- last_name = (user_inputs.get('last_name') or '').strip()
- full_name = f"{first_name} {last_name}".strip() or "Traveler"
-
- phone_code = (user_inputs.get('phone_country_code') or '353').strip()
- phone_no = (user_inputs.get('phone') or '088888888').strip()
- mobile = f"{phone_code}{phone_no}"
-
- address = (user_inputs.get('address') or "Dublin, Ireland").strip()
- passport_no = (user_inputs.get('passport_no') or "P0000000").strip()
- nationality = (user_inputs.get('nationality') or 'Chinese').strip()
- visa_reason = (user_inputs.get('visa_reason') or 'tourism').strip()
-
- for target_slot in candidate_slots[0:2]:
- try:
- slot_data = json.loads(target_slot.label)
- start_time_str = slot_data.get('datetime')
- finish_time_str = slot_data.get('finish_datetime')
- bk_res_id = slot_data.get('resource_id', self.resource_id)
-
- start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
- if finish_time_str:
- finish_dt = datetime.strptime(finish_time_str, "%Y-%m-%d %H:%M:%S")
- else:
- finish_dt = start_dt + timedelta(hours=1)
-
- # 处理抵离日期确保不为空
- arrival_date = (user_inputs.get('arrival_date') or '').strip()
- if not arrival_date:
- arrival_date = (start_dt + timedelta(days=15)).strftime("%Y-%m-%d")
- request_url = f"https://www.supersaas.com/schedule/GreekEmbassyInDublin/Visas?view=day&day={start_dt.day}&month={start_dt.month}"
- fmt_start = f"{start_dt.day}/{start_dt.month}/{start_dt.year} {start_dt.strftime('%H:%M')}"
- fmt_finish = f"{finish_dt.day}/{finish_dt.month}/{finish_dt.year} {finish_dt.strftime('%H:%M')}"
- headers = {
- "Referer": request_url,
- "Origin": "https://www.supersaas.com",
- "Content-Type": "application/x-www-form-urlencoded",
- }
- # ==========================
- # Request step 1: 预检查/打开弹窗
- # ==========================
- payload_stp_1 = {
- "reservation[start_time]": fmt_start,
- "reservation[finish_time]": fmt_finish,
- "reservation[full_name]": full_name,
- "reservation[mobile]": mobile,
- "reservation[address]": address,
- "reservation[resource_id]": bk_res_id,
- "button": "",
- "reservation[xpos]": "100",
- "reservation[ypos]": "200"
- }
-
- self._log(f"[Attempt] Try slot {start_time_str} - Req step 1")
- resp_stp_1 = self._perform_request('POST', request_url, data=payload_stp_1, headers=headers)
- if "This spot has already been taken" in resp_stp_1.text:
- self._log(f"Slot {start_time_str} is TAKEN. Trying next...")
- continue
- if 'id="reservation_error"' in resp_stp_1.text or 'class="dbox-error"' in resp_stp_1.text:
- err_match = re.search(r'<li>(.*?)</li>', resp_stp_1.text)
- err_reason = err_match.group(1) if err_match else "Unknown error"
- self._log(f"Slot {start_time_str} unavailable ({err_reason}). Trying next...")
- continue
- time.sleep(random.uniform(10, 30))
- # ==========================
- # Request step 2: 提交表单
- # ==========================
- payload_stp_2 = {
- "form[5]": passport_no,
- "form[8]": nationality,
- "form[6]": visa_reason,
- "form[7]": arrival_date,
- "form[9]": "",
- "form_commit": "Submit",
-
- "reservation[start_time]": fmt_start,
- "reservation[finish_time]": fmt_finish,
- "reservation[full_name]": full_name,
- "reservation[mobile]": mobile,
- "reservation[address]": address,
- "reservation[resource_id]": bk_res_id,
- "reservation[xpos]": "100",
- "reservation[ypos]": "200"
- }
- self._log(f"[Attempt] Submitting form for {start_time_str} - Req step 2")
- resp_book_stp_2 = self._perform_request('POST', request_url, data=payload_stp_2, headers=headers)
- # [修复点 2]:正确判断表单是否提交成功(不能只靠200判断,因为即使有误也会返回200状态码呈现红框)
- if "Reservation successfully created" in resp_book_stp_2.text:
- res.success = True
- res.book_date = start_dt.strftime("%Y-%m-%d")
- res.book_time = start_dt.strftime("%H:%M")
- self._log(f"Booking SUCCESS for {res.book_date} at {res.book_time}")
- return res
- else:
- # 获取表单验证的具体失败原因用于日志记录
- if 'id="errorExplanation"' in resp_book_stp_2.text or 'class="errorExplanation"' in resp_book_stp_2.text:
- err_match = re.search(r'<li>(.*?)</li>', resp_book_stp_2.text)
- err_reason = err_match.group(1) if err_match else "Form validation error"
- self._log(f"Submission failed for {start_time_str} ({err_reason}). Trying next...")
- else:
- self._log(f"Submission failed for {start_time_str}: Unknown error. Trying next...")
- if self.config.debug:
- self._save_debug_html(resp_book_stp_2.text, prefix='book_step2_fail')
- continue
- except Exception as e:
- self._log(f"Exception trying slot {start_time_str}: {e}")
- continue
- self._log("All candidate slots failed.")
- return res
- def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
- if not start_str or not end_str:
- shuffled_dates = list(dates)
- random.shuffle(shuffled_dates)
- return shuffled_dates
-
- valid_dates = []
- try:
- s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
- e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
-
- for date_str in dates:
- curr_date = datetime.strptime(date_str, "%Y-%m-%d")
- if s_date <= curr_date <= e_date:
- valid_dates.append(date_str)
- except ValueError:
- self._log("Date format error in expected_start_date or expected_end_date. Ignoring filter.")
- shuffled_dates = list(dates)
- random.shuffle(shuffled_dates)
- return shuffled_dates
- random.shuffle(valid_dates)
- return valid_dates
- def _log(self, message):
- if self.logger:
- self.logger(f'[GrcPlugin] [{self.group_id}] {message}')
-
- def _save_debug_html(self, content: str, prefix: str = "debug"):
- save_dir = "data/debug_pages"
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{save_dir}/{prefix}_{timestamp}.html"
- with open(filename, "w", encoding="utf-8") as f:
- f.write(content)
- self._log(f"HTML saved to: {filename}")
-
- def _get_proxy_url(self):
- proxy_url = ""
- if self.config.proxy.ip:
- s = self.config.proxy
- if s.username:
- proxy_url = f"{s.proto}://{s.username}:{s.password}@{s.ip}:{s.port}"
- else:
- proxy_url = f"{s.proto}://{s.ip}:{s.port}"
- return proxy_url
-
- def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None):
- resp = self.session.request(method, url, headers=headers, data=data, json=json_data, params=params, timeout=30)
- if self.config.debug:
- self._log(f'[perform request] Response={resp.text[:200]}...\nMethod={method}, Url={url}, Data={data}, JsonData={json_data}, Params={params}')
- if resp.status_code == 200:
- return resp
- elif resp.status_code == 401:
- self.is_healthy = False
- raise SessionExpiredOrInvalidError()
- elif resp.status_code == 403:
- raise PermissionDeniedError()
- elif resp.status_code == 429:
- self.is_healthy = False
- raise RateLimiteddError()
- else:
- raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
|