启动一个最基础的 WEB 服务
创建文件 server.py
# Python 3 server example
import html
from http.server import BaseHTTPRequestHandler, HTTPServer

hostName = "localhost"
serverPort = 8080


class MyServer(BaseHTTPRequestHandler):
    """Minimal request handler that echoes the requested path as an HTML page."""

    def do_GET(self):
        """Reply to every GET request with 200 and a page showing the path."""
        # self.path is taken verbatim from the request line, so escape it
        # before embedding it in HTML to keep clients from injecting markup.
        page = "<html><body><p>Request: %s</p></body></html>" % html.escape(self.path)
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes(page, "utf-8"))


if __name__ == "__main__":
    webServer = HTTPServer((hostName, serverPort), MyServer)
    print("Server started http://%s:%s" % (hostName, serverPort))

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server.
        pass

    webServer.server_close()
    print("Server stopped.")
启动命令
python3 server.py
区分访问路径
在 do_GET 方法内，使用 self.path 变量区分
def do_GET(self):
    """Route GET requests by path: serve "/" and "/upload", 404 otherwise.

    Note: self.path includes any query string, so "/?a=1" does not match "/".
    """
    if self.path == '/':
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % self.path, "utf-8"))
    elif self.path == '/upload':
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % self.path, "utf-8"))
    else:
        # Without this branch an unknown path produced no HTTP response at
        # all; the connection was simply closed on the client.
        self.send_error(404)
处理 POST 请求
实现 do_POST 方法
def do_POST(self):
    """Read the request body, then reply 200 with a page echoing the path.

    Replies 400 instead when the Content-Length header is not a
    non-negative integer.
    """
    import html  # local import so the snippet works when pasted into a class

    # A missing Content-Length header is treated as an empty body.
    try:
        content_length = int(self.headers.get("Content-Length", 0))
    except ValueError:
        content_length = -1
    if content_length < 0:
        # read(-1) would block until the client closes the connection.
        self.send_error(400, "Invalid Content-Length")
        return

    file_content = self.rfile.read(content_length)
    # Do what you wish with file_content.

    # Respond with 200 OK; the path is client-controlled, so escape it.
    self.send_response(200)
    self.send_header("Content-type", "text/html")
    self.end_headers()
    self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % html.escape(self.path), "utf-8"))
处理请求参数和 COOKIE 等
添加以下引用
from functools import cached_property
from http.cookies import SimpleCookie
from urllib.parse import parse_qsl, urlparse
在 MyServer 类下添加以下处理方法
@cached_property
def url(self):
    """Return the request target (self.path) split into URL components."""
    return urlparse(self.path)


@cached_property
def query_data(self):
    """Return the query-string parameters as a dict (last value wins)."""
    return dict(parse_qsl(self.url.query))


@cached_property
def post_data(self):
    """Return the raw request body as bytes.

    Reads exactly Content-Length bytes from the request stream; a missing
    header is treated as an empty body. Cached, because the stream can
    only be consumed once.
    """
    content_length = int(self.headers.get("Content-Length", 0))
    return self.rfile.read(content_length)


@cached_property
def form_data(self):
    """Return the body decoded as a UTF-8 urlencoded form, as a dict."""
    return dict(parse_qsl(self.post_data.decode("utf-8")))


@cached_property
def cookies(self):
    """Return the request's Cookie header parsed into a SimpleCookie."""
    return SimpleCookie(self.headers.get("Cookie"))
处理 Multipart 文件上传
需要引入（注意：cgi 模块自 Python 3.11 起已弃用，并在 Python 3.13 中被移除）
from urllib.parse import parse_qs, parse_qsl, urlparse
import cgi
对请求根据 content-type 分别处理
def parse_POST(self):print(self.headers)ctype, pdict = cgi.parse_header(self.headers['content-type'])if ctype == 'multipart/form-data':print("file request")pdict['boundary'] = bytes(pdict['boundary'], "utf-8")postvars = cgi.parse_multipart(self.rfile, pdict)elif ctype == 'application/x-www-form-urlencoded' or 'application/json': print("non-file request")length = int(self.headers['content-length'])postvars = parse_qs(self.rfile.read(length).decode('utf8'),keep_blank_values=1)elif ctype == 'application/octet-stream':print("octet stream header")postvars = {}else:print("nothing")postvars = {}a = self.rfileprint(dir(a))print(a.peek())return postvars
在 do_POST 中调用
def do_POST(self):
    """Parse the POST body via parse_POST, print it, and acknowledge with 200."""
    postvars = self.parse_POST()
    print(postvars)

    # Always send a reply: returning without one closes the connection
    # without a valid HTTP response.
    self.send_response(200)
    self.send_header("Content-type", "text/plain")
    self.end_headers()
    self.wfile.write(b"OK")
一个接收文件并调用 PaddleOCR 识别的WEB服务例子
server.py
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, parse_qsl, urlparse
import cgi
import json
import paddleocr_helper

hostName = "localhost"
serverPort = 8080


class MyServer(BaseHTTPRequestHandler):
    """HTTP handler that accepts an uploaded image and replies with OCR results as JSON."""

    def parse_POST(self):
        """Parse the POST body according to the request's Content-Type.

        Returns:
            For multipart/form-data and application/x-www-form-urlencoded,
            a dict mapping each field name to a list of values. For
            application/json, the decoded JSON document. An empty dict for
            any other content type, or when the body cannot be decoded.

        The multipart branch uses the ``cgi`` module, which is deprecated
        since Python 3.11 and removed in 3.13.
        """
        print(self.headers)
        # self.headers is an email.message.Message subclass, so it can
        # parse Content-Type itself; a missing header yields "text/plain".
        ctype = self.headers.get_content_type()
        try:
            length = int(self.headers.get('content-length', 0))
        except ValueError:
            length = 0

        if ctype == 'multipart/form-data':
            print("file request")
            boundary = self.headers.get_boundary()
            if boundary is None:
                # Malformed multipart request: nothing can be split out.
                postvars = {}
            else:
                pdict = {'boundary': bytes(boundary, "utf-8")}
                postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            print("non-file request")
            postvars = parse_qs(self.rfile.read(length).decode('utf8'), keep_blank_values=1)
        elif ctype == 'application/json':
            # Previously `ctype == ... or 'application/json'`, which is
            # always true and made the branches below unreachable.
            print("non-file request")
            try:
                postvars = json.loads(self.rfile.read(length).decode('utf8'))
            except ValueError:
                # Covers both invalid UTF-8 and invalid JSON.
                postvars = {}
        elif ctype == 'application/octet-stream':
            print("octet stream header")
            postvars = {}
        else:
            print("nothing")
            postvars = {}
        # The leftover rfile.peek() debugging was dropped: peeking at the
        # socket can block while waiting for data that never arrives.
        return postvars

    def do_GET(self):
        """Serve a small HTML page at "/" and 404 for every other path."""
        if self.path == '/':
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>\r\n" % self.path, "utf-8"))
        else:
            # Previously any other path got no HTTP response at all.
            self.send_error(404)

    def do_POST(self):
        """Run OCR on the uploaded ``file`` field and reply with a JSON result.

        The reply is always HTTP 200; failures are reported in the JSON
        body as ``{"code": 1, "message": "error", "data": None}``.
        """
        postvars = self.parse_POST()
        try:
            result = paddleocr_helper.parse_and_lookup(postvars['file'][0])
        except Exception as exc:
            # Best effort by design: a missing "file" field or an OCR
            # failure becomes an error payload. Log the cause instead of
            # hiding it, and let KeyboardInterrupt/SystemExit propagate.
            self.log_error("Error occurred: %r", exc)
            result = {"code": 1, "message": "error", "data": None}

        # Respond with 200 OK
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(bytes(json.dumps(result), "utf-8"))


if __name__ == "__main__":
    webServer = HTTPServer((hostName, serverPort), MyServer)
    print("Server started http://%s:%s" % (hostName, serverPort))

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server.
        pass

    webServer.server_close()
    print("Server stopped.")
paddleocr_helper.py
import json
import re
import math
# Patterns are compiled once at import time instead of once per OCR block.
# The label alternatives other than "票据号码" are presumably common OCR
# misreadings of it (TODO confirm). Both the ASCII and the full-width colon
# are accepted: the original "No" pattern listed ":" twice, which suggests a
# full-width colon was lost along the way.
_LABELLED_NUMBER_RE = re.compile(r'(票据号码|柔据号码|桑据号码|柔线号码|柔楼号码|系热号码)[:：](\d+)')
_N_PREFIXED_NUMBER_RE = re.compile(r'N(?:.*?)(\d{8,})')
_NO_MARKER_RE = re.compile(r'No[.:：]')
_LONG_NUMBER_RE = re.compile(r'(\d{8,})')

_ocr_engine = None


def _get_ocr():
    """Return the shared PaddleOCR engine, creating it on first use."""
    global _ocr_engine
    if _ocr_engine is None:
        # Imported here rather than at module level so the text/geometry
        # helpers below stay importable without the OCR stack installed.
        from paddleocr import PaddleOCR

        # Build the engine only once: construction downloads the model if
        # needed and loads it into memory.
        _ocr_engine = PaddleOCR(show_log=False, use_angle_cls=True, lang="ch")
    return _ocr_engine


def parse_image(imagedata):
    """Run OCR on an image and return the recognised text blocks per page.

    Args:
        imagedata: Image input passed straight to ``PaddleOCR.ocr``.

    Returns:
        A list with one entry per OCR result page; each entry is a list of
        dicts with the keys ``boxes``, ``text`` and ``score``.
    """
    result = _get_ocr().ocr(imagedata, cls=True)
    outputs = []
    # NOTE(review): a page with no detected text appears to come back as
    # None in some PaddleOCR versions - treated as an empty page here.
    for page in result or []:
        outputs.append([
            {'boxes': line[0], 'text': line[1][0], 'score': line[1][1]}
            for line in page or []
        ])
    return outputs


def lookup_invoice_number(ocr_blocks):
    """Find the invoice number among the OCR blocks of one page.

    Tried in order, per block:
      1. a labelled number such as "票据号码:123";
      2. an "N" followed later in the same block by 8 or more digits.
    If neither matches in any block, the 8+ digit run whose box lies
    closest to a block starting with "No." / "No:" is returned.

    Args:
        ocr_blocks: Iterable of dicts with ``text`` and ``boxes`` keys.
            The blocks are not modified.

    Returns:
        The invoice number as a string, or None when nothing matches.
    """
    marker_block = None
    number_blocks = []  # (digits, boxes) for every block holding 8+ digits
    for ocr_block in ocr_blocks:
        text = ocr_block['text']

        found = _LABELLED_NUMBER_RE.search(text)
        if found is not None:
            return found.group(2)

        found = _N_PREFIXED_NUMBER_RE.search(text)
        if found is not None:
            return found.group(1)

        if _NO_MARKER_RE.match(text):
            marker_block = ocr_block

        found = _LONG_NUMBER_RE.search(text)
        if found is not None:
            # Keep the digits separately instead of overwriting the
            # caller's block text.
            number_blocks.append((found.group(1), ocr_block['boxes']))

    if marker_block is None or not number_blocks:
        return None

    distance_min = None
    candidate = None
    for digits, boxes in number_blocks:
        # Distance between the number's box and the "No." marker's box.
        distance = calcu_distance(marker_block['boxes'], boxes)
        print('- dist:{}, block:{}'.format(distance, digits))
        if distance_min is None or distance < distance_min:
            distance_min = distance
            candidate = digits
    return candidate


def parse_and_lookup(imagedata):
    """Run OCR on an image and collect the invoice number found on each page.

    Returns:
        ``{"code": 0, "message": "succ", "data": {"invoice_numbers": [...]}}``
    """
    invoice_numbers = []
    for ocr_blocks in parse_image(imagedata):
        invoice_number = lookup_invoice_number(ocr_blocks)
        if invoice_number is not None:
            invoice_numbers.append(invoice_number)
    return {
        "code": 0,
        "message": "succ",
        "data": {
            "invoice_numbers": invoice_numbers,
        },
    }


def calcu_distance(boxes1, boxes2):
    """Return an approximate corner-to-corner distance between two boxes.

    The point of ``boxes1`` nearest to the first point of ``boxes2`` is
    picked as anchor; the result is the smallest distance from that anchor
    to any point of ``boxes2``. Both sequences must be non-empty.
    """
    first_point = boxes2[0]
    anchor = min(boxes1, key=lambda point: point_distance(point, first_point))
    return min(point_distance(point, anchor) for point in boxes2)


def point_distance(point1, point2):
    """Return the Euclidean distance between two (x, y) points."""
    return math.hypot(point1[0] - point2[0], point1[1] - point2[1])
参考
- https://realpython.com/python-http-server/