import os # 屏蔽 ONNX Runtime 的警告日志 os.environ["ORT_LOGGING_LEVEL"] = "3" import json import string import socket import traceback import io # 新增 from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO from collections import OrderedDict from urllib.parse import urlparse, parse_qs # 图像处理依赖 import cv2 import numpy as np from PIL import Image, ImageFilter # 新增 ImageFilter # 深度学习依赖 import torch from torch import nn from torchvision import transforms # ddddocr 依赖 try: import ddddocr HAS_DDDDOCR = True except ImportError: print("[WARNING] ddddocr not installed. Run 'pip install ddddocr'") HAS_DDDDOCR = False # ================= 核心优化:图像去噪 (BLS专用) ================= def advanced_denoise(image_bytes): """ 针对 BLS 验证码的去噪流程 """ try: nparr = np.frombuffer(image_bytes, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) gray_blur = cv2.medianBlur(gray, 3) binary = cv2.adaptiveThreshold( gray_blur, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2 ) contours, _ = cv2.findContours(255 - binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) clean_img = np.ones(binary.shape, dtype="uint8") * 255 for cnt in contours: area = cv2.contourArea(cnt) if 30 < area < 1000: cv2.drawContours(clean_img, [cnt], -1, 0, -1) return Image.fromarray(clean_img) except Exception as e: print(f"[Denoise] Error: {e}") return Image.open(BytesIO(image_bytes)) # ================= PyTorch 模型结构 ================= class Model(nn.Module): def __init__(self, n_classes, input_shape=(3, 64, 128)): super(Model, self).__init__() self.input_shape = input_shape channels = [32, 64, 128, 256, 256] layers = [2, 2, 2, 2, 2] kernels = [3, 3, 3, 3, 3] pools = [2, 2, 2, 2, (2, 1)] modules = OrderedDict() def cba(name, in_channels, out_channels, kernel_size): modules[f'conv{name}'] = nn.Conv2d(in_channels, out_channels, kernel_size, padding=(1, 1) if kernel_size == 3 else 0) modules[f'bn{name}'] = nn.BatchNorm2d(out_channels) modules[f'relu{name}'] = nn.ReLU(inplace=True) last_channel = 3 for block, (n_channel, n_layer, n_kernel, k_pool) in enumerate(zip(channels, layers, kernels, pools)): for layer in range(1, n_layer + 1): cba(f'{block+1}{layer}', last_channel, n_channel, n_kernel) last_channel = n_channel modules[f'pool{block + 1}'] = nn.MaxPool2d(k_pool) modules[f'dropout'] = nn.Dropout(0.25, inplace=True) self.cnn = nn.Sequential(modules) self.lstm = nn.LSTM(input_size=self.infer_features(), hidden_size=128, num_layers=2, bidirectional=True) self.fc = nn.Linear(in_features=256, out_features=n_classes) def infer_features(self): x = torch.zeros((1,)+self.input_shape) x = self.cnn(x) x = x.reshape(x.shape[0], -1, x.shape[-1]) return x.shape[1] def forward(self, x): x = self.cnn(x) x = x.reshape(x.shape[0], -1, x.shape[-1]) x = x.permute(2, 0, 1) x, _ = self.lstm(x) x = self.fc(x) return x # ================= 引擎1: PyTorch ================= class PyTorchEngine: def __init__(self, model_path): self.num_classes = 12 self.characters = '-' + string.digits + '$' self.width = 150 self.hight = 80 self.model = Model(self.num_classes, input_shape=(3, self.hight, self.width)) if os.path.exists(model_path): self.model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu'))) self.model.eval() print(f"[PyTorch] Model loaded successfully from {model_path}") self.ready = True else: print(f"[PyTorch] Warning: Model file not found at {model_path}") self.ready = False self.transforms_func = transforms.Compose([ transforms.Resize((self.hight, self.width)), transforms.ToTensor() ]) def decode(self, sequence): a = ''.join([self.characters[x] for x in sequence]) s = [] last = None for x in a: if x != last: s.append(x) last = x s2 = ''.join([x for x in s if x != self.characters[0]]) return s2 def inference_bytes(self, image_bytes): if not self.ready: return "Error: Model not loaded" try: # === 恢复:直接使用 PIL 打开图片,移除 advanced_denoise === image = Image.open(BytesIO(image_bytes)) image = image.convert('RGB') if self.transforms_func is not None: image = self.transforms_func(image) with torch.no_grad(): output = self.model(image.unsqueeze(0).cpu()) output_argmax = output.detach().permute(1, 0, 2).argmax(dim=-1) predict_label = self.decode(output_argmax[0]) return predict_label except Exception as e: print(f"[PyTorch] Inference error: {e}") return "" # ================= 引擎2: DDDDOCR ================= class DddOcrEngine: def __init__(self): if HAS_DDDDOCR: self.ocr = ddddocr.DdddOcr(show_ad=False, beta=True) print("[DDDDOCR] Initialized successfully") self.ready = True else: print("[DDDDOCR] Library missing") self.ready = False def inference_bytes(self, image_bytes): """ 原有的 VFCode 识别逻辑 """ if not self.ready: return "Error: ddddocr not installed" try: # 1. VF 专用预处理 img_pil = advanced_denoise(image_bytes) # 2. 转 bytes 传给 ddddocr img_byte_arr = BytesIO() img_pil.save(img_byte_arr, format='PNG') processed_bytes = img_byte_arr.getvalue() # 3. 识别 res = self.ocr.classification(processed_bytes) return res except Exception as e: print(f"[DDDDOCR] Inference error: {e}") return "" def inference_captcha(self, image_bytes): """ [新增] 适配你提供的预处理逻辑 路径: /predict/visametric """ if not self.ready: return "Error: ddddocr not installed" try: # 1. 打开图片 image = Image.open(io.BytesIO(image_bytes)) # 2. 自定义预处理: 灰度 -> 中值滤波 -> 二值化 gray_img = image.convert("L").filter(ImageFilter.MedianFilter(size=3)) binary_img = gray_img.point(lambda p: 255 if p > 128 else 0) # 3. 转 bytes 并识别 with io.BytesIO() as img_buffer: binary_img.save(img_buffer, format="PNG") processed_bytes = img_buffer.getvalue() return self.ocr.classification(processed_bytes) except Exception as e: print(f"[DDDDOCR-Captcha] Inference error: {e}") return "" # ================= HTTP 处理 ================= engines = {} class RequestHandler(BaseHTTPRequestHandler): def _send_response(self, status, content_type, content): self.send_response(status) self.send_header('Content-type', content_type) self.end_headers() self.wfile.write(content) def log_message(self, format, *args): return def do_POST(self): parsed_path = urlparse(self.path) path = parsed_path.path query_params = parse_qs(parsed_path.query) # 获取 Content-Length try: content_length = int(self.headers.get('Content-Length', 0)) if content_length == 0: self._send_response(400, 'application/json', json.dumps({'code': 400, 'msg': 'Empty body'}).encode()) return file_content = self.rfile.read(content_length) except Exception: self._send_response(400, 'application/json', json.dumps({'code': 400, 'msg': 'Read body failed'}).encode()) return result_string = "" try: # === 路由 1: 原有的 VFCode 识别 === if path == '/predict/bls': model_type = query_params.get('model', ['ddddocr'])[0] if model_type == 'ddddocr': if 'ddddocr' in engines: result_string = engines['ddddocr'].inference_bytes(file_content) else: result_string = "Error: ddddocr not available" else: if 'pytorch' in engines: result_string = engines['pytorch'].inference_bytes(file_content) else: result_string = "Error: pytorch model not available" print(f"[VFCode] [{model_type}] Result: {result_string}") # === 路由 2: 新增的通用 Captcha 识别 === elif path == '/predict/visametric': if 'ddddocr' in engines: # 使用新增的预处理逻辑 result_string = engines['ddddocr'].inference_captcha(file_content) else: result_string = "Error: ddddocr not available" print(f"[Captcha] Result: {result_string}") else: self._send_response(404, 'text/plain', b'Not Found') return # 返回成功响应 response = { 'data': result_string, 'msg': "success", 'code': 200 } self._send_response(200, 'application/json', json.dumps(response).encode()) except Exception as e: traceback.print_exc() response = {'data': '', 'msg': 'failed', 'code': 500} self._send_response(500, 'application/json', json.dumps(response).encode()) if __name__ == '__main__': MODEL_PATH = 'data/ctc.pth' PORT = 8085 # 初始化 PyTorch 引擎 pytorch_engine = PyTorchEngine(MODEL_PATH) if pytorch_engine.ready: engines['pytorch'] = pytorch_engine # 初始化 DDDDOCR 引擎 ddd_engine = DddOcrEngine() if ddd_engine.ready: engines['ddddocr'] = ddd_engine server_address = ('0.0.0.0', PORT) httpd = HTTPServer(server_address, RequestHandler) print(f'OCR Server running on port {PORT}...') print(f'Routes available:') print(f' POST /predict/bls?model=ddddocr|pytorch') print(f' POST /predict/visametric (Uses specific preprocessing)') try: httpd.serve_forever() except KeyboardInterrupt: pass httpd.server_close()