# predict_server.py
"""Minimal HTTP server that runs a CNN+LSTM captcha recogniser.

POST the raw bytes of an image to ``/predict/vfcode``; the server answers with
a JSON object ``{"data": <decoded text>, "msg": ..., "code": ...}``.
"""
import os
import json
import string
import socket
import traceback
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO
from collections import OrderedDict

# Deep learning dependencies
import torch
from torch import nn
from torchvision import transforms
from PIL import Image


# ================= Model definition (layer names must stay unchanged) =================
class Model(nn.Module):
    """Convolutional feature extractor followed by a bidirectional LSTM.

    The CNN is five blocks of two conv/batch-norm/ReLU layers, each block
    followed by max pooling, then a dropout layer. Its output is read
    column by column by a 2-layer bidirectional LSTM, and a linear layer maps
    every time step to ``n_classes`` scores.

    Args:
        n_classes: Number of output scores per time step.
        input_shape: ``(channels, height, width)`` used to size the LSTM input.
    """

    def __init__(self, n_classes, input_shape=(3, 64, 128)):
        super().__init__()
        self.input_shape = input_shape

        channels = [32, 64, 128, 256, 256]
        layers = [2, 2, 2, 2, 2]
        kernels = [3, 3, 3, 3, 3]
        pools = [2, 2, 2, 2, (2, 1)]
        modules = OrderedDict()

        def cba(name, in_channels, out_channels, kernel_size):
            # Conv -> BatchNorm -> ReLU. The dictionary keys become the
            # state-dict prefixes, so they must not be renamed.
            modules[f'conv{name}'] = nn.Conv2d(
                in_channels, out_channels, kernel_size,
                padding=(1, 1) if kernel_size == 3 else 0)
            modules[f'bn{name}'] = nn.BatchNorm2d(out_channels)
            modules[f'relu{name}'] = nn.ReLU(inplace=True)

        # Take the first conv's input channels from input_shape (3 by default)
        # so the dummy pass in infer_features always matches the network.
        last_channel = input_shape[0]
        for block, (n_channel, n_layer, n_kernel, k_pool) in enumerate(
                zip(channels, layers, kernels, pools)):
            for layer in range(1, n_layer + 1):
                cba(f'{block + 1}{layer}', last_channel, n_channel, n_kernel)
                last_channel = n_channel
            modules[f'pool{block + 1}'] = nn.MaxPool2d(k_pool)
        modules['dropout'] = nn.Dropout(0.25, inplace=True)

        self.cnn = nn.Sequential(modules)
        self.lstm = nn.LSTM(input_size=self.infer_features(), hidden_size=128,
                            num_layers=2, bidirectional=True)
        # 256 = 2 directions * hidden_size 128.
        self.fc = nn.Linear(in_features=256, out_features=n_classes)

    def infer_features(self):
        """Return the per-column feature size the CNN produces for ``input_shape``.

        A zero tensor of batch size 1 is pushed through the CNN and the channel
        and height axes are merged; the merged size is the LSTM input size.
        """
        with torch.no_grad():  # only the resulting shape is needed
            x = torch.zeros((1,) + tuple(self.input_shape))
            x = self.cnn(x)
            x = x.reshape(x.shape[0], -1, x.shape[-1])
        return x.shape[1]

    def forward(self, x):
        """Map a batch of images to per-column class scores.

        Args:
            x: Image batch accepted by the CNN (4-D, channels second).

        Returns:
            Tensor laid out as ``(columns, batch, n_classes)``.
        """
        x = self.cnn(x)
        # Merge channel and height axes: (batch, features, columns).
        x = x.reshape(x.shape[0], -1, x.shape[-1])
        # nn.LSTM (batch_first=False) wants the sequence axis first.
        x = x.permute(2, 0, 1)
        x, _ = self.lstm(x)
        x = self.fc(x)
        return x


