The UNet model is the official checkpoint from the Pytorch-UNet repository, trained on the Kaggle Carvana Image Masking Challenge dataset.
The model input is 572*572*3 and the output is 572*572*2. The segmentation classes are 0: background and 1: car.
Converting the PyTorch .pth model to an ONNX model:
import torch
from unet import UNet

model = UNet(n_channels=3, n_classes=2, bilinear=False)
model = model.to(memory_format=torch.channels_last)
state_dict = torch.load("unet_carvana_scale1.0_epoch2.pth", map_location="cpu")
# del state_dict['mask_values']  # uncomment if the checkpoint stores mask values next to the weights
model.load_state_dict(state_dict)
dummy_input = torch.randn(1, 3, 572, 572)
torch.onnx.export(model, dummy_input, "unet.onnx", verbose=True)
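Before converting further, it is worth a quick check that the exported ONNX model actually runs and produces the expected output shape. A minimal sketch, assuming onnxruntime is installed (any ONNX runtime would do):

import numpy as np
import onnxruntime as ort

session = ort.InferenceSession("unet.onnx")
input_name = session.get_inputs()[0].name      # expected: 'input.1'
dummy = np.random.rand(1, 3, 572, 572).astype(np.float32)
logits = session.run(None, {input_name: dummy})[0]
print(logits.shape)                            # expected: (1, 2, 572, 572)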
Analyzing the model's input and output nodes:
Use the Netron tool to inspect the model structure; the input node is named input.1 and the output node is named /outc/conv/Conv.
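Netron is a GUI; if you would rather confirm the node names from a script, the onnx package exposes them directly (a minimal sketch, assuming the onnx package is installed):

import onnx

model = onnx.load("unet.onnx")
print([i.name for i in model.graph.input])   # expected: ['input.1']
print([o.name for o in model.graph.output])  # the graph output fed by the /outc/conv/Conv node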
Converting the ONNX model to an Atlas (OM) model:
atc --model=./unet.onnx --framework=5 --output=unet --soc_version=Ascend310P3 --input_shape="input.1:1,3,572,572" --output_type="/outc/conv/Conv:0:FP32" --out_nodes="/outc/conv/Conv:0"
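Here --framework=5 selects ONNX as the source format, --soc_version must match the target chip, and --out_nodes together with --output_type pin the raw convolution output as FP32. To validate the converted OM model later, one option is to dump a golden input/output pair from the ONNX model and compare the device output against it (a sketch; the .npy file names are arbitrary):

import numpy as np
import onnxruntime as ort

x = np.random.rand(1, 3, 572, 572).astype(np.float32)
session = ort.InferenceSession("unet.onnx")
y = session.run(None, {"input.1": x})[0]
np.save("golden_input.npy", x)   # feed this tensor to the OM model on the device
np.save("golden_output.npy", y)  # the device output should match this within FP tolerance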
Inference code:
import base64
import json
import os
import time

import cv2
import numpy as np

import MxpiDataType_pb2 as mxpi_data
from StreamManagerApi import InProtobufVector
from StreamManagerApi import MxProtobufIn
from StreamManagerApi import StreamManagerApi


def check_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir, exist_ok=True)


class SDKInferWrapper:
    def __init__(self):
        # initialize the stream manager and load the pipeline
        self._stream_name = None
        self._stream_mgr_api = StreamManagerApi()
        if self._stream_mgr_api.InitManager() != 0:
            raise RuntimeError("Failed to init stream manager.")
        pipeline_name = './nested_unet.pipeline'
        self.load_pipeline(pipeline_name)
        self.width = 572
        self.height = 572

    def load_pipeline(self, pipeline_path):
        with open(pipeline_path, 'r') as f:
            pipeline = json.load(f)
        # the stream name is the top-level key of the pipeline, e.g. 'unet_pytorch'
        self._stream_name = list(pipeline.keys())[0].encode()
        if self._stream_mgr_api.CreateMultipleStreams(
                json.dumps(pipeline).encode()) != 0:
            raise RuntimeError("Failed to create stream.")

    def do_infer(self, img_bgr):
        # preprocess: resize, BGR -> RGB, scale to [0, 1], HWC -> CHW
        image = cv2.resize(img_bgr, (self.width, self.height))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype('float32') / 255.0
        image = image.transpose(2, 0, 1)

        # wrap the tensor in a MxpiTensorPackageList protobuf
        tensor_pkg_list = mxpi_data.MxpiTensorPackageList()
        tensor_pkg = tensor_pkg_list.tensorPackageVec.add()
        tensor_vec = tensor_pkg.tensorVec.add()
        tensor_vec.deviceId = 0
        tensor_vec.memType = 0
        for dim in [1, *image.shape]:
            tensor_vec.tensorShape.append(dim)  # tensorShape becomes [1, 3, 572, 572]
        input_data = image.tobytes()
        tensor_vec.dataStr = input_data
        tensor_vec.tensorDataSize = len(input_data)

        protobuf_vec = InProtobufVector()
        protobuf = MxProtobufIn()
        protobuf.key = b'appsrc0'
        protobuf.type = b'MxTools.MxpiTensorPackageList'
        protobuf.protobuf = tensor_pkg_list.SerializeToString()
        protobuf_vec.push_back(protobuf)

        unique_id = self._stream_mgr_api.SendProtobuf(
            self._stream_name, 0, protobuf_vec)
        if unique_id < 0:
            raise RuntimeError("Failed to send data to stream.")

        infer_result = self._stream_mgr_api.GetResult(self._stream_name, unique_id)
        if infer_result.errorCode != 0:
            raise RuntimeError(
                f"GetResult error. errorCode={infer_result.errorCode}, "
                f"errorMsg={infer_result.data.decode()}")

        # postprocess: softmax over the class axis, then per-pixel argmax
        output_tensor = self._parse_output_data(infer_result)
        output_tensor = np.squeeze(output_tensor)   # (2, 572, 572)
        output_tensor = softmax(output_tensor)
        # cast to uint8: cv2.resize does not accept the int64 array argmax returns
        mask = np.argmax(output_tensor, axis=0).astype(np.uint8)
        score = np.max(output_tensor, axis=0)
        # resize back to the original size; nearest-neighbor keeps labels intact
        mask = cv2.resize(mask, (img_bgr.shape[1], img_bgr.shape[0]),
                          interpolation=cv2.INTER_NEAREST)
        score = cv2.resize(score, (img_bgr.shape[1], img_bgr.shape[0]),
                           interpolation=cv2.INTER_NEAREST)
        return mask, score

    def _parse_output_data(self, output_data):
        # the result tensor comes back base64-encoded inside a JSON payload
        infer_result_data = json.loads(output_data.data.decode())
        content = json.loads(infer_result_data['metaData'][0]['content'])
        tensor_vec = content['tensorPackageVec'][0]['tensorVec'][0]
        data_str = tensor_vec['dataStr']
        tensor_shape = tensor_vec['tensorShape']
        infer_array = np.frombuffer(base64.b64decode(data_str), dtype=np.float32)
        return infer_array.reshape(tensor_shape)

    def draw(self, mask):
        color_lists = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
        drawed_img = np.stack([mask, mask, mask], axis=2)
        # index with the original mask, not the image being written,
        # so pixels already recolored cannot be matched a second time
        for i in np.unique(mask):
            drawed_img[:, :, 0][mask == i] = color_lists[i][0]
            drawed_img[:, :, 1][mask == i] = color_lists[i][1]
            drawed_img[:, :, 2][mask == i] = color_lists[i][2]
        return drawed_img


def softmax(x, axis=0):
    # numerically stable softmax over the class axis (per pixel)
    exps = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return exps / np.sum(exps, axis=axis, keepdims=True)


def sigmoid(x):
    # numerically stable sigmoid, split by sign to avoid overflow
    y = x.copy()
    y[x >= 0] = 1.0 / (1 + np.exp(-x[x >= 0]))
    y[x < 0] = np.exp(x[x < 0]) / (1 + np.exp(x[x < 0]))
    return y


def test():
    output_folder = "./infer_result"
    os.makedirs(output_folder, exist_ok=True)
    sdk_infer = SDKInferWrapper()

    # read image
    image_name = "./sample_data/images/111.jpg"
    img_bgr = cv2.imread(image_name)

    # infer and time a single call
    t1 = time.time()
    mask, score = sdk_infer.do_infer(img_bgr)
    t2 = time.time()
    print(t2 - t1, mask, score)

    # colorize and save the predicted mask
    drawed_img = sdk_infer.draw(mask)
    cv2.imwrite("infer_result/draw.png", drawed_img)


if __name__ == "__main__":
    test()
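The wrapper reads './nested_unet.pipeline', which the original sample never shows. A sketch of what such a file could look like, assuming a plain appsrc -> tensorinfer -> serialize -> appsink stream (the stream name, blocksize values, and model path are placeholders to adapt):

{
    "unet_pytorch": {
        "stream_config": {
            "deviceId": "0"
        },
        "appsrc0": {
            "props": {
                "blocksize": "409600"
            },
            "factory": "appsrc",
            "next": "mxpi_tensorinfer0"
        },
        "mxpi_tensorinfer0": {
            "props": {
                "dataSource": "appsrc0",
                "modelPath": "./unet.om"
            },
            "factory": "mxpi_tensorinfer",
            "next": "mxpi_dataserialize0"
        },
        "mxpi_dataserialize0": {
            "props": {
                "outputDataKeys": "mxpi_tensorinfer0"
            },
            "factory": "mxpi_dataserialize",
            "next": "appsink0"
        },
        "appsink0": {
            "props": {
                "blocksize": "4096000"
            },
            "factory": "appsink"
        }
    }
}

The mxpi_dataserialize plugin is what turns the inference result into the JSON payload that _parse_output_data expects from GetResult.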
Run script:
set -e
. /usr/local/Ascend/ascend-toolkit/set_env.sh
# Simple log helper functions
info() { echo -e "\033[1;34m[INFO ][MxStream] $1\033[1;37m" ; }
warn() { echo >&2 -e "\033[1;31m[WARN ][MxStream] $1\033[1;37m" ; }

# export MX_SDK_HOME=/home/work/mxVision
export LD_LIBRARY_PATH=${MX_SDK_HOME}/lib:${MX_SDK_HOME}/opensource/lib:${MX_SDK_HOME}/opensource/lib64:/usr/local/Ascend/ascend-toolkit/latest/acllib/lib64:${LD_LIBRARY_PATH}
export GST_PLUGIN_SCANNER=${MX_SDK_HOME}/opensource/libexec/gstreamer-1.0/gst-plugin-scanner
export GST_PLUGIN_PATH=${MX_SDK_HOME}/opensource/lib/gstreamer-1.0:${MX_SDK_HOME}/lib/plugins

# add the SDK's python directory to PYTHONPATH so StreamManagerApi.py can be imported
export PYTHONPATH=$PYTHONPATH:${MX_SDK_HOME}/python

python3 unet.py
exit 0
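Note that MX_SDK_HOME must already be set in the environment (the commented-out export above shows a typical install path). With that in place, save the script as, say, run.sh and execute `bash run.sh` to run the inference end to end.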
Results:
Personal thoughts:
Huawei's Atlas reference samples fall short on detail: many steps are missing, the process takes considerable trial and error, and the sample code is poorly written. The road to IT self-reliance (xinchuang) remains a long one.
References:
milesial/Pytorch-UNet: PyTorch implementation of the U-Net for image semantic segmentation with high quality images, https://github.com/milesial/Pytorch-UNet
https://gitee.com/ascend/samples/tree/master/python/level2_simple_inference/3_segmentation/unet++