AI Engineering 05 - Distributed Training Practice of the ResNet-50 Model Based on MindSpore

June 29, 2022

Introduction

Previously, we talked about the importance of distributed parallelism for AI computing power, parallel policies, and principles of automatic and semi-automatic parallelism implemented by the MindSpore framework. In this article, we use automatic parallelism to complete distributed training of the ResNet-50 model based on MindSpore. The procedure is as follows:

1. Prepare a dataset: Download the CIFAR-10 dataset as the training dataset.

2. Configure an 8-device environment powered by the Ascend 910 AI Processor.

3. Invoke the collective communication library: Introduce the HCCL to initialize the communication between multiple devices.

4. Load the dataset using data parallelism.

5. Define the ResNet-50 network.

6. Define a loss function and an optimizer for the distributed parallelism scenario.

7. Build network training code: Define the distributed parallelism policy and training code.

8. Execute the training script.

Step 1: Prepare a Dataset

Download the CIFAR-10 dataset, which we will use to train ResNet-50. The dataset consists of 60,000 32 x 32 color images in 10 classes, with 6,000 images per class: 50,000 training images and 10,000 test images. Download the dataset package and decompress it to a local path; decompression produces the cifar-10-batches-bin folder.
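As a quick sanity check after decompression, you can verify that the extracted folder contains the six binary batch files. The helper below is our own illustrative sketch; the file names follow the standard CIFAR-10 binary-version layout:

```python
import os

# Standard file layout of the CIFAR-10 binary-version archive.
CIFAR10_BIN_FILES = [f"data_batch_{i}.bin" for i in range(1, 6)] + ["test_batch.bin"]

def check_cifar10_dir(data_path):
    """Return the list of expected .bin files missing from data_path."""
    return [f for f in CIFAR10_BIN_FILES
            if not os.path.isfile(os.path.join(data_path, f))]
```

If the returned list is empty, the folder is ready to be passed to the dataset loader in Step 4.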

Step 2: Configure the Distributed Environment

To perform distributed training on a bare metal server, we need to configure the networking information file of the current multi-device environment. Take the Ascend 910 AI Processor as an example. The following shows the rank_table_8pcs.json configuration file for an 8-device environment.

 {      

    "board_id": "0x0000",      

    "chip_info": "910",      

    "deploy_mode": "lab",      

    "group_count": "1",      

    "group_list": [      

        {      

            "device_num": "8",      

            "server_num": "1",      

            "group_name": "",      

            "instance_count": "8",      

            "instance_list": [...]      

        }      

    ],      

    "para_plane_nic_location": "device",      

    "para_plane_nic_name": ["eth0","eth1","eth2","eth3","eth4","eth5","eth6","eth7"],      

    "para_plane_nic_num": "8",      

    "status": "completed"      

}      

Set the following parameters as required:

· board_id: current operating environment. Set this parameter to 0x0000 for x86 and 0x0020 for ARM.

· server_num: number of hosts.

· server_id: IP address of the local host.

· device_num/para_plane_nic_num/instance_count: number of devices.

· rank_id: logical sequence number of a device, which starts from 0.

· device_id: physical sequence number of a device, that is, the actual sequence number of a device on the host.

· device_ip: IP address of the integrated NIC. You can run the cat /etc/hccn.conf command on the current host to obtain this value (specified by address_x).

· para_plane_nic_name: NIC name.
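Because the rank table is plain JSON, it can also be generated programmatically instead of edited by hand. The sketch below builds a dictionary mirroring the fields above; the make_rank_table helper, the expanded instance_list structure, and the sample server_id are our own illustrative assumptions (the exact schema depends on your Ascend software version, and real device_ip values come from /etc/hccn.conf):

```python
import json

def make_rank_table(device_ips, server_id="10.0.0.1"):
    """Build a single-server rank-table dict mirroring the fields above.

    Illustrative sketch only: the instance_list layout shown here is an
    assumption, and device_ips must come from /etc/hccn.conf on the host.
    """
    n = len(device_ips)
    instances = [
        {"devices": [{"device_id": str(i), "device_ip": ip}],
         "rank_id": str(i), "server_id": server_id}
        for i, ip in enumerate(device_ips)
    ]
    return {
        "board_id": "0x0000",          # 0x0000 for x86, 0x0020 for ARM
        "chip_info": "910",
        "deploy_mode": "lab",
        "group_count": "1",
        "group_list": [{
            "device_num": str(n), "server_num": "1", "group_name": "",
            "instance_count": str(n), "instance_list": instances,
        }],
        "para_plane_nic_location": "device",
        "para_plane_nic_name": [f"eth{i}" for i in range(n)],
        "para_plane_nic_num": str(n),
        "status": "completed",
    }
```

The resulting dict can be written out with json.dump to produce rank_table_8pcs.json.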

Step 3: Invoke the Collective Communication Library

Distributed parallel training on MindSpore requires the Huawei Collective Communication Library (HCCL), which is included in the software package of the Ascend AI Processor. The collective communication APIs provided by the HCCL are encapsulated in mindspore.communication.management to simplify distributed information configuration. HCCL implements multi-server, multi-device communication based on the Ascend AI Processor. For common restrictions on using the distributed service, see Distributed Training.

The sample code for invoking the collective communication library is as follows:

import os      

from mindspore import context      

from mindspore.communication.management import init      

if __name__ == "__main__":      

     context.set_context(mode=context.GRAPH_MODE, device_target="Ascend", device_id=int(os.environ["DEVICE_ID"]))      

     init()      

     ...      

In the preceding code:

· mode=context.GRAPH_MODE: To use distributed training, set mode to GRAPH_MODE. (PYNATIVE_MODE does not support parallelism.)

· device_id: physical sequence number of the device, that is, the actual sequence number of the device on the host.

· init: enables HCCL communication and completes initialization for distributed training.

Step 4: Load the Dataset Using Data Parallelism

During distributed training, data is imported using data parallelism. The following code loads the CIFAR-10 dataset using data parallelism. In the code, data_path indicates the dataset path, that is, the path of the cifar-10-batches-bin folder.

import mindspore.common.dtype as mstype      

import mindspore.dataset as ds      

import mindspore.dataset.transforms.c_transforms as C      

import mindspore.dataset.transforms.vision.c_transforms as vision      

from mindspore.communication.management import get_rank, get_group_size      

def create_dataset(data_path, repeat_num=1, batch_size=32, rank_id=0, rank_size=1):      

    resize_height = 224      

    resize_width = 224      

    rescale = 1.0 / 255.0      

    shift = 0.0      

         

    # Get rank_id and rank_size.      

    rank_id = get_rank()      

    rank_size = get_group_size()      

    data_set = ds.Cifar10Dataset(data_path, num_shards=rank_size, shard_id=rank_id)      

         

    # Define map operations.      

    random_crop_op = vision.RandomCrop((32, 32), (4, 4, 4, 4))      

    random_horizontal_op = vision.RandomHorizontalFlip()      

    resize_op = vision.Resize((resize_height, resize_width))      

    rescale_op = vision.Rescale(rescale, shift)      

    normalize_op = vision.Normalize((0.4465, 0.4822, 0.4914), (0.2010, 0.1994, 0.2023))      

    changeswap_op = vision.HWC2CHW()      

    type_cast_op = C.TypeCast(mstype.int32)      

    c_trans = [random_crop_op, random_horizontal_op]      

    c_trans += [resize_op, rescale_op, normalize_op, changeswap_op]      

    # Apply map operations on images.      

    data_set = data_set.map(input_columns="label", operations=type_cast_op)      

    data_set = data_set.map(input_columns="image", operations=c_trans)      

    # apply shuffle operations      

    data_set = data_set.shuffle(buffer_size=10)      

    # Apply batch operations.      

    data_set = data_set.batch(batch_size=batch_size, drop_remainder=True)      

    # Apply repeat operations.      

    data_set = data_set.repeat(repeat_num)      

    return data_set   

Parameters num_shards (number of devices) and shard_id (logical sequence number of a device) need to be passed to the dataset API. You are advised to obtain them through the HCCL API.

· get_rank: obtains the ID of the current device in the cluster.

· get_group_size: obtains the number of devices in the cluster.
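The effect of num_shards and shard_id can be illustrated in pure Python: each device reads a disjoint, interleaved slice of the sample indices. This is an illustrative sketch of the sharding idea only; the real partitioning is implemented inside MindSpore's Cifar10Dataset:

```python
def shard_indices(num_samples, num_shards, shard_id):
    """Sketch of num_shards/shard_id sharding: each device takes every
    num_shards-th sample, starting at its own shard_id offset."""
    return list(range(shard_id, num_samples, num_shards))
```

Across all shard_id values, every sample is read exactly once, so the devices jointly cover the full dataset without overlap.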

Step 5: Define the Network

In data parallelism and automatic parallelism, the network is defined in the same way as in a single-device system. For the defining code, see ResNet-50 Implementation.

Step 6: Define a Loss Function and an Optimizer

Automatic parallelism splits a model by operator and obtains the optimal parallel policy through algorithms. Different from single-device training, you are advised to use small operators to implement a loss function to achieve better parallel training performance.

For the loss function, we expand SoftmaxCrossEntropyWithLogits into multiple small operators for implementation based on a mathematical formula. The sample code is as follows:

from mindspore.ops import operations as P      

from mindspore import Tensor      

import mindspore.ops.functional as F      

import mindspore.common.dtype as mstype      

import mindspore.nn as nn      

class SoftmaxCrossEntropyExpand(nn.Cell):      

    def __init__(self, sparse=False):      

        super(SoftmaxCrossEntropyExpand, self).__init__()      

        self.exp = P.Exp()      

        self.sum = P.ReduceSum(keep_dims=True)      

        self.onehot = P.OneHot()      

        self.on_value = Tensor(1.0, mstype.float32)      

        self.off_value = Tensor(0.0, mstype.float32)      

        self.div = P.Div()      

        self.log = P.Log()      

        self.sum_cross_entropy = P.ReduceSum(keep_dims=False)      

        self.mul = P.Mul()      

        self.mul2 = P.Mul()      

        self.mean = P.ReduceMean(keep_dims=False)      

        self.sparse = sparse      

        self.max = P.ReduceMax(keep_dims=True)      

        self.sub = P.Sub()      

             

    def construct(self, logit, label):      

        logit_max = self.max(logit, -1)      

        exp = self.exp(self.sub(logit, logit_max))      

        exp_sum = self.sum(exp, -1)      

        softmax_result = self.div(exp, exp_sum)      

        if self.sparse:      

            label = self.onehot(label, F.shape(logit)[1], self.on_value, self.off_value)      

        softmax_result_log = self.log(softmax_result)      

        loss = self.sum_cross_entropy((self.mul(softmax_result_log, label)), -1)      

        loss = self.mul2(F.scalar_to_array(-1.0), loss)      

        loss = self.mean(loss, -1)      

        return loss
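To sanity-check the operator decomposition, the same pipeline can be traced in pure Python for a single sample: subtract the max for numerical stability, exponentiate, normalize, take the log, and negate the entry selected by the label. This is an illustrative mirror of the math, not MindSpore code:

```python
import math

def softmax_xent(logits, label_index):
    """Pure-Python mirror of the small-operator pipeline above, for one
    sample: max-subtraction, exp, sum, div, log, masked sum, negation."""
    m = max(logits)                              # ReduceMax
    exps = [math.exp(x - m) for x in logits]     # Sub + Exp
    s = sum(exps)                                # ReduceSum
    softmax = [e / s for e in exps]              # Div
    return -math.log(softmax[label_index])       # Log + masked sum + Mul(-1)
```

The result equals the familiar closed form -log(softmax(logits)[label]), confirming that the chain of small operators computes standard softmax cross entropy.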

We use the Momentum optimizer to update parameters. For details, see the implementation in the sample code.

Step 7: Build Network Training Code

context.set_auto_parallel_context is an API for configuring parallel training parameters and must be called before initializing the model. If no parameters are specified, the framework automatically sets the empirical values of the parameters based on the parallel mode. For example, in data parallelism, parameter_broadcast is enabled by default. Parameters:

· parallel_mode: distributed parallelism. Possible values are ParallelMode.STAND_ALONE (default), ParallelMode.DATA_PARALLEL, and ParallelMode.AUTO_PARALLEL.

· parameter_broadcast: indicates whether to broadcast initialized parameters. In DATA_PARALLEL and HYBRID_PARALLEL modes, the default value is True.

· mirror_mean: During backward propagation, the framework aggregates the parameter gradients computed on all devices in data parallelism, obtains the global gradient value, and passes it to the optimizer for update. The default value is False, indicating that the allreduce_sum operation is applied; the value True indicates that the allreduce_mean operation is applied.

· device_num and global_rank: Retain their default values, which are obtained by calling the HCCL API.
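The difference between allreduce_sum and allreduce_mean behind mirror_mean can be sketched with plain lists. This is an illustrative model of the aggregation semantics only, not the HCCL implementation:

```python
def allreduce(grads_per_device, mean=False):
    """Sketch of the gradient aggregation behind mirror_mean: sum the
    per-device gradients elementwise (allreduce_sum); with mean=True,
    also divide by the number of devices (allreduce_mean)."""
    n = len(grads_per_device)
    summed = [sum(g) for g in zip(*grads_per_device)]
    return [s / n for s in summed] if mean else summed
```

With mirror_mean=True, the effective gradient is the average across devices, so the learning rate does not need to be rescaled as the device count grows.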

If multiple network cases exist in the script, call context.reset_auto_parallel_context() to restore all parameters to default values before executing the next case. In the following example, we set parallel_mode to AUTO_PARALLEL. To switch to data parallelism, change the value of parallel_mode to DATA_PARALLEL.

import os      

from mindspore import context      

from mindspore.nn.optim.momentum import Momentum      

from mindspore.train.callback import LossMonitor      

from mindspore.train.model import Model, ParallelMode      

from resnet import resnet50      

device_id = int(os.getenv('DEVICE_ID'))      

context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")      

context.set_context(device_id=device_id) # set device_id      

def test_train_cifar(epoch_size=10):      

    context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, mirror_mean=True)      

    loss_cb = LossMonitor()      

    dataset = create_dataset(data_path, epoch_size)      

    batch_size = 32      

    num_classes = 10      

    net = resnet50(batch_size, num_classes)      

    loss = SoftmaxCrossEntropyExpand(sparse=True)      

    opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)      

    model = Model(net, loss_fn=loss, optimizer=opt)      

    model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=True)    

In the preceding code:

· dataset_sink_mode=True: uses the dataset sink mode, in which data is fed directly to the device so that computation runs on the hardware platform.

· LossMonitor: a callback that returns the loss value during training so that convergence can be monitored.

Step 8: Execute the Training Script

After the training script is edited, run the corresponding command to invoke it. Currently, MindSpore uses a single-device single-process operating mode for distributed execution: one process runs on each device, and the total number of processes equals the number of devices in use. The process for device 0 runs in the foreground; the processes for the other devices run in the background. You need to create a directory for each process to store log information and operator compilation information. The following uses an 8-device distributed training script as an example to describe how to run it.

#!/bin/bash
export DATA_PATH=${DATA_PATH:-$1}
export RANK_TABLE_FILE=$(pwd)/rank_table_8pcs.json
export RANK_SIZE=8
for((i=1;i<${RANK_SIZE};i++))
do
    rm -rf device$i
    mkdir device$i
    cp ./resnet50_distributed_training.py ./resnet.py ./device$i
    cd ./device$i
    export DEVICE_ID=$i
    export RANK_ID=$i
    echo "start training for device $i"
    env > env$i.log
    pytest -s -v ./resnet50_distributed_training.py > train.log$i 2>&1 &
    cd ../
done
rm -rf device0
mkdir device0
cp ./resnet50_distributed_training.py ./resnet.py ./device0
cd ./device0
export DEVICE_ID=0
export RANK_ID=0
echo "start training for device 0"
env > env0.log
pytest -s -v ./resnet50_distributed_training.py > train.log0 2>&1
if [ $? -eq 0 ];then
    echo "training success"
else
    echo "training failed"
    exit 2
fi
cd ../

The script requires the variable DATA_PATH (path of the dataset). The following environment variables need to be set:

· RANK_TABLE_FILE: path of the network information file.

· DEVICE_ID: actual sequence number of the current device on the corresponding host.

· RANK_ID: logical sequence number of the current device.
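Because a missing launch variable produces confusing failures deep inside initialization, a preflight check at the top of the training script can fail fast instead. The helper below is our own convenience sketch:

```python
import os

# Variables the launch script above is expected to export for each process.
REQUIRED_VARS = ["RANK_TABLE_FILE", "DEVICE_ID", "RANK_ID"]

def missing_env_vars(environ=os.environ):
    """Return the required launch variables that are not set."""
    return [v for v in REQUIRED_VARS if v not in environ]
```

Calling this before context.set_context and init() lets the script report exactly which variable the launcher failed to export.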

Run the script to start distributed training. The run takes about 5 minutes, most of which is spent on operator compilation; the actual training takes less than 20 seconds. In the single-device scenario, compilation and training take about 10 minutes in total. The following is an example segment of train.log in a device directory:

epoch: 1 step: 156, loss is 2.0084016

epoch: 2 step: 156, loss is 1.6407638

epoch: 3 step: 156, loss is 1.6164391

epoch: 4 step: 156, loss is 1.6838071

epoch: 5 step: 156, loss is 1.6320667

epoch: 6 step: 156, loss is 1.3098773

epoch: 7 step: 156, loss is 1.3515002

epoch: 8 step: 156, loss is 1.2943741

epoch: 9 step: 156, loss is 1.2316195

epoch: 10 step: 156, loss is 1.1533381
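To track convergence across the eight device directories, lines in this format can be extracted with a small parser. This helper is a convenience sketch of our own, not part of the sample code:

```python
import re

# Matches lines like "epoch: 1 step: 156, loss is 2.0084016".
LOG_LINE = re.compile(r"epoch: (\d+) step: (\d+), loss is ([\d.]+)")

def parse_loss_log(lines):
    """Parse LossMonitor output lines into (epoch, step, loss) tuples,
    ignoring lines that do not match the expected format."""
    out = []
    for line in lines:
        m = LOG_LINE.match(line)
        if m:
            out.append((int(m.group(1)), int(m.group(2)), float(m.group(3))))
    return out
```

Running it over each device's train.log makes it easy to plot or compare the loss curves of all eight processes.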

Summary

Distributed training involves more development complexity than single-device training in the following aspects:

1. Distributed configuration and communication for multiple devices

2. Operator selection for the loss function when defining a network

3. Complex training script

4. Difficult debugging and tuning

Despite these development and debugging costs, distributed training brings impressive performance benefits. For the ResNet-50 model, eight-device training on Ascend AI Processors is several times faster than single-device training. Therefore, distributed training is an ideal choice for models with demanding training requirements.

References

[1] Sample Code of Distributed Training