# ================= Inference wrapper =================
class DeployModel:
    """Load trained weights and turn image bytes into decoded text.

    Args:
        model_path: Path of the saved state dict.

    Raises:
        FileNotFoundError: If ``model_path`` does not exist.
    """

    def __init__(self, model_path):
        # Index 0 ('-') is the blank symbol that decode() strips out.
        self.characters = '-' + string.digits + '$'
        self.num_classes = len(self.characters)  # 12
        self.width = 150
        self.hight = 80  # attribute name (sic) kept for backward compatibility

        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")

        self.model = Model(self.num_classes, input_shape=(3, self.hight, self.width))
        # NOTE(security): torch.load unpickles the file and can execute code
        # embedded in it. Only load checkpoints from a trusted source; on
        # PyTorch versions that support it, consider weights_only=True.
        self.model.load_state_dict(
            torch.load(model_path, map_location=torch.device('cpu')))
        self.model.eval()
        print(f"Model loaded successfully from {model_path}")

        self.transforms_func = transforms.Compose([
            transforms.Resize((self.hight, self.width)),
            transforms.ToTensor()
        ])

    def decode(self, sequence):
        """Collapse a per-column index sequence into the final text.

        Consecutive repeats are merged first, then every blank
        (``self.characters[0]``) is removed.

        Args:
            sequence: Iterable of class indices (ints or 0-d integer tensors).

        Returns:
            The decoded string.
        """
        blank = self.characters[0]
        collapsed = []
        last = None
        for index in sequence:
            char = self.characters[int(index)]
            if char != last:
                collapsed.append(char)
                last = char
        return ''.join(char for char in collapsed if char != blank)

    def inference_bytes(self, image_bytes):
        """Recognise the text in an encoded image.

        Args:
            image_bytes: Raw bytes of an image file readable by PIL.

        Returns:
            The decoded text, or ``""`` if anything goes wrong (the error is
            printed, not raised, so the HTTP layer keeps serving).
        """
        try:
            image = Image.open(BytesIO(image_bytes))
            # The network's first conv expects exactly 3 channels. Grayscale,
            # palette, LA, CMYK and RGBA inputs must all be converted, not
            # just RGBA.
            if image.mode != 'RGB':
                image = image.convert('RGB')
            if self.transforms_func is not None:
                image = self.transforms_func(image)

            with torch.no_grad():
                output = self.model(image.unsqueeze(0).cpu())

            # (columns, batch, classes) -> (batch, columns) of best indices.
            output_argmax = output.detach().permute(1, 0, 2).argmax(dim=-1)
            predict_label = self.decode(output_argmax[0].tolist())
            return predict_label
        except Exception as e:
            print(f"Inference error: {e}")
            return ""


# ================= HTTP handling =================
# Global model instance, assigned at startup before the server accepts requests.
deploy_model = None


class RequestHandler(BaseHTTPRequestHandler):
    """Serve ``POST /predict/vfcode`` with the raw image as the request body."""

    # Upper bound on accepted request bodies; larger uploads get a 413 so a
    # client cannot make the server buffer an arbitrary amount of memory.
    max_body_bytes = 10 * 1024 * 1024

    def _send_response(self, status, content_type, content):
        """Write a complete response with the given status, type and body bytes."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def _send_json(self, status, payload):
        """Serialise ``payload`` as JSON and send it with ``status``."""
        self._send_response(status, 'application/json', json.dumps(payload).encode())

    def do_POST(self):
        """Run inference on the posted image bytes and reply with JSON."""
        if self.path != '/predict/vfcode':
            self._send_response(404, 'text/plain', b'Not Found')
            return

        try:
            # Validate the declared body length before reading anything.
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except (TypeError, ValueError):
                self._send_json(400, {'code': 400, 'msg': 'Invalid Content-Length'})
                return
            # A negative length would make rfile.read() block until EOF.
            if content_length <= 0:
                self._send_json(400, {'code': 400, 'msg': 'Empty body'})
                return
            if content_length > self.max_body_bytes:
                self._send_json(413, {'code': 413, 'msg': 'Body too large'})
                return

            # Read the raw binary body directly (keeps the protocol simple and
            # avoids multipart parsing problems).
            file_content = self.rfile.read(content_length)

            # Inference
            result_string = deploy_model.inference_bytes(file_content)

            response = {
                'data': result_string,
                'msg': "success",
                'code': 200
            }
            self._send_json(200, response)
            print(f"Processed request. Result: {result_string}")
        except Exception:
            traceback.print_exc()
            self._send_json(500, {'data': '', 'msg': 'failed', 'code': 500})


if __name__ == '__main__':
    # Configuration
    MODEL_PATH = 'data/ocr.pth'
    PORT = 8085

    # Startup
    if not os.path.exists(MODEL_PATH):
        print(f"[ERROR] 请确保模型文件存在: {MODEL_PATH}")
        # The exit() builtin comes from the site module and is not always
        # available; SystemExit always is.
        raise SystemExit(1)

    deploy_model = DeployModel(MODEL_PATH)

    server_address = ('0.0.0.0', PORT)  # listen on all interfaces
    httpd = HTTPServer(server_address, RequestHandler)
    print(f'OCR Server running on port {PORT}...')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however serve_forever() ended.
        httpd.server_close()