[ "AI Engineering" ]
AI Engineering 05 - Distributed Training Practice of the ResNet-50 Model Based on MindSpore
June 29, 2022
Introduction
Previously, we talked about the importance of distributed parallelism for AI computing power, parallel policies, and principles of automatic and semi-automatic parallelism implemented by the MindSpore framework. In this article, we use automatic parallelism to complete distributed training of the ResNet-50 model based on MindSpore. The procedure is as follows:
1. Prepare a dataset: Download the CIFAR-10 dataset as the training dataset.
2. Configure an 8-device environment powered by the Ascend 910 AI Processor.
3. Invoke the collective communication library: Introduce the HCCL to initialize the communication between multiple devices.
4. Load the dataset using data parallelism.
5. Define the ResNet-50 network.
6. Define a loss function and an optimizer for the distributed parallelism scenario.
7. Build network training code: Define the distributed parallelism policy and training code.
8. Execute the training script.
Step 1: Prepare a Dataset
Download the CIFAR-10 dataset used to train ResNet-50. The dataset consists of 60,000 32 x 32 color images in 10 classes, with 6,000 images per class. There are 50,000 training images and 10,000 test images. Download the dataset package and decompress it to a local path. The cifar-10-batches-bin folder is extracted.
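For reference, each binary batch file in the folder uses a fixed record layout: 1 label byte followed by 3,072 pixel bytes (3 channels x 32 x 32, channel-major). A minimal stdlib-only sketch of that layout (the helper name is ours, not part of MindSpore, which reads the files via ds.Cifar10Dataset):

```python
# Each record in a data_batch_*.bin file is 3073 bytes: 1 label byte
# followed by 3072 pixel bytes (1024 red, 1024 green, 1024 blue).
RECORD_BYTES = 1 + 3 * 32 * 32  # 3073

def parse_cifar10_records(raw: bytes):
    """Split a raw CIFAR-10 binary buffer into (label, pixels) records."""
    records = []
    for offset in range(0, len(raw), RECORD_BYTES):
        chunk = raw[offset:offset + RECORD_BYTES]
        if len(chunk) < RECORD_BYTES:
            break  # ignore a trailing partial record
        label = chunk[0]
        pixels = chunk[1:]  # channel-major, row-major within each channel
        records.append((label, pixels))
    return records
```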
Step 2: Configure the Distributed Environment
To perform distributed training on a bare metal server, we need to configure the networking information file of the current multi-device environment. Take the Ascend 910 AI Processor as an example. The following shows the rank_table_8pcs.json configuration file for an 8-device environment.
{
    "board_id": "0x0000",
    "chip_info": "910",
    "deploy_mode": "lab",
    "group_count": "1",
    "group_list": [
        {
            "device_num": "8",
            "server_num": "1",
            "group_name": "",
            "instance_count": "8",
            "instance_list": [...]
        }
    ],
    "para_plane_nic_location": "device",
    "para_plane_nic_name": ["eth0", "eth1", "eth2", "eth3", "eth4", "eth5", "eth6", "eth7"],
    "para_plane_nic_num": "8",
    "status": "completed"
}
Set the following parameters as required:
· board_id: current operating environment. Set this parameter to 0x0000 for x86 and 0x0020 for ARM.
· server_num: number of hosts.
· server_id: IP address of the local host.
· device_num/para_plane_nic_num/instance_count: number of devices.
· rank_id: logical sequence number of a device, which starts from 0.
· device_id: physical sequence number of a device, that is, the actual sequence number of a device on the host.
· device_ip: IP address of the integrated NIC. You can run the cat /etc/hccn.conf command on the current host to obtain this value (specified by address_x).
· para_plane_nic_name: NIC name.
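A mismatch among these counts is an easy misconfiguration to make. A small sanity check over the fields shown above can catch it before launch (the function and validation logic are our own illustration, not a MindSpore utility):

```python
def check_rank_table(table):
    """Verify that the device counts in a rank table dict agree.

    `table` is the parsed JSON, e.g. json.load(open("rank_table_8pcs.json")).
    """
    group = table["group_list"][0]
    device_num = int(group["device_num"])
    # device_num, instance_count, and para_plane_nic_num should all match.
    assert int(group["instance_count"]) == device_num, "instance_count mismatch"
    assert int(table["para_plane_nic_num"]) == device_num, "NIC count mismatch"
    assert len(table["para_plane_nic_name"]) == device_num, "NIC name list mismatch"
    return device_num
```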
Step 3: Invoke the Collective Communication Library
Distributed parallel training in MindSpore requires the Huawei Collective Communication Library (HCCL) for communication, which can be obtained from the software package of the Ascend AI Processor. In addition, the collective communication API provided by the HCCL is encapsulated in mindspore.communication.management to simplify distributed information configuration. HCCL implements multi-server multi-device communication based on the Ascend AI Processor. For common restrictions on using the distributed service, see Distributed Training.
The sample code for invoking the collective communication library is as follows:
import os
from mindspore import context
from mindspore.communication.management import init
if __name__ == "__main__":
    context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
                        device_id=int(os.environ["DEVICE_ID"]))
    init()
    ...
In the preceding code:
· mode=context.GRAPH_MODE: distributed training requires the graph mode (GRAPH_MODE); PYNATIVE_MODE does not support parallelism.
· device_id: physical sequence number of the device, that is, the actual sequence number of the device on the host.
· init: enables HCCL communication and completes initialization for distributed training.
Step 4: Load the Dataset Using Data Parallelism
During distributed training, data is imported using data parallelism. The following code loads the CIFAR-10 dataset using data parallelism. In the code, data_path indicates the dataset path, that is, the path of the cifar-10-batches-bin folder.
import mindspore.common.dtype as mstype
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as C
import mindspore.dataset.transforms.vision.c_transforms as vision
from mindspore.communication.management import get_rank, get_group_size
def create_dataset(data_path, repeat_num=1, batch_size=32, rank_id=0, rank_size=1):
    resize_height = 224
    resize_width = 224
    rescale = 1.0 / 255.0
    shift = 0.0

    # Get rank_id and rank_size from the HCCL API.
    rank_id = get_rank()
    rank_size = get_group_size()
    data_set = ds.Cifar10Dataset(data_path, num_shards=rank_size, shard_id=rank_id)

    # Define map operations.
    random_crop_op = vision.RandomCrop((32, 32), (4, 4, 4, 4))
    random_horizontal_op = vision.RandomHorizontalFlip()
    resize_op = vision.Resize((resize_height, resize_width))
    rescale_op = vision.Rescale(rescale, shift)
    normalize_op = vision.Normalize((0.4465, 0.4822, 0.4914), (0.2010, 0.1994, 0.2023))
    changeswap_op = vision.HWC2CHW()
    type_cast_op = C.TypeCast(mstype.int32)
    c_trans = [random_crop_op, random_horizontal_op]
    c_trans += [resize_op, rescale_op, normalize_op, changeswap_op]

    # Apply map operations on images.
    data_set = data_set.map(input_columns="label", operations=type_cast_op)
    data_set = data_set.map(input_columns="image", operations=c_trans)

    # Apply shuffle operations.
    data_set = data_set.shuffle(buffer_size=10)

    # Apply batch operations.
    data_set = data_set.batch(batch_size=batch_size, drop_remainder=True)

    # Apply repeat operations.
    data_set = data_set.repeat(repeat_num)
    return data_set
Parameters num_shards (number of devices) and shard_id (logical sequence number of a device) need to be passed to the dataset API. You are advised to obtain them through the HCCL API.
· get_rank: obtains the ID of the current device in the cluster.
· get_group_size: obtains the number of devices in the cluster.
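What num_shards and shard_id accomplish can be sketched in plain Python: each device keeps a disjoint slice of the samples, and the slices together cover the whole dataset. The interleaved scheme below is an illustration only; the actual partitioning is handled inside ds.Cifar10Dataset:

```python
def shard(samples, num_shards, shard_id):
    """Keep every num_shards-th sample starting at this device's rank.

    Illustrative interleaved sharding: shard_id plays the role of
    get_rank(), num_shards the role of get_group_size().
    """
    return samples[shard_id::num_shards]
```

With 4 devices, device 1 sees samples 1, 5, 9, ..., and the union of all four shards is the full dataset with no overlap.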
Step 5: Define the Network
In data parallelism and automatic parallelism, the way of defining the network is the same as that in a single-device system. For details about the defining code, see ResNet-50 Implementation.
Step 6: Define a Loss Function and an Optimizer
Automatic parallelism splits a model by operator and obtains the optimal parallel policy through algorithms. Different from single-device training, you are advised to use small operators to implement a loss function to achieve better parallel training performance.
For the loss function, we expand SoftmaxCrossEntropyWithLogits into multiple small operators for implementation based on a mathematical formula. The sample code is as follows:
from mindspore.ops import operations as P
from mindspore import Tensor
import mindspore.ops.functional as F
import mindspore.common.dtype as mstype
import mindspore.nn as nn
class SoftmaxCrossEntropyExpand(nn.Cell):
    def __init__(self, sparse=False):
        super(SoftmaxCrossEntropyExpand, self).__init__()
        self.exp = P.Exp()
        self.sum = P.ReduceSum(keep_dims=True)
        self.onehot = P.OneHot()
        self.on_value = Tensor(1.0, mstype.float32)
        self.off_value = Tensor(0.0, mstype.float32)
        self.div = P.Div()
        self.log = P.Log()
        self.sum_cross_entropy = P.ReduceSum(keep_dims=False)
        self.mul = P.Mul()
        self.mul2 = P.Mul()
        self.mean = P.ReduceMean(keep_dims=False)
        self.sparse = sparse
        self.max = P.ReduceMax(keep_dims=True)
        self.sub = P.Sub()

    def construct(self, logit, label):
        logit_max = self.max(logit, -1)
        exp = self.exp(self.sub(logit, logit_max))
        exp_sum = self.sum(exp, -1)
        softmax_result = self.div(exp, exp_sum)
        if self.sparse:
            label = self.onehot(label, F.shape(logit)[1], self.on_value, self.off_value)
        softmax_result_log = self.log(softmax_result)
        loss = self.sum_cross_entropy((self.mul(softmax_result_log, label)), -1)
        loss = self.mul2(F.scalar_to_array(-1.0), loss)
        loss = self.mean(loss, -1)
        return loss
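The expansion above is the standard numerically stable softmax cross-entropy: subtract the maximum logit before exponentiating so the exponentials cannot overflow. A plain-Python replica of the same arithmetic for a single sample (illustration only; no MindSpore required):

```python
import math

def softmax_cross_entropy(logits, label):
    """Numerically stable softmax cross-entropy for one sample."""
    logit_max = max(logits)                           # P.ReduceMax
    exps = [math.exp(x - logit_max) for x in logits]  # P.Sub + P.Exp
    exp_sum = sum(exps)                               # P.ReduceSum
    softmax = [e / exp_sum for e in exps]             # P.Div
    return -math.log(softmax[label])                  # P.Log + negate
```

Because a constant shift of all logits cancels in the softmax, very large logits (e.g. around 1000) produce the same loss instead of overflowing, which is exactly what the P.ReduceMax/P.Sub pair in the cell guarantees.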
We use the Momentum optimizer to update parameters. For details, see the implementation in the sample code.
Step 7: Build Network Training Code
context.set_auto_parallel_context is an API for configuring parallel training parameters and must be called before initializing the model. If no parameters are specified, the framework automatically sets the empirical values of the parameters based on the parallel mode. For example, in data parallelism, parameter_broadcast is enabled by default. Parameters:
· parallel_mode: distributed parallel mode. Possible values are ParallelMode.STAND_ALONE (default), ParallelMode.DATA_PARALLEL, and ParallelMode.AUTO_PARALLEL.
· parameter_broadcast: indicates whether to broadcast initialized parameters. In DATA_PARALLEL and HYBRID_PARALLEL modes, the default value is True.
· mirror_mean: During backward propagation, the framework collects gradients of parameters by using data parallelism across multiple hosts, obtains the global gradient value, and passes the global gradient value to the optimizer for update. The default value is False, indicating that the allreduce_sum operation is applied. The value True indicates that the allreduce_mean operation is applied.
· device_num and global_rank: Retain their default values, which are obtained by calling the HCCL API.
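The effect of mirror_mean reduces to sum-versus-mean over per-device gradients. The toy sketch below shows only the arithmetic; the real all-reduce runs over HCCL across devices:

```python
def allreduce(grads_per_device, mean=False):
    """Aggregate one gradient vector from each device.

    mean=False mimics allreduce_sum (mirror_mean=False);
    mean=True mimics allreduce_mean (mirror_mean=True).
    """
    total = [sum(g) for g in zip(*grads_per_device)]  # elementwise sum
    if mean:
        n = len(grads_per_device)
        total = [t / n for t in total]  # divide by the device count
    return total
```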
If multiple network cases exist in the script, call context.reset_auto_parallel_context() to restore all parameters to default values before executing the next case. In the following example, we set parallel_mode to AUTO_PARALLEL. To switch to data parallelism, change the value of parallel_mode to DATA_PARALLEL.
import os
from mindspore import context
from mindspore.nn.optim.momentum import Momentum
from mindspore.train.callback import LossMonitor
from mindspore.train.model import Model, ParallelMode
from resnet import resnet50

device_id = int(os.getenv('DEVICE_ID'))
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
context.set_context(device_id=device_id)  # set device_id

def test_train_cifar(epoch_size=10):
    context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, mirror_mean=True)
    loss_cb = LossMonitor()
    dataset = create_dataset(data_path, epoch_size)
    batch_size = 32
    num_classes = 10
    net = resnet50(batch_size, num_classes)
    loss = SoftmaxCrossEntropyExpand(sparse=True)
    opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)
    model = Model(net, loss_fn=loss, optimizer=opt)
    model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=True)
In the preceding code:
· dataset_sink_mode=True: enables the dataset sink (offloading) mode, in which data is sent directly to the device so that computation runs on the hardware platform without per-step host-device interaction.
· LossMonitor: a callback that reports the loss value at each step so that training can be monitored.
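The role of LossMonitor can be sketched framework-independently: a callback that records the loss after every step and reports the latest value at the end of each epoch. The class below is our own illustration, not the MindSpore implementation:

```python
class SimpleLossMonitor:
    """Toy loss-monitor callback: record per-step losses, report per epoch."""

    def __init__(self):
        self.losses = []

    def on_step_end(self, loss):
        self.losses.append(loss)

    def on_epoch_end(self, epoch):
        last = self.losses[-1]
        # Mirrors the log format shown in Step 8 below.
        print(f"epoch: {epoch} step: {len(self.losses)}, loss is {last}")
        self.losses.clear()
        return last
```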
Step 8: Execute the Training Script
After the training script is ready, run the corresponding command to call it. Currently, MindSpore uses the single-device single-process operating mode for distributed execution: one process runs on each device, so the total number of processes equals the number of devices in use. The process for device 0 runs in the foreground; the processes for the other devices run in the background. You need to create a directory for each process to store log information and operator compilation information. The following uses a distributed training script with eight devices as an example to describe how to run a training script.
#!/bin/bash
export DATA_PATH=${DATA_PATH:-$1}
export RANK_TABLE_FILE=$(pwd)/rank_table_8pcs.json
export RANK_SIZE=8

for ((i = 1; i < ${RANK_SIZE}; i++))
do
    rm -rf device$i
    mkdir device$i
    cp ./resnet50_distributed_training.py ./resnet.py ./device$i
    cd ./device$i
    export DEVICE_ID=$i
    export RANK_ID=$i
    echo "start training for device $i"
    env > env$i.log
    pytest -s -v ./resnet50_distributed_training.py > train.log$i 2>&1 &
    cd ../
done

rm -rf device0
mkdir device0
cp ./resnet50_distributed_training.py ./resnet.py ./device0
cd ./device0
export DEVICE_ID=0
export RANK_ID=0
echo "start training for device 0"
env > env0.log
pytest -s -v ./resnet50_distributed_training.py > train.log0 2>&1
if [ $? -eq 0 ]; then
    echo "training success"
else
    echo "training failed"
    exit 2
fi
cd ../
The script requires the variable DATA_PATH (path of the dataset). The following environment variables need to be set:
· RANK_TABLE_FILE: path of the network information file.
· DEVICE_ID: actual sequence number of the current device on the corresponding host.
· RANK_ID: logical sequence number of the current device.
Run the script to start distributed training. A run takes about 5 minutes, most of which is spent on operator compilation; the actual training takes less than 20 seconds. For comparison, in the single-device scenario, compilation and training take about 10 minutes in total. The following is an example segment of train.log in a device directory:
epoch: 1 step: 156, loss is 2.0084016
epoch: 2 step: 156, loss is 1.6407638
epoch: 3 step: 156, loss is 1.6164391
epoch: 4 step: 156, loss is 1.6838071
epoch: 5 step: 156, loss is 1.6320667
epoch: 6 step: 156, loss is 1.3098773
epoch: 7 step: 156, loss is 1.3515002
epoch: 8 step: 156, loss is 1.2943741
epoch: 9 step: 156, loss is 1.2316195
epoch: 10 step: 156, loss is 1.1533381
Summary
Distributed training involves more development complexity than single-device training in the following aspects:
1. Distributed configuration and communication for multiple devices
2. Operator selection for the loss function when defining a network
3. Complex training script
4. Difficult debugging and tuning
Despite these development and debugging costs, distributed training brings impressive performance benefits. For the ResNet-50 model, the performance of eight-device training (Ascend AI Processors) is several times higher than that of single-device training. Therefore, distributed training is an ideal choice for models with high training requirements.