上一篇文章我们介绍了C++的API,这篇文章我们主要针对的是Python的API,起始C++和Python在整体流程上面基本一致,但是由于Python天然的简洁性和易用性,Python的API相对来讲还是比较简单的,我们一起来看一下吧。
文章目录
- 4. The Python API
- 4.1 The Build Phase
- 4.1.1 Creating a Network Definition in Python
- 4.1.2 Importing a Model Using the ONNX Parser
- 4.1.3 Building an Engine
- 4.2 Deserializing a Plan
- 4.3 Performing Inference
- 4.4 samples研究
4. The Python API
本章节还是基于ONNX模型来阐述的,参考 onnx_resnet50.py获取更多信息(老样子,我们后面单独讲代码)。
Python API都可以从tensorrt模块中获取到:
import tensorrt as trt
4.1 The Build Phase
创建一个builder之前,需要创建一个logger,这样你后面所有的信息都可以通过logger来进行输出并进行分析,你可以直接像下面这样进行定义:
logger = trt.Logger(trt.Logger.WARNING)
也可以自定义,主要设计继承ILogger
类进行实现:
class MyLogger(trt.ILogger):def __init__(self):trt.ILogger.__init__(self)def log(self, severity, msg):pass # Your custom logging implementation herelogger = MyLogger()
然后创建builder:
builder = trt.Builder(logger)
还是和C++一样的说辞,builder比较耗时,如何让builder更快,参考:Optimizing Builder Performance
4.1.1 Creating a Network Definition in Python
创建完builder后,首先要做的就是创建一个网络定义(network definition):
network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
在使用 ONNX parser的方式来导入模型的时候,必须指定EXPLICIT_BATCH
这个flag,更多细节请参考: Explicit Versus Implicit Batch
4.1.2 Importing a Model Using the ONNX Parser
使用ONNX来填充我们定义好的网络框架,首先声明一个parser:
parser = trt.OnnxParser(network, logger)
然后,读取模型文件并且处理errors:
success = parser.parse_from_file(model_path) # 模型文件的路径
for idx in range(parser.num_errors):print(parser.get_error(idx))if not success:pass # Error handling code here
4.1.3 Building an Engine
接下来是创建一个build configuration来配置TensorRT如何进行模型优化:
config = builder.create_builder_config()
这个接口有甚多你可以设置的属性。一个重要的属性就是最大空间( maximum workspace size)。Layer的实现通常需要一个临时空间,这个参数限制了网络中的任意layer可以使用的最大空间。如果你没有提供一个足够的空间,TensorRT就无法找到一个层的实现(就是放不下了)。默认情况下,workspace被设置为给定设备的所有全局内存大小(total global memory),当你需要的时候,你应该来进行限定,比如说你只有一个设备,但是有多个engine在build:
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 20) # 1 MiB,2^20
指定完configuration后,就可以构建和序列化模型了:
serialized_engine = builder.build_serialized_network(network, config)
然后把engine存到本地磁盘,后续再使用:
with open(“sample.engine”, “wb”) as f:f.write(serialized_engine)
注意:Serialized engines不能跨平台或跨TensorRT版本进行移植。Engines是特定于它们所构建的确切GPU模型的(除了平台和TensorRT版本,也就是建议我们在哪用就在哪构建,除非你能保证版本都一致)。
4.2 Deserializing a Plan
进行推理的时候,使用Runtime
接口来序列化模型,和builder一样,runtime也需要一个logger实例:
runtime = trt.Runtime(logger)
从内存中序列化engine:
engine = runtime.deserialize_cuda_engine(serialized_engine)
当然咯,你也可以从本地文件中进行读取:
with open(“sample.engine”, “rb”) as f:serialized_engine = f.read()
4.3 Performing Inference
这个时候,所有的模型信息都给了engine变量,但是我们必须要管理中间激活( intermediate activations)的附加状态(真拗口,啥是中间激活,先有个印象)。我们通过ExecutionContext接口来进行:
context = engine.create_execution_context()
一个engine可以有多个execution contexts,允许一组权重用于多个重叠的推理任务(除非使用了dynamic shapes,每个optimization profile只能有一个 execution context,除非指定了预览特性kPROFILE_SHARING_0806,后续有机会再补充)。
运行推理,你还必须要指定input和output的buffer:
context.set_tensor_address(name, ptr)
几个Python包允许你在GPU上分配内存,包括但不限于官方CUDA Python bindings,PyTorch, cuPy和Numba。
这样你就完成了input的设置,你可以调用execute_async_v3()
方法来使用 CUDA stream进行推理,根据网络的结构和特点,网络可以异步执行,也可以同步执行。例如,可能导致同步行为的情况包括依赖数据的形状(data dependent shapes)、DLA的使用、循环和同步的插件(plugin)。
首先,创建一个 CUDA stream,如果你已经有了一个 CUDA stream,你可以使用一个指向已经存在的stream的指针,比如对于Pytorch CUDA stream就是torch.cuda.Stream()
,你可以使用cuda_stream
属性来获取这个指针,对于 Polygraphy CUDA streams,使用ptr
属性,或者直接调用cudaStreamCreate()
来创建一个CUDA Python binding(后面我们结合代码来看一下)。
然后开始推理:
context.execute_async_v3(buffers, stream_ptr)
推荐你在kernels从GPU传输完数据后进行异步数据传输的同步操作(其实就是调用cudaMemcpyAsync()
函数),这样可以保证数据传输完整。
要确定推理(可能还有cudaMemcpyAsync())何时完成,请使用标准的CUDA同步机制,例如事件( events)或着等待这个流结束。例如对于PyTorch CUDA streams 或 Polygraphy CUDA streams你可以使用stream.synchronize()
,对于CUDA Python binding你可以使用cudaStreamSynchronize(stream)
。
4.4 samples研究
首先打开samples/python/introductory_parser_samples/onnx_resnet50.py文件
-
Build a TensorRT engine
一起看main()
中build_engine_onnx()
:def build_engine_onnx(model_file):builder = trt.Builder(TRT_LOGGER) # 声明buildernetwork = builder.create_network(common.EXPLICIT_BATCH) # 定义网络config = builder.create_builder_config() # 声明configparser = trt.OnnxParser(network, TRT_LOGGER) # 声明parserconfig.max_workspace_size = common.GiB(1) # 配置config# Load the Onnx model and parse it in order to populate the TensorRT network.with open(model_file, "rb") as model:if not parser.parse(model.read()): # 读本地文件并解析print("ERROR: Failed to parse the ONNX file.")for error in range(parser.num_errors):print(parser.get_error(error))return Nonereturn builder.build_engine(network, config) # network->parser->builder这样的顺序链接起来
-
Allocate buffers and create a CUDA stream
一起看common.allocate_buffers(engine)函数:# Allocates all buffers required for an engine, i.e. host/device inputs/outputs. # If engine uses dynamic shapes, specify a profile to find the maximum input & output size. def allocate_buffers(engine: trt.ICudaEngine, profile_idx: Optional[int] = None):inputs = []outputs = []bindings = []stream = cuda_call(cudart.cudaStreamCreate()) # 和C++不同,这里需要特殊处理这个stream,因为python没有指针的概念tensor_names = [engine.get_tensor_name(i) for i in range(engine.num_io_tensors)]for binding in tensor_names:# 根据名称获得每个tensor的max shape,这样就可以分配足够的内存了# get_tensor_profile_shape returns (min_shape, optimal_shape, max_shape)# Pick out the max shape to allocate enough memory for the binding.shape = engine.get_tensor_shape(binding) if profile_idx is None else engine.get_tensor_profile_shape(binding, profile_idx)[-1]shape_valid = np.all([s >= 0 for s in shape])if not shape_valid and profile_idx is None:raise ValueError(f"Binding {binding} has dynamic shape, " +\"but no profile was specified.")size = trt.volume(shape)if engine.has_implicit_batch_dimension:size *= engine.max_batch_sizedtype = np.dtype(trt.nptype(engine.get_tensor_dtype(binding)))# Allocate host and device buffers# 这个函数比较重要,是核心函数,我们后面单独拎出来看一下bindingMemory = HostDeviceMem(size, dtype) # 是一个类# Append the device buffer to device bindings.# 把cudaMalloc()获得的nbytes数据的空间全部放到bindings列表中去bindings.append(int(bindingMemory.device))# Append to the appropriate list.# 单独处理输入输出节点if engine.get_tensor_mode(binding) == trt.TensorIOMode.INPUT:inputs.append(bindingMemory)else:outputs.append(bindingMemory)return inputs, outputs, bindings, stream
上面的allocate_buffers()函数到底干了啥事呢?就是逐层遍历,获取size和dtype,然后cudamalloc()申请空间,把大小都放在bindings里面,然后对于输出输出单独拎出来返回。我们一起再来研究一下HostDeviceMem()类到底干啥了,我们先只看他的初始化函数。
class HostDeviceMem: # 意思就是说,host内存包装在了一个numpy数组中了 """Pair of host and device memory, where the host memory is wrapped in a numpy array""" def __init__(self, size: int, dtype: np.dtype):nbytes = size * dtype.itemsize# cudart.cudaMallocHost(nbytes)这个就是在Host上进行内存申请的语句host_mem = cuda_call(cudart.cudaMallocHost(nbytes)) # CPU内存pointer_type = ctypes.POINTER(np.ctypeslib.as_ctypes_type(dtype))# cast是判断host_mem是不是pointer_type的,如果是就转换成numpy arrayself._host = np.ctypeslib.as_array(ctypes.cast(host_mem, pointer_type), (size,))self._device = cuda_call(cudart.cudaMalloc(nbytes)) # GPU内存self._nbytes = nbytes
-
创建execution context,推理的时候都会用到哟
context = engine.create_execution_context()
-
加载并预处理输入数据
# Load a normalized test case into the host input page-locked buffer. # 锁页内存更快,类似零拷贝:https://www.jianshu.com/p/e92e72c0ba51 test_image = random.choice(test_images) test_case = load_normalized_test_case(test_image, inputs[0].host)
load_normalized_test_case()函数实现了预处理和数据拷贝:
def load_normalized_test_case(test_image, pagelocked_buffer): # Converts the input image to a CHW Numpy array def normalize_image(image):# Resize, anti alias (Image.LANCZOS下采样过滤插值法) and transpose the image to CHW.c, h, w = ModelData.INPUT_SHAPEimage_arr = (np.asarray(image.resize((w, h), Image.LANCZOS)).transpose([2, 0, 1]).astype(trt.nptype(ModelData.DTYPE)).ravel())# This particular ResNet50 model requires some preprocessing, specifically, mean normalization.# ResNet50 要求的数据预处理return (image_arr / 255.0 - 0.45) / 0.225# Normalize the image and copy to pagelocked memory. # 使用np.copyto拷贝内存 np.copyto(pagelocked_buffer, normalize_image(Image.open(test_image))) return test_image
-
运行
输出是一个有1000长度的1维向量,代表1000分类,再来看一下怎么运行的吧:def _do_inference_base(inputs, outputs, stream, execute_async):# Transfer input data to the GPU.kind = cudart.cudaMemcpyKind.cudaMemcpyHostToDevice# 支持多个输入,从host逐个拷贝到device中去[cuda_call(cudart.cudaMemcpyAsync(inp.device, inp.host, inp.nbytes, kind, stream)) for inp in inputs]# Run inference.# 其实是 context.execute_async_v2(bindings=bindings, stream_handle=stream)execute_async()# Transfer predictions back from the GPU.kind = cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost# 从device逐个拷贝output到host[cuda_call(cudart.cudaMemcpyAsync(out.host, out.device, out.nbytes, kind, stream)) for out in outputs]# Synchronize the stream# 就是我们上面将的同步,这样保证数据传输完整cuda_call(cudart.cudaStreamSynchronize(stream))# Return only the host outputs.return [out.host for out in outputs]
-
后处理
就是利用argmax取出最大索引的位置作为输出:pred = labels[np.argmax(trt_outputs[0])] # 这里只拿了第一个输出,其实应该有几个输入就有几个输出吧 common.free_buffers(inputs, outputs, stream) if "_".join(pred.split()) in os.path.splitext(os.path.basename(test_case))[0]:print("Correctly recognized " + test_case + " as " + pred) else:print("Incorrectly recognized " + test_case + " as " + pred)
关于Python API的接口就这么多啦,能明显感觉到Pyhton API比C++更简单易用,加上这么多Python的第三方库,不管是预处理还是后处理都会比较方便,所以掌握Python API的使用也是非常重要的,大家一起加油呀!