| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- import re
- import asyncio
- import uvicorn
- import tempfile
- from pathlib import Path
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, Body, Request, Query, HTTPException
- from fastapi.responses import JSONResponse
- from fastapi.concurrency import run_in_threadpool
- from utils.browser_util import open_browser, attach_browser
- from toolkit.ocr_engine import PyTorchEngine, DddOcrEngine
- # ================= 全局资源 =================
- # 异步锁,用于互斥控制
- BROWSER_LOCK = asyncio.Lock()
- # OCR 引擎字典
- engines = {}
- def _sync_get_visatype_ids(tmp_file: str):
- """
- 这是实际执行浏览器操作的同步函数。
- 它会在独立的线程中运行,不会阻塞服务器。
- """
- result = {"status": "failed", "message": ""}
- try:
- browser = attach_browser()
- html_file_path = Path(tmp_file).resolve()
- file_url = f'file://{html_file_path}'
- browser.get(file_url)
-
- jur_id = None
- loc_id = None
- type_id = None
- subtype_id = None
- cat_id = None
-
- # 匹配 ID
- app_category_labels = browser.eles(f'Appointment Category', timeout=1)
- for app_category_label in app_category_labels:
- if app_category_label.states.has_rect and app_category_label.tag == 'label':
- eid = app_category_label.after('tag:input').attr('id')
- cat_id = int(''.join(filter(str.isdigit, eid)))
- break
- jurisdiction_labels = browser.eles(f'Jurisdiction', timeout=1)
- if jurisdiction_labels:
- for jurisdiction_label in jurisdiction_labels:
- if jurisdiction_label.states.has_rect and jurisdiction_label.tag == 'label':
- eid = jurisdiction_label.after('tag:input').attr('id')
- jur_id = int(''.join(filter(str.isdigit, eid)))
- break
- location_labels = browser.eles(f'Location', timeout=1)
- for location_label in location_labels:
- if location_label.states.has_rect and location_label.tag == 'label':
- eid = location_label.after('tag:input', index=2).attr('id')
- loc_id = int(''.join(filter(str.isdigit, eid)))
- break
- visa_type_labels = browser.eles(f'Visa Type', timeout=1)
- for visa_type_label in visa_type_labels:
- if visa_type_label.states.has_rect and visa_type_label.tag == 'label':
- eid = visa_type_label.after('tag:input').attr('id')
- type_id = int(''.join(filter(str.isdigit, eid)))
- break
- visa_subtype_labels = browser.eles(f'Visa Sub Type', timeout=1)
- for visa_subtype_label in visa_subtype_labels:
- if visa_subtype_label.states.has_rect and visa_subtype_label.tag == 'label':
- eid = visa_subtype_label.after('tag:input').attr('id')
- subtype_id = int(''.join(filter(str.isdigit, eid)))
- break
- data = {
- "jur_id": jur_id,
- "loc_id": loc_id,
- "type_id": type_id,
- "subtype_id": subtype_id,
- "cat_id": cat_id,
- }
- result["status"] = "success"
- result['data'] = data
- except Exception as e:
- result["message"] = str(e)
- print(f"[DrissionPage] Error: {e}")
-
- return result
- def _sync_get_visable_image_ids(tmp_file: str):
- """
- 这是实际执行浏览器操作的同步函数。
- 它会在独立的线程中运行,不会阻塞服务器。
- """
- result = {"status": "failed", "message": ""}
- try:
- browser = attach_browser()
- images_ids = []
- html_file_path = Path(tmp_file).resolve()
- file_url = f'file://{html_file_path}'
- browser.get(file_url)
- captions_ele = browser.ele('xpath://*[@id="captcha-main-div"]/div/div[1]', timeout=5)
- if not captions_ele:
- raise Exception('Captions elements not found')
- caption_eles = captions_ele.children()
- caption_text = ''
- for caption in caption_eles:
- if not caption.states.is_covered:
- caption_text = caption.text
- number = re.findall(r'\d+', caption_text)[0]
- captcha_images_ele = browser.ele('xpath://*[@id="captcha-main-div"]/div/div[2]')
- captcha_image_eles = captcha_images_ele.children()
- for captcha_image in captcha_image_eles:
- img = captcha_image.ele('.captcha-img')
- if img.states.has_rect and img.states.is_covered == False:
- img_src = img.attr('src')
- if img_src and img_src.startswith('data:image'):
- images_ids.append(captcha_image.attr('id'))
- data = {
- "number": number,
- "image_ids": images_ids,
- }
- result["status"] = "success"
- result['data'] = data
- except Exception as e:
- result["message"] = str(e)
- print(f"[DrissionPage] Error: {e}")
-
- return result
- # ================= 2. 生命周期管理 =================
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- # --- 启动 OCR (伪代码,请保留你之前的逻辑) ---
- print("--- Loading OCR Models ---")
- engines['pytorch'] = PyTorchEngine('data/ctc.pth')
- engines['ddddocr'] = DddOcrEngine()
-
- # --- 启动 DrissionPage ---
- print("--- Starting DrissionPage ---")
- # 创建浏览器对象,连接浏览器
- open_browser()
-
- yield
-
- # --- 关闭资源 ---
- engines.clear()
- app = FastAPI(lifespan=lifespan)
- # ================= 3. 浏览器接口 (带忙碌检测) =================
- @app.post("/browser/visable_captchas")
- async def browser_get_data(html_content: str = Body(..., media_type="text/plain")
- ):
- # 1. 非阻塞检查:锁是否被占用
- if BROWSER_LOCK.locked():
- return JSONResponse(
- status_code=503,
- content={
- "code": 503,
- "status": "busy",
- "msg": "Browser is busy. One task at a time."
- }
- )
- # 2. 获取锁
- async with BROWSER_LOCK:
- print(f"[Browser] Processing")
- # 3. 写入临时 HTML 文件
- with tempfile.NamedTemporaryFile(
- mode="w+",
- suffix=".html",
- delete=True,
- encoding="utf-8"
- ) as f:
- f.write(html_content)
- f.flush()
- # 3. 核心:将同步的 DrissionPage 代码扔到线程池运行
- # 这样主线程(处理 OCR 请求的线程)不会被卡死
- result = await run_in_threadpool(_sync_get_visable_image_ids, f.name)
-
- return result
-
- # ================= 3. 浏览器接口 (带忙碌检测) =================
- @app.post("/browser/visatype_visable")
- async def browser_get_data(html_content: str = Body(..., media_type="text/plain")
- ):
- # 1. 非阻塞检查:锁是否被占用
- if BROWSER_LOCK.locked():
- return JSONResponse(
- status_code=503,
- content={
- "code": 503,
- "status": "busy",
- "msg": "Browser is busy. One task at a time."
- }
- )
- # 2. 获取锁
- async with BROWSER_LOCK:
- print(f"[Browser] Processing")
- # 3. 写入临时 HTML 文件
- with tempfile.NamedTemporaryFile(
- mode="w+",
- suffix=".html",
- delete=True,
- encoding="utf-8"
- ) as f:
- f.write(html_content)
- f.flush()
- # 3. 核心:将同步的 DrissionPage 代码扔到线程池运行
- # 这样主线程(处理 OCR 请求的线程)不会被卡死
- result = await run_in_threadpool(_sync_get_visatype_ids, f.name)
-
- return result
- # ================= 路由 2: OCR 识别 (BLS) =================
- @app.post("/predict/bls")
- async def predict_bls(request: Request, model: str = Query("ddddocr", enum=["ddddocr", "pytorch"])):
- """ 处理 BLS 验证码 """
- try:
- image_bytes = await request.body()
- if not image_bytes:
- raise HTTPException(status_code=400, detail="Empty body")
- if model == 'ddddocr':
- res = engines['ddddocr'].inference_bytes(image_bytes)
- else:
- res = engines['pytorch'].inference_bytes(image_bytes)
-
- return {"code": 200, "msg": "success", "data": res, "engine": model}
- except Exception as e:
- return JSONResponse(status_code=500, content={"code": 500, "msg": str(e), "data": ""})
- # ================= 路由 3: OCR 识别 (Visametric) =================
- @app.post("/predict/visametric")
- async def predict_visametric(request: Request):
- """ 处理 Visametric 验证码 (特殊预处理) """
- try:
- image_bytes = await request.body()
- res = engines['ddddocr'].inference_captcha(image_bytes)
- return {"code": 200, "msg": "success", "data": res}
- except Exception as e:
- return JSONResponse(status_code=500, content={"code": 500, "msg": str(e), "data": ""})
- if __name__ == '__main__':
- # 运行服务
- # host='0.0.0.0' 允许局域网访问
- print("API Documentation: http://127.0.0.1:8085/docs")
- uvicorn.run(app, host='0.0.0.0', port=8085)
|