| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- # predict_server.py
- import os
- import json
- import string
- import socket
- import traceback
- from http.server import BaseHTTPRequestHandler, HTTPServer
- from io import BytesIO
- from collections import OrderedDict
- # 深度学习依赖
- import torch
- from torch import nn
- from torchvision import transforms
- from PIL import Image
- # ================= 定义模型结构 (保持不变) =================
- class Model(nn.Module):
- def __init__(self, n_classes, input_shape=(3, 64, 128)):
- super(Model, self).__init__()
- self.input_shape = input_shape
- channels = [32, 64, 128, 256, 256]
- layers = [2, 2, 2, 2, 2]
- kernels = [3, 3, 3, 3, 3]
- pools = [2, 2, 2, 2, (2, 1)]
- modules = OrderedDict()
-
- def cba(name, in_channels, out_channels, kernel_size):
- modules[f'conv{name}'] = nn.Conv2d(in_channels, out_channels, kernel_size,
- padding=(1, 1) if kernel_size == 3 else 0)
- modules[f'bn{name}'] = nn.BatchNorm2d(out_channels)
- modules[f'relu{name}'] = nn.ReLU(inplace=True)
-
- last_channel = 3
- for block, (n_channel, n_layer, n_kernel, k_pool) in enumerate(zip(channels, layers, kernels, pools)):
- for layer in range(1, n_layer + 1):
- cba(f'{block+1}{layer}', last_channel, n_channel, n_kernel)
- last_channel = n_channel
- modules[f'pool{block + 1}'] = nn.MaxPool2d(k_pool)
- modules[f'dropout'] = nn.Dropout(0.25, inplace=True)
-
- self.cnn = nn.Sequential(modules)
- self.lstm = nn.LSTM(input_size=self.infer_features(), hidden_size=128, num_layers=2, bidirectional=True)
- self.fc = nn.Linear(in_features=256, out_features=n_classes)
-
- def infer_features(self):
- x = torch.zeros((1,)+self.input_shape)
- x = self.cnn(x)
- x = x.reshape(x.shape[0], -1, x.shape[-1])
- return x.shape[1]
- def forward(self, x):
- x = self.cnn(x)
- x = x.reshape(x.shape[0], -1, x.shape[-1])
- x = x.permute(2, 0, 1)
- x, _ = self.lstm(x)
- x = self.fc(x)
- return x
- # ================= 推理类 =================
- class DeployModel:
- def __init__(self, model_path):
- self.num_classes = 12
- self.characters = '-' + string.digits + '$'
- self.width = 150
- self.hight = 80
- self.model = Model(self.num_classes, input_shape=(3, self.hight, self.width))
-
- if os.path.exists(model_path):
- self.model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
- self.model.eval()
- print(f"Model loaded successfully from {model_path}")
- else:
- raise FileNotFoundError(f"Model file not found: {model_path}")
- self.transforms_func = transforms.Compose([
- transforms.Resize((self.hight, self.width)),
- transforms.ToTensor()
- ])
-
- def decode(self, sequence):
- a = ''.join([self.characters[x] for x in sequence])
- s = []
- last = None
- for x in a:
- if x != last:
- s.append(x)
- last = x
- s2 = ''.join([x for x in s if x != self.characters[0]])
- return s2
-
- def inference_bytes(self, image_bytes):
- try:
- image = Image.open(BytesIO(image_bytes))
- if image.mode == 'RGBA':
- image = image.convert('RGB')
- if self.transforms_func is not None:
- image = self.transforms_func(image)
- with torch.no_grad():
- output = self.model(image.unsqueeze(0).cpu())
-
- output_argmax = output.detach().permute(1, 0, 2).argmax(dim=-1)
- predict_label = self.decode(output_argmax[0])
- return predict_label
- except Exception as e:
- print(f"Inference error: {e}")
- return ""
- # ================= HTTP 处理 =================
- # 全局模型实例
- deploy_model = None
- class RequestHandler(BaseHTTPRequestHandler):
-
- def _send_response(self, status, content_type, content):
- self.send_response(status)
- self.send_header('Content-type', content_type)
- self.end_headers()
- self.wfile.write(content)
- def do_POST(self):
- if self.path == '/predict/vfcode':
- try:
- # 获取内容长度
- content_length = int(self.headers.get('Content-Length', 0))
- if content_length == 0:
- self._send_response(400, 'application/json', json.dumps({'code': 400, 'msg': 'Empty body'}).encode())
- return
- # 直接读取 Raw Binary 数据 (简化通信,避免 multipart 解析问题)
- file_content = self.rfile.read(content_length)
- # 推理
- result_string = deploy_model.inference_bytes(file_content)
-
- response = {
- 'data': result_string,
- 'msg': "success",
- 'code': 200
- }
- self._send_response(200, 'application/json', json.dumps(response).encode())
- print(f"Processed request. Result: {result_string}")
- except Exception as e:
- traceback.print_exc()
- response = {'data': '', 'msg': 'failed', 'code': 500}
- self._send_response(500, 'application/json', json.dumps(response).encode())
- else:
- self._send_response(404, 'text/plain', b'Not Found')
- if __name__ == '__main__':
- # 配置区
- MODEL_PATH = 'data/ocr.pth'
- PORT = 8085
-
- # 启动
- if not os.path.exists(MODEL_PATH):
- print(f"[ERROR] 请确保模型文件存在: {MODEL_PATH}")
- exit(1)
-
- deploy_model = DeployModel(MODEL_PATH)
-
- server_address = ('0.0.0.0', PORT) # 监听所有接口
- httpd = HTTPServer(server_address, RequestHandler)
- print(f'OCR Server running on port {PORT}...')
- try:
- httpd.serve_forever()
- except KeyboardInterrupt:
- pass
- httpd.server_close()
|