import re import asyncio import uvicorn import tempfile from pathlib import Path from contextlib import asynccontextmanager from fastapi import FastAPI, Body, Request, Query, HTTPException from fastapi.responses import JSONResponse from fastapi.concurrency import run_in_threadpool from utils.browser_util import open_browser, attach_browser from toolkit.ocr_engine import PyTorchEngine, DddOcrEngine # ================= 全局资源 ================= # 异步锁,用于互斥控制 BROWSER_LOCK = asyncio.Lock() # OCR 引擎字典 engines = {} def _sync_get_visatype_ids(tmp_file: str): """ 这是实际执行浏览器操作的同步函数。 它会在独立的线程中运行,不会阻塞服务器。 """ result = {"status": "failed", "message": ""} try: browser = attach_browser() html_file_path = Path(tmp_file).resolve() file_url = f'file://{html_file_path}' browser.get(file_url) jur_id = None loc_id = None type_id = None subtype_id = None cat_id = None # 匹配 ID app_category_labels = browser.eles(f'Appointment Category', timeout=1) for app_category_label in app_category_labels: if app_category_label.states.has_rect and app_category_label.tag == 'label': eid = app_category_label.after('tag:input').attr('id') cat_id = int(''.join(filter(str.isdigit, eid))) break jurisdiction_labels = browser.eles(f'Jurisdiction', timeout=1) if jurisdiction_labels: for jurisdiction_label in jurisdiction_labels: if jurisdiction_label.states.has_rect and jurisdiction_label.tag == 'label': eid = jurisdiction_label.after('tag:input').attr('id') jur_id = int(''.join(filter(str.isdigit, eid))) break location_labels = browser.eles(f'Location', timeout=1) for location_label in location_labels: if location_label.states.has_rect and location_label.tag == 'label': eid = location_label.after('tag:input', index=2).attr('id') loc_id = int(''.join(filter(str.isdigit, eid))) break visa_type_labels = browser.eles(f'Visa Type', timeout=1) for visa_type_label in visa_type_labels: if visa_type_label.states.has_rect and visa_type_label.tag == 'label': eid = visa_type_label.after('tag:input').attr('id') type_id = int(''.join(filter(str.isdigit, eid))) break visa_subtype_labels = browser.eles(f'Visa Sub Type', timeout=1) for visa_subtype_label in visa_subtype_labels: if visa_subtype_label.states.has_rect and visa_subtype_label.tag == 'label': eid = visa_subtype_label.after('tag:input').attr('id') subtype_id = int(''.join(filter(str.isdigit, eid))) break data = { "jur_id": jur_id, "loc_id": loc_id, "type_id": type_id, "subtype_id": subtype_id, "cat_id": cat_id, } result["status"] = "success" result['data'] = data except Exception as e: result["message"] = str(e) print(f"[DrissionPage] Error: {e}") return result def _sync_get_visable_image_ids(tmp_file: str): """ 这是实际执行浏览器操作的同步函数。 它会在独立的线程中运行,不会阻塞服务器。 """ result = {"status": "failed", "message": ""} try: browser = attach_browser() images_ids = [] html_file_path = Path(tmp_file).resolve() file_url = f'file://{html_file_path}' browser.get(file_url) captions_ele = browser.ele('xpath://*[@id="captcha-main-div"]/div/div[1]', timeout=5) if not captions_ele: raise Exception('Captions elements not found') caption_eles = captions_ele.children() caption_text = '' for caption in caption_eles: if not caption.states.is_covered: caption_text = caption.text number = re.findall(r'\d+', caption_text)[0] captcha_images_ele = browser.ele('xpath://*[@id="captcha-main-div"]/div/div[2]') captcha_image_eles = captcha_images_ele.children() for captcha_image in captcha_image_eles: img = captcha_image.ele('.captcha-img') if img.states.has_rect and img.states.is_covered == False: img_src = img.attr('src') if img_src and img_src.startswith('data:image'): images_ids.append(captcha_image.attr('id')) data = { "number": number, "image_ids": images_ids, } result["status"] = "success" result['data'] = data except Exception as e: result["message"] = str(e) print(f"[DrissionPage] Error: {e}") return result # ================= 2. 生命周期管理 ================= @asynccontextmanager async def lifespan(app: FastAPI): # --- 启动 OCR (伪代码,请保留你之前的逻辑) --- print("--- Loading OCR Models ---") engines['pytorch'] = PyTorchEngine('data/ctc.pth') engines['ddddocr'] = DddOcrEngine() # --- 启动 DrissionPage --- print("--- Starting DrissionPage ---") # 创建浏览器对象,连接浏览器 open_browser() yield # --- 关闭资源 --- engines.clear() app = FastAPI(lifespan=lifespan) # ================= 3. 浏览器接口 (带忙碌检测) ================= @app.post("/browser/visable_captchas") async def browser_get_data(html_content: str = Body(..., media_type="text/plain") ): # 1. 非阻塞检查:锁是否被占用 if BROWSER_LOCK.locked(): return JSONResponse( status_code=503, content={ "code": 503, "status": "busy", "msg": "Browser is busy. One task at a time." } ) # 2. 获取锁 async with BROWSER_LOCK: print(f"[Browser] Processing") # 3. 写入临时 HTML 文件 with tempfile.NamedTemporaryFile( mode="w+", suffix=".html", delete=True, encoding="utf-8" ) as f: f.write(html_content) f.flush() # 3. 核心:将同步的 DrissionPage 代码扔到线程池运行 # 这样主线程(处理 OCR 请求的线程)不会被卡死 result = await run_in_threadpool(_sync_get_visable_image_ids, f.name) return result # ================= 3. 浏览器接口 (带忙碌检测) ================= @app.post("/browser/visatype_visable") async def browser_get_data(html_content: str = Body(..., media_type="text/plain") ): # 1. 非阻塞检查:锁是否被占用 if BROWSER_LOCK.locked(): return JSONResponse( status_code=503, content={ "code": 503, "status": "busy", "msg": "Browser is busy. One task at a time." } ) # 2. 获取锁 async with BROWSER_LOCK: print(f"[Browser] Processing") # 3. 写入临时 HTML 文件 with tempfile.NamedTemporaryFile( mode="w+", suffix=".html", delete=True, encoding="utf-8" ) as f: f.write(html_content) f.flush() # 3. 核心:将同步的 DrissionPage 代码扔到线程池运行 # 这样主线程(处理 OCR 请求的线程)不会被卡死 result = await run_in_threadpool(_sync_get_visatype_ids, f.name) return result # ================= 路由 2: OCR 识别 (BLS) ================= @app.post("/predict/bls") async def predict_bls(request: Request, model: str = Query("ddddocr", enum=["ddddocr", "pytorch"])): """ 处理 BLS 验证码 """ try: image_bytes = await request.body() if not image_bytes: raise HTTPException(status_code=400, detail="Empty body") if model == 'ddddocr': res = engines['ddddocr'].inference_bytes(image_bytes) else: res = engines['pytorch'].inference_bytes(image_bytes) return {"code": 200, "msg": "success", "data": res, "engine": model} except Exception as e: return JSONResponse(status_code=500, content={"code": 500, "msg": str(e), "data": ""}) # ================= 路由 3: OCR 识别 (Visametric) ================= @app.post("/predict/visametric") async def predict_visametric(request: Request): """ 处理 Visametric 验证码 (特殊预处理) """ try: image_bytes = await request.body() res = engines['ddddocr'].inference_captcha(image_bytes) return {"code": 200, "msg": "success", "data": res} except Exception as e: return JSONResponse(status_code=500, content={"code": 500, "msg": str(e), "data": ""}) if __name__ == '__main__': # 运行服务 # host='0.0.0.0' 允许局域网访问 print("API Documentation: http://127.0.0.1:8085/docs") uvicorn.run(app, host='0.0.0.0', port=8085)