{ "cells": [ { "cell_type": "markdown", "source": [ "# on-device执行\n", "\n", "[![下载Notebook](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/r1.8/resource/_static/logo_notebook.png)](https://obs.dualstack.cn-north-4.myhuaweicloud.com/mindspore-website/notebook/r1.8/zh_cn/design/mindspore_on_device.ipynb) [![下载样例代码](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/r1.8/resource/_static/logo_download_code.png)](https://obs.dualstack.cn-north-4.myhuaweicloud.com/mindspore-website/notebook/r1.8/zh_cn/design/mindspore_on_device.py) [![查看源文件](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/r1.8/resource/_static/logo_source.png)](https://gitee.com/mindspore/docs/blob/r1.8/docs/mindspore/source_zh_cn/design/on_device.ipynb)" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 概述\n", "\n", "MindSpore支持的后端包括Ascend、GPU、CPU,所谓On Device中的Device通常指Ascend(昇腾)AI处理器。\n", "\n", "昇腾芯片上集成了AICORE、AICPU和CPU。其中,AICORE负责大型Tensor Vector运算,AICPU负责标量运算,CPU负责逻辑控制和任务分发。\n", "\n", "Host侧CPU负责将图或算子下发到昇腾芯片。昇腾芯片由于具备了运算、逻辑控制和任务分发的功能,所以不需要与Host侧的CPU进行频繁的交互,只需要将计算完的最终结果返回给Host侧,实现整图下沉到Device执行,避免Host-Device频繁交互,减小了开销。\n", "\n", "## 计算图下沉\n", "\n", "计算图整图下沉到Device上执行,减少Host-Device交互开销。可以结合循环下沉实现多个Step下沉,进一步减少Host和Device的交互次数。\n", "\n", "循环下沉是在On Device执行的基础上的优化,目的是进一步减少Host侧和Device侧之间的交互次数。通常情况下,每个step都返回一个结果,循环下沉是控制每隔多少个step返回一次结果。\n", "\n", "默认配置下是每一个epoch返回一次结果,这样每个epoch里,Host侧和Device侧只需要进行一次数据交互。\n", "\n", "也可以结合`train`接口的`dataset_sink_mode`和`sink_size`控制每个epoch的下沉数据量。\n", "\n", "## 数据下沉\n", "\n", "`Model`的`train`接口参数`dataset_sink_mode`可以控制数据是否下沉。`dataset_sink_mode`为True表示数据下沉,否则为非下沉。所谓下沉即数据通过通道直接传送到Device上。\n", "\n", "dataset_sink_mode参数可以配合`sink_size`控制每个`epoch`下沉的数据量大小。当`dataset_sink_mode`设置为True,即数据下沉模式时:\n", "\n", "如果`sink_size`为默认值-1,则每一个`epoch`训练整个数据集,理想状态下下沉数据的速度快于硬件计算的速度,保证处理数据的耗时隐藏于网络计算时间内;\n", "\n", "如果`sink_size`>0,此时原始数据集可以被无限次遍历,下沉数据流程仍与`sink_size`=-1相同,不同点是每个`epoch`仅训练`sink_size`大小的数据量,如果有`LossMonitor`,那么会训练`sink_size`大小的数据量就打印一次loss值,下一个`epoch`继续从上次遍历的结束位置继续遍历。\n", "\n", "下沉的总数据量由`epoch`和`sink_size`两个变量共同控制,即总数据量=`epoch`*`sink_size`。\n", "\n", "当使用`LossMonitor`,`TimeMonitor`或其它`Callback`接口时,如果`dataset_sink_mode`设置为False,Host侧和Device侧之间每个`step`交互一次,所以会每个`step`返回一个结果,如果`dataset_sink_mode`为True,因为数据在Device上通过通道传输, Host侧和Device侧之间每个`epoch`进行一次数据交互,所以每个`epoch`只返回一次结果。\n", "\n", "> 当前CPU不支持数据下沉。\n", "> 如果在使用数据下沉模式时,出现`fault kernel_name=GetNext`、`GetNext... 
  {
   "cell_type": "markdown",
   "source": [
    "The complete sample code is as follows:"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "source": [
    "import os\n",
    "import requests\n",
    "import mindspore.dataset as ds\n",
    "import mindspore as ms\n",
    "import mindspore.dataset.transforms as transforms\n",
    "import mindspore.dataset.vision as vision\n",
    "import mindspore.nn as nn\n",
    "from mindspore.common.initializer import TruncatedNormal\n",
    "from mindspore.dataset.vision import Inter\n",
    "import mindspore.ops as ops\n",
    "\n",
    "requests.packages.urllib3.disable_warnings()\n",
    "\n",
    "def create_dataset(data_path, batch_size=32, repeat_size=1,\n",
    "                   num_parallel_workers=1):\n",
    "    \"\"\"\n",
    "    create dataset for train or test\n",
    "    \"\"\"\n",
    "    # define dataset\n",
    "    mnist_ds = ds.MnistDataset(data_path)\n",
    "\n",
    "    resize_height, resize_width = 32, 32\n",
    "    rescale = 1.0 / 255.0\n",
    "    shift = 0.0\n",
    "    rescale_nml = 1 / 0.3081\n",
    "    shift_nml = -1 * 0.1307 / 0.3081\n",
    "\n",
    "    # define map operations\n",
    "    resize_op = vision.Resize((resize_height, resize_width), interpolation=Inter.LINEAR)  # Bilinear mode\n",
    "    rescale_nml_op = vision.Rescale(rescale_nml, shift_nml)\n",
    "    rescale_op = vision.Rescale(rescale, shift)\n",
    "    hwc2chw_op = vision.HWC2CHW()\n",
    "    type_cast_op = transforms.TypeCast(ms.int32)\n",
    "\n",
    "    # apply map operations on images\n",
    "    mnist_ds = mnist_ds.map(input_columns=\"label\", operations=type_cast_op, num_parallel_workers=num_parallel_workers)\n",
    "    mnist_ds = mnist_ds.map(input_columns=\"image\", operations=resize_op, num_parallel_workers=num_parallel_workers)\n",
    "    mnist_ds = mnist_ds.map(input_columns=\"image\", operations=rescale_op, num_parallel_workers=num_parallel_workers)\n",
    "    mnist_ds = mnist_ds.map(input_columns=\"image\", operations=rescale_nml_op, num_parallel_workers=num_parallel_workers)\n",
    "    mnist_ds = mnist_ds.map(input_columns=\"image\", operations=hwc2chw_op, num_parallel_workers=num_parallel_workers)\n",
    "\n",
    "    # apply DatasetOps\n",
    "    buffer_size = 10000\n",
    "    mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size)  # 10000 as in LeNet train script\n",
    "    mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)\n",
    "    mnist_ds = mnist_ds.repeat(repeat_size)\n",
    "\n",
    "    return mnist_ds\n",
    "\n",
    "\n",
    "def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):\n",
    "    \"\"\"weight initial for conv layer\"\"\"\n",
    "    weight = weight_variable()\n",
    "    return nn.Conv2d(in_channels, out_channels,\n",
    "                     kernel_size=kernel_size, stride=stride, padding=padding,\n",
    "                     weight_init=weight, has_bias=False, pad_mode=\"valid\")\n",
    "\n",
    "\n",
    "def fc_with_initialize(input_channels, out_channels):\n",
    "    \"\"\"weight initial for fc layer\"\"\"\n",
    "    weight = weight_variable()\n",
    "    bias = weight_variable()\n",
    "    return nn.Dense(input_channels, out_channels, weight, bias)\n",
    "\n",
    "\n",
    "def weight_variable():\n",
    "    \"\"\"weight initial\"\"\"\n",
    "    return TruncatedNormal(0.02)\n",
    "\n",
    "\n",
    "class LeNet5(nn.Cell):\n",
    "    \"\"\"\n",
    "    Lenet network\n",
    "    Args:\n",
    "        num_class (int): Num classes. Default: 10.\n",
    "\n",
    "    Returns:\n",
    "        Tensor, output tensor\n",
    "\n",
    "    Examples:\n",
    "        >>> LeNet(num_class=10)\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, num_class=10):\n",
    "        super(LeNet5, self).__init__()\n",
    "        self.num_class = num_class\n",
    "        self.batch_size = 32\n",
    "        self.conv1 = conv(1, 6, 5)\n",
    "        self.conv2 = conv(6, 16, 5)\n",
    "        self.fc1 = fc_with_initialize(16 * 5 * 5, 120)\n",
    "        self.fc2 = fc_with_initialize(120, 84)\n",
    "        self.fc3 = fc_with_initialize(84, self.num_class)\n",
    "        self.relu = nn.ReLU()\n",
    "        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)\n",
    "        self.reshape = ops.Reshape()\n",
    "\n",
    "    def construct(self, x):\n",
    "        x = self.conv1(x)\n",
    "        x = self.relu(x)\n",
    "        x = self.max_pool2d(x)\n",
    "        x = self.conv2(x)\n",
    "        x = self.relu(x)\n",
    "        x = self.max_pool2d(x)\n",
    "        x = self.reshape(x, (self.batch_size, -1))\n",
    "        x = self.fc1(x)\n",
    "        x = self.relu(x)\n",
    "        x = self.fc2(x)\n",
    "        x = self.relu(x)\n",
    "        x = self.fc3(x)\n",
    "        return x\n",
    "\n",
    "def download_dataset(dataset_url, path):\n",
    "    filename = dataset_url.split(\"/\")[-1]\n",
    "    save_path = os.path.join(path, filename)\n",
    "    if os.path.exists(save_path):\n",
    "        return\n",
    "    if not os.path.exists(path):\n",
    "        os.makedirs(path)\n",
    "    res = requests.get(dataset_url, stream=True, verify=False)\n",
    "    with open(save_path, \"wb\") as f:\n",
    "        for chunk in res.iter_content(chunk_size=512):\n",
    "            if chunk:\n",
    "                f.write(chunk)\n",
    "    print(\"The {} file is downloaded and saved in the path {} after processing\".format(os.path.basename(dataset_url), path))\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    ms.set_context(mode=ms.GRAPH_MODE, device_target=\"GPU\")\n",
    "    ds_train_path = \"./datasets/MNIST_Data/train/\"\n",
    "    download_dataset(\"https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-labels-idx1-ubyte\", ds_train_path)\n",
    "    download_dataset(\"https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-images-idx3-ubyte\", ds_train_path)\n",
    "    ds_train = create_dataset(ds_train_path, 32)\n",
    "\n",
    "    network = LeNet5(10)\n",
    "    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction=\"mean\")\n",
    "    net_opt = nn.Momentum(network.trainable_params(), 0.01, 0.9)\n",
    "    model = ms.Model(network, net_loss, net_opt)\n",
    "\n",
    "    print(\"============== Starting Training ==============\")\n",
    "    model.train(epoch=10, train_dataset=ds_train, callbacks=[ms.LossMonitor()], dataset_sink_mode=True, sink_size=1000)"
   ],
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "============== Starting Training ==============\n",
      "epoch: 1 step: 1000, loss is 0.110185064\n",
      "epoch: 2 step: 1000, loss is 0.12088283\n",
      "epoch: 3 step: 1000, loss is 0.15903473\n",
      "epoch: 4 step: 1000, loss is 0.030054657\n",
      "epoch: 5 step: 1000, loss is 0.013846226\n",
      "epoch: 6 step: 1000, loss is 0.052161213\n",
      "epoch: 7 step: 1000, loss is 0.0050197737\n",
      "epoch: 8 step: 1000, loss is 0.17207858\n",
      "epoch: 9 step: 1000, loss is 0.010310417\n",
      "epoch: 10 step: 1000, loss is 0.000672762\n"
     ]
    }
   ],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "With a `batch_size` of 32, the dataset contains 1875 batches. Setting `sink_size` to 1000 means that each `epoch` sinks 1000 batches of data; the number of sink iterations equals `epoch` = 10, so the total amount of data sunk is `epoch` * `sink_size` = 10000 batches.\n",
    "\n",
    "Because `dataset_sink_mode` is True, a result is returned once per `epoch`.\n",
    "\n",
    "During training, `DatasetHelper` is used to iterate over the dataset and manage data information. In sink mode, the `mindspore.connect_network_with_dataset` function connects the current training or evaluation network `network` with the `DatasetHelper`. This function wraps the input network with `mindspore.ops.GetNext` so that, during the forward computation, data is fetched on the device side from the data channel named `queue_name` and passed to the input network. In non-sink mode, the dataset is traversed directly on the host to obtain data."
   ],
   "metadata": {}
  },
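  {
   "cell_type": "markdown",
   "source": [
    "To make the sink-mode data path described above more concrete, here is a simplified, hand-written sketch of driving one sink-mode epoch. It assumes the MindSpore 1.8 interfaces `mindspore.DatasetHelper` and `mindspore.connect_network_with_dataset` with their documented arguments; the function name `train_one_epoch_sunk` is only illustrative, and in practice `model.train(..., dataset_sink_mode=True)` performs these steps for you.\n",
    "\n",
    "```python\n",
    "import mindspore as ms\n",
    "\n",
    "def train_one_epoch_sunk(train_network, dataset, sink_size=-1):\n",
    "    # DatasetHelper manages dataset iteration; in sink mode it feeds data into the device queue.\n",
    "    helper = ms.DatasetHelper(dataset, dataset_sink_mode=True, sink_size=sink_size, epoch_num=1)\n",
    "    # Wrap the network with GetNext so inputs are read from the device-side data channel.\n",
    "    train_network = ms.connect_network_with_dataset(train_network, helper)\n",
    "    loss = None\n",
    "    for inputs in helper:\n",
    "        # In sink mode the data is fetched on the device by GetNext, so `inputs` carries\n",
    "        # no host-side tensors; each call drives the network with data from the channel.\n",
    "        loss = train_network(*inputs)\n",
    "    return loss\n",
    "```\n",
    "\n",
    "The sketch only illustrates where `GetNext` and the `queue_name` data channel come into play; the error handling and callback logic that `Model.train` adds are omitted."
   ],
   "metadata": {}
  },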
  {
   "cell_type": "markdown",
   "source": [
    "> When `dataset_sink_mode` is False, setting the `sink_size` parameter has no effect."
   ],
   "metadata": {}
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "MindSpore",
   "language": "python",
   "name": "mindspore"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}