一.flask写的接口
1.1 manage.py启动服务(发送图片base64版)
注意:如果服务运行在 docker 容器里,需要把容器内的服务端口(下面代码中为 6006)映射到宿主机端口,客户端请求时使用映射后的宿主机端口。
#coding:utf-8
import base64
import io
import logging
import pickle

from flask import Flask, jsonify, request
from PIL import Image
from sklearn import metrics

logging.basicConfig(
    format="%(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)

app = Flask(__name__)


@app.route("/evaluate", methods=["POST"])
def evaluate():
    """Score predictions against ground truth.

    Reads the ``truth`` and ``prediction`` fields of the JSON request body and
    responds with accuracy plus macro-averaged precision, recall and F1.
    """
    logging.info("Call evaluate")
    y_true = request.json["truth"]
    y_pred = request.json["prediction"]
    macro = {"average": "macro"}
    scores = {
        "accuracy": metrics.accuracy_score(y_true, y_pred),
        "precision": metrics.precision_score(y_true, y_pred, **macro),
        "recall": metrics.recall_score(y_true, y_pred, **macro),
        "f1": metrics.f1_score(y_true, y_pred, **macro),
    }
    return jsonify(scores)


@app.route("/ocr", methods=["POST"])
def ocr():
    """Decode the base64 image sent in the ``img`` JSON field.

    No text recognition is performed here: once the image has been opened and
    its size printed, the response is always ``{"text": "Success"}``.
    """
    logging.info("OCR")
    encoded = request.json["img"]
    raw_bytes = base64.b64decode(encoded)
    img = Image.open(io.BytesIO(raw_bytes))
    print('img:', img.size)
    return jsonify({"text": 'Success'})


if __name__ == "__main__":
    # Listen on every interface so the port can be published from a container.
    app.run(host="0.0.0.0", port=6006)
1.2.test_route 测试路由文件(发送图片base64版)
#coding:utf-8
from io import BytesIO
from PIL import Image
import base64
import json
import re
import requests


def image_to_base64(img_path):
    """Return the content of the file at ``img_path`` as a base64 text string."""
    # Read the raw bytes first, then base64-encode them (an encoding, not encryption).
    with open(img_path, "rb") as handle:
        raw = handle.read()
    return base64.b64encode(raw).decode()


def base64_to_image(base64_str):
    """Turn a base64 string (optionally a ``data:image/...`` URI) into a PIL image."""
    # Drop a leading data-URI header if one is present.
    payload = re.sub('^data:image/.+;base64,', '', base64_str)
    return Image.open(BytesIO(base64.b64decode(payload)))


def json_send(dataPModel, url):
    """POST ``dataPModel`` to ``url`` as a JSON body and return the decoded JSON reply."""
    headers = {"Content-type": "application/json", "Accept": "text/plain", "charset": "UTF-8"}
    body = json.dumps(dataPModel)
    response = requests.post(url=url, headers=headers, data=body)
    return json.loads(response.text)


if __name__ == "__main__":
    url = 'http://192.168.102.193:1112/ocr'
    # Open the picture locally only to report its size before sending it.
    img = Image.open('haha.png').convert('RGB')
    print('img.size:', img.size)
    dataPModel = {'img': image_to_base64('haha.png')}
    print('dataPModel:', dataPModel)
    result = json_send(dataPModel, url)
    print(result['text'])
服务器返回结果:
本地返回结果:
2.1.manage.py启动服务(发送图片二进制版)
#coding:utf-8
"""
fzh created on 2020/06/28
"""
import logging

# Logging is configured before the remaining imports, as in the original file,
# so this configuration is installed before any imported module can touch the
# root logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)
import os
import base64
import io
from PIL import Image
from flask import Flask, jsonify, request
import cv2
import glob
import numpy as np
import json
import datetime
import time
import re
import random

app = Flask(__name__)


@app.route("/express_reco", methods=["POST"])
def express_reco():
    """Receive an express-waybill image as a binary multipart upload.

    The file posted under the ``encode_data`` form field is saved into
    ``./algrim_img`` and read back with OpenCV.

    Returns:
        A JSON response built from ``algirm_info``. It is an empty object on
        success; when the saved file cannot be decoded as an image the object
        carries a ``msg`` entry and the status code is 400.
    """
    logging.info('====快递单接收图片成功====')
    algirm_info = {}
    path = './algrim_img'
    os.makedirs(path, exist_ok=True)

    # The image arrives as a binary file upload rather than a base64 string.
    # NOTE(review): only 1001 distinct file names exist, so uploads can
    # overwrite each other (especially with several workers) - confirm whether
    # that is acceptable.
    name_ = random.randint(0, 1000)
    data_binary = request.files['encode_data']
    img_path = os.path.join(path, str(name_) + '.jpg')
    data_binary.save(img_path)

    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable file by returning None, not by raising.
        logging.warning('uploaded file %s could not be decoded as an image', img_path)
        algirm_info['msg'] = 'uploaded file could not be decoded as an image'
        return jsonify(algirm_info), 400

    # A Flask view must return a response; the original fell off the end of the
    # function, which makes Flask fail every request.
    return jsonify(algirm_info)


if __name__ == "__main__":
    # NOTE: the deployed service must listen on port 6006.
    app.run(host="0.0.0.0", port=6006)
2.2.test_route 测试路由文件(发送图片二进制版)
#coding:utf-8
import json
import os
import requests
import base64


def json_send(url, dataPModel):
    """POST ``dataPModel`` as multipart file fields and return the decoded JSON reply.

    Args:
        url: Endpoint to post to.
        dataPModel: Mapping of form-field name to an open binary file object.

    No Content-Type header is passed: requests generates the multipart header
    (including the boundary) itself when ``files`` is used.
    """
    response = requests.post(url=url, files=dataPModel)
    return json.loads(response.text)


def send_express_reco():
    """Exercise the ``/express_reco`` endpoint with local sample images.

    Sends up to the first ten files found in ``./快递单原始数据`` once each, then
    sends the first file ten more times, printing every reply.
    """
    url = 'http://192.168.102.191:3112/express_reco'
    img_path = './快递单原始数据'
    imgs_list_path = [os.path.join(img_path, i) for i in os.listdir(img_path)]

    for img_list_path in imgs_list_path[:10]:
        print('==img_list_path:', img_list_path)
        # Close every file as soon as its request is done instead of leaking handles.
        with open(img_list_path, 'rb') as img_file:
            result = json_send(url, {'encode_data': img_file})
        print('result:', result)

    if not imgs_list_path:
        # Nothing to resend; indexing [0] below would raise IndexError.
        return

    print('==imgs_list_path[0]:', imgs_list_path[0])
    for _ in range(10):
        with open(imgs_list_path[0], 'rb') as img_file:
            result = json_send(url, {'encode_data': img_file})
        print('result:', result)
3.1 flask上传视频流
(1)代码结构:
其中index.html用来渲染页面,app.py用来起服务
(2)index.html代码:
<!doctype html>
<html lang="en">
<head>
    <!-- Required meta tags -->
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <!-- Bootstrap CSS -->
    <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.1.3/css/bootstrap.min.css"
          integrity="sha384-MCw98/SFnGE8fJT3GXwEOngsV7Zt27NXFoaoApmYm81iuXoPkFOJwJ8ERdknLPMO" crossorigin="anonymous">
    <title>Live Streaming Demonstration</title>
</head>
<body>
<div class="container">
    <div class="row">
        <div class="col-lg-8 offset-lg-2">
            <h3 class="mt-5">视频显示</h3>
            <!-- The image source is resolved by Flask to the video_feed endpoint. -->
            <img src="{{ url_for('video_feed') }}" width="100%">
        </div>
    </div>
</div>
</body>
</html>
(3)app.py代码:
#coding:utf-8
from flask import Flask, render_template, Response
import cv2

app = Flask(__name__)
camera = cv2.VideoCapture(0)  # index 0 selects the local web camera

# An IP camera / CCTV / RTSP link can be passed instead of the index:
# cv2.VideoCapture('rtsp://username:password@camera_ip_address:554/user=username_password='password'_channel=channel_number_stream=0.sdp')
# Example RTSP link:
# cv2.VideoCapture('rtsp://mamun:123456@101.134.16.117:554/user=mamun_password=123456_channel=0_stream=0.sdp')


def gen_frames():
    """Yield camera frames as JPEG parts of a multipart stream.

    Stops as soon as the camera fails to deliver a frame.
    """
    while True:
        grabbed, image = camera.read()
        if not grabbed:
            break
        _, encoded = cv2.imencode('.jpg', image)
        jpeg = encoded.tobytes()
        # Each part is: boundary line, part header, blank line, JPEG bytes.
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + jpeg + b'\r\n'


@app.route('/video_feed')
def video_feed():
    """Video streaming route; meant to be used as the ``src`` of an ``<img>`` tag."""
    stream = gen_frames()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/video', methods=["get"])
def index():
    """Render the page that embeds the video stream."""
    return render_template('index.html')


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=6006)
python app.py启动服务即可
3.2查看视频流
二.开异步:
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor

# A pool with a single worker thread.
executor = ThreadPoolExecutor(max_workers=1)
# Hand over the callable and its argument separately; the pool makes the call.
executor.submit(second_send_houduan, json_info)
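注意:参数要作为 submit 的额外参数单独传入,即 executor.submit(second_send_houduan, json_info);不要写成 executor.submit(second_send_houduan(json_info)),否则函数会在当前线程里立即执行,起不到异步的效果。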
#coding:utf-8
from threading import Thread
import os
import base64
import io
import logging
import pickle
from whole_image_detect_first_plate import main_plate
from whole_image_detect_second_recognize import main_recoginze
from third_title_word_recognize import main_title_word_recoginze
from flask import Flask, jsonify, request
from PIL import Image
from config import model_title_detect, model_word_detect, crnn_model
# from sklearn import metrics
import requests
import json
import time
from time import sleep
from sql_tools import *
import asyncio
from multiprocessing import Process, Pool
from concurrent.futures import ThreadPoolExecutor

# Single background worker: submitted jobs run one at a time, off the request thread.
executor = ThreadPoolExecutor(1)

logging.basicConfig(
    format="%(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)

app = Flask(__name__)


def json_send_head(dataPModel, url):
    """POST ``dataPModel`` as form data to ``url`` and return the decoded JSON reply.

    Args:
        dataPModel: Mapping sent as the request's form body.
        url: Endpoint to post to.
    """
    response = requests.post(url=url, data=dataPModel)
    return json.loads(response.text)


def second_send_houduan(json_info):
    """Run the second-stage recognition and push its result to the backend.

    Intended to run on the background executor. When ``json_info`` is empty a
    placeholder payload is sent instead of a recognition result.

    Args:
        json_info: Layout information received from the backend.

    Raises:
        Exception: Anything raised by recognition or by the HTTP call is logged
            and re-raised, so it is visible even though nobody reads the future.
    """
    print('entrance second houduan!!!')
    # Backend endpoint that receives the recognition result.
    url = 'http://tc.aa.zhangzb.site:9091/api/page/plateGetJson'
    try:
        if json_info:
            info = main_recoginze(json_info, model_title_detect, model_word_detect, crnn_model)
        else:
            info = {'NO': 'NO JSON INFO!!'}
        dataPmodel = {'json': json.dumps(info)}
        # Payload first, URL second: the original passed them swapped, so the
        # dict was used as the URL and the request could never succeed.
        text = json_send_head(dataPmodel, url)
        print('text:', text)
    except Exception:
        logging.exception("second_send_houduan failed")
        raise


@app.route("/second_recognize", methods=["POST"])
def second_recognize():
    """Accept layout JSON from the backend and start recognition in the background.

    The ``json`` field of the request body is itself a JSON-encoded string. The
    heavy work is submitted to the executor, so this view replies immediately
    with ``code`` ``'200'`` for a non-empty payload and ``'201'`` otherwise.
    """
    logging.info("second_recognize")
    str_info = request.json["json"]
    json_info = json.loads(str_info)
    print('json_info:', json_info)
    # Function and argument are passed separately so the call happens on the worker.
    executor.submit(second_send_houduan, json_info)

    if json_info:
        data = {'msg': '', 'code': '200'}
    else:
        data = {'msg': '接收的json有问题', 'code': '201'}
    return jsonify(data)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=6006)
三.gunicorn部署Flask服务
pip install gunicorn
gunicorn命令启动
gunicorn -w 4 -b ip:port xxx:app
启动一个Flask应用
例如:gunicorn -w 4 -b 0.0.0.0:6006 manage:app
其中:-w 4 指预定义的工作进程数为 4;-b 指定绑定的地址和端口(如 127.0.0.1:4000,上例中为 0.0.0.0:6006);manage 是 Flask 的启动 Python 文件(manage.py),app 则是其中的 Flask 应用程序实例。
# run.py
from flask import Flask
# Application instance; this is the object named after the colon in "module:app" on the gunicorn command line.
app = Flask(__name__)
进行配置config.py
# gunicorn config
# NOTE(review): the value below is 6 while its own remark says it must be 1
# because of the GPU - confirm which one is intended before deploying.
workers = 6 # must be 1, because of gpu
# threads = 4
# Address and port to listen on (same port the Flask scripts use).
bind = "0.0.0.0:6006"
worker_class = "gevent"
worker_connections = 1500
timeout = 60
loglevel = "debug"
# "-" presumably sends the access / error logs to the console - verify against the gunicorn settings reference.
accesslog = "-"
errorlog = "-"
daemon = False
# File name given to gunicorn for its pid file.
pidfile = "master_pid"
启动的时候:gunicorn -c ./config.py manage:app
退出gunicorn,通过下面命令查看
pstree -ap|grep gunicorn
找到 gunicorn 主进程的 PID 后,再用 kill 结束该进程即可。
四.多GPU卡部署
参考:https://blog.miguelgrinberg.com/post/video-streaming-with-flask