mindspore_lite.ModelParallelRunner

class mindspore_lite.ModelParallelRunner[source]

The ModelParallelRunner class defines a MindSpore Lite runner that supports parallel inference. Compared with model, which does not support parallelism, ModelParallelRunner does. A runner contains multiple workers, and a worker is the unit that actually performs parallel inference. In a typical scenario, multiple clients send inference tasks to a server; the server runs the inferences in parallel to shorten the inference time, and then returns the inference results to the clients.

Note

Initialize the runner with the init method first, and then call the other methods.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with `export MSLITE_ENABLE_SERVER_INFERENCE=on`.
>>> # precondition 2: Install the MindSpore Lite wheel package built in precondition 1.
>>> import mindspore_lite as mslite
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> print(model_parallel_runner)
model_path: .
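
The following is a minimal end-to-end sketch of the typical serving scenario described above. The model file mobilenetv2.ms and the input file input.bin are assumptions; replace them with a model and input data that match your own deployment.

>>> # Sketch: a single request through a parallel runner (paths, dtype and input data are assumptions).
>>> import numpy as np
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> runner = mslite.ModelParallelRunner()
>>> runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> inputs = runner.get_inputs()
>>> inputs[0].set_data_from_numpy(np.fromfile("input.bin", dtype=np.float32))
>>> outputs = []
>>> runner.predict(inputs, outputs)
>>> result = outputs[0].get_data_to_numpy()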
get_inputs()[source]

Get all the input Tensors of the model.

Returns:

list[Tensor], the list of the model's input Tensors.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with `export MSLITE_ENABLE_SERVER_INFERENCE=on`.
>>> # precondition 2: Install the MindSpore Lite wheel package built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> inputs = model_parallel_runner.get_inputs()
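
The returned input Tensors can then be filled with data before inference, for example with set_data_from_numpy. A short sketch, assuming an input file input.bin whose size and dtype match the model's first input:

>>> import numpy as np
>>> in_data = np.fromfile("input.bin", dtype=np.float32)  # assumed input file
>>> inputs[0].set_data_from_numpy(in_data)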
get_outputs()[source]

Get all the output Tensors of the model.

Returns:

list[Tensor], the list of the model's output Tensors.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with `export MSLITE_ENABLE_SERVER_INFERENCE=on`.
>>> # precondition 2: Install the MindSpore Lite wheel package built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> outputs = model_parallel_runner.get_outputs()
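
After predict has filled the output container, the data of each output Tensor can be read back as a NumPy array. A short sketch, assuming the model inputs have already been filled and predict has been executed:

>>> for output in outputs:
...     data = output.get_data_to_numpy()
...     print(output.get_tensor_name(), output.get_data_size(), output.get_element_num())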
init(model_path, runner_config=None)[source]

Build a model parallel runner from the model path so that it can run on a device.

Parameters:
  • model_path (str) - The path of the model file.

  • runner_config (RunnerConfig, optional) - The configuration used to pass the context and options during initialization of the model pool. Default: None.

Raises:
  • TypeError - model_path is not a str.

  • TypeError - runner_config is neither a RunnerConfig nor None.

  • RuntimeError - the file path model_path does not exist.

  • RuntimeError - failed to initialize the model parallel runner.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with `export MSLITE_ENABLE_SERVER_INFERENCE=on`.
>>> # precondition 2: Install the MindSpore Lite wheel package built in precondition 1.
>>> import mindspore_lite as mslite
>>> context = mslite.Context()
>>> context.append_device_info(mslite.CPUDeviceInfo())
>>> runner_config = mslite.RunnerConfig(context=context, workers_num=4)
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="mobilenetv2.ms", runner_config=runner_config)
>>> print(model_parallel_runner)
model_path: mobilenetv2.ms.
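
Because runner_config defaults to None, the runner can also be initialized from just a model path, in which case default context and worker settings are presumably applied. A minimal sketch:

>>> runner = mslite.ModelParallelRunner()
>>> runner.init(model_path="mobilenetv2.ms")  # runner_config omitted; defaults apply (assumption)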
predict(inputs, outputs)[source]

Perform inference with the model parallel runner.

Parameters:
  • inputs (list[Tensor]) - An ordered list that contains all the model input Tensors.

  • outputs (list[Tensor]) - A container into which the model outputs are filled in order.

Raises:
  • TypeError - inputs is not a list.

  • TypeError - inputs is a list, but its elements are not Tensors.

  • TypeError - outputs is not a list.

  • TypeError - outputs is a list, but its elements are not Tensors.

  • RuntimeError - failed to run inference on the model.

Examples:

>>> # Use case: serving inference.
>>> # precondition 1: Build the MindSpore Lite serving package with `export MSLITE_ENABLE_SERVER_INFERENCE=on`.
>>> # precondition 2: Install the MindSpore Lite wheel package built in precondition 1.
>>> import time
>>> from threading import Thread
>>> import numpy as np
>>> import mindspore_lite as mslite
>>>
>>> # The number of threads used by one worker.
>>> # WORKERS_NUM * THREAD_NUM should not exceed the number of cores of the machine.
>>> THREAD_NUM = 1
>>> # The number of workers in one `ModelParallelRunner` on the server during parallel inference.
>>> # To compare the time difference between parallel inference and serial inference,
>>> # set WORKERS_NUM = 1 to run serial inference.
>>> WORKERS_NUM = 3
>>> # Simulate 5 clients, and each client sends 2 inference tasks to the server at the same time.
>>> PARALLEL_NUM = 5
>>> TASK_NUM = 2
>>>
>>>
>>> def parallel_runner_predict(parallel_runner, parallel_id):
...     # One Runner with 3 workers, set model input, execute inference and get output.
...     task_index = 0
...     while True:
...         if task_index == TASK_NUM:
...             break
...         task_index += 1
...         # Set model input
...         inputs = parallel_runner.get_inputs()
...         in_data = np.fromfile("./model/input.bin", dtype=np.float32)
...         inputs[0].set_data_from_numpy(in_data)
...         once_start_time = time.time()
...         # Execute inference
...         outputs = []
...         parallel_runner.predict(inputs, outputs)
...         once_end_time = time.time()
...         print("parallel id: ", parallel_id, " | task index: ", task_index, " | run once time: ",
...               once_end_time - once_start_time, " s")
...         # Get output
...         for output in outputs:
...             tensor_name = output.get_tensor_name().rstrip()
...             data_size = output.get_data_size()
...             element_num = output.get_element_num()
...             print("tensor name is:%s tensor size is:%s tensor elements num is:%s" % (tensor_name,
...                                                                                      data_size,
...                                                                                      element_num))
...
...             data = output.get_data_to_numpy()
...             data = data.flatten()
...             print("output data is:", end=" ")
...             for j in range(5):
...                 print(data[j], end=" ")
...             print("")
...
>>> # Init RunnerConfig and context, and add CPU device info
>>> cpu_device_info = mslite.CPUDeviceInfo(enable_fp16=False)
>>> context = mslite.Context(thread_num=THREAD_NUM, inter_op_parallel_num=THREAD_NUM)
>>> context.append_device_info(cpu_device_info)
>>> parallel_runner_config = mslite.RunnerConfig(context=context, workers_num=WORKERS_NUM)
>>> # Build ModelParallelRunner from file
>>> model_parallel_runner = mslite.ModelParallelRunner()
>>> model_parallel_runner.init(model_path="./model/mobilenetv2.ms", runner_config=parallel_runner_config)
>>> # The server creates 5 threads to handle the inference tasks of the 5 clients.
>>> threads = []
>>> total_start_time = time.time()
>>> for i in range(PARALLEL_NUM):
...     threads.append(Thread(target=parallel_runner_predict, args=(model_parallel_runner, i,)))
...
>>> # Start threads to perform parallel inference.
>>> for th in threads:
...     th.start()
...
>>> for th in threads:
...     th.join()
...
>>> total_end_time = time.time()
>>> print("total run time: ", total_end_time - total_start_time, " s")