# Dynamic Cluster Startup Method

[![View Source On Gitee](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/r2.1/resource/_static/logo_source_en.svg)](https://gitee.com/mindspore/docs/blob/r2.1/tutorials/experts/source_en/parallel/dynamic_cluster.md)

## Overview

To meet the reliability requirements of training, MindSpore provides a **dynamic cluster** feature that lets users start Ascend/GPU/CPU distributed training tasks without relying on any third-party library (OpenMPI) and without modifying the training script. We recommend using this startup method in preference to the others. See [Multi-Card Startup Method](https://www.mindspore.cn/tutorials/experts/en/r2.1/parallel/startup_method.html) for the multi-card startup methods supported on each platform.

In a distributed training scenario, OpenMPI is used to synchronize data on the Host side and to build the cluster of processes. The MindSpore **dynamic cluster** feature replaces this OpenMPI capability by **reusing the Parameter Server mode training architecture**, which is described in the [Parameter Server Mode](https://mindspore.cn/tutorials/experts/en/r2.1/parallel/parameter_server_training.html) training tutorial.

The **dynamic cluster** feature starts multiple MindSpore training processes as `Workers`, plus one additional `Scheduler` process responsible for cluster building and disaster recovery. The user only needs to make a few changes to the startup script to perform distributed training.

> Dynamic cluster startup scripts can be migrated between hardware platforms without additional modification.

## Precautions

- Dynamic cluster does not currently support `PyNative` mode.

## Environment Variables

Several environment variables need to be exported before the training script can be started with dynamic cluster, as shown in the following table:
| Environment Variables | Functions | Types | Values | Descriptions |
| --- | --- | --- | --- | --- |
| MS_ROLE | Specifies the role of this process. | String | MS_SCHED: the Scheduler process. Only one Scheduler is started per training task; it is responsible for cluster building, disaster recovery, etc., and executes no training code.<br>MS_WORKER: a Worker process; distributed training processes are generally set to this role.<br>MS_PSERVER: a Parameter Server process. This role is only available in Parameter Server mode; for more details, refer to [Parameter Server Mode](https://mindspore.cn/tutorials/experts/en/r2.1/parallel/parameter_server_training.html). | The Worker and Parameter Server processes register with the Scheduler process to complete cluster building. |
| MS_SCHED_HOST | Specifies the IP address of the Scheduler. | String | Legitimate IP address. | The current version does not support IPv6 addresses. |
| MS_SCHED_PORT | Specifies the port number that the Scheduler binds to. | Integer | Port number in the range 1024 to 65535. | |
| MS_NODE_ID | Specifies the ID of this process, unique within the cluster. | String | The unique ID of this process, generated automatically by MindSpore by default. | MS_NODE_ID needs to be set in the following cases (in general it does not need to be set and is generated automatically by MindSpore):<br>Disaster recovery scenario: the current process ID is needed during recovery to re-register with the Scheduler.<br>GLOG log redirection scenario: to ensure that each training process log is saved independently, the process ID is used as the suffix of the log saving path.<br>Specified rank id scenario: users can specify the rank id of this process by setting MS_NODE_ID to an integer. |
| MS_WORKER_NUM | Specifies the number of processes whose role is MS_WORKER. | Integer | An integer greater than 0. | The number of Worker processes started by the user should equal the value of this environment variable. If fewer are started, cluster building fails; if more are started, the Scheduler completes cluster building according to the Worker registration order and the extra Worker processes fail to start. |
| MS_SERVER_NUM | Specifies the number of processes whose role is MS_PSERVER. | Integer | An integer greater than 0. | Only set in Parameter Server training mode. |
| MS_ENABLE_RECOVERY | Turns on disaster recovery. | Integer | 1 means on, 0 means off. The default is 0. | |
| MS_RECOVERY_PATH | Persistent path folder. | String | Legitimate user directory. | The Worker and Scheduler processes perform the necessary persistence during execution, such as the node information used to recover the cluster and the intermediate state of the training service, and save it through files. |
| MS_HCCL_CM_INIT | Whether to initialize HCCL using the CM method. | Integer | 1 means on, 0 means off. The default is 0. | This environment variable is recommended only on Ascend hardware platforms with a large number of communication domains. Turning it on reduces the memory footprint of the HCCL collective communication library, and the training task is executed in the same way as with `rank table` startup. |

> The above environment variables must be set before each process starts, and the values of `MS_SCHED_HOST`, `MS_SCHED_PORT`, and `MS_WORKER_NUM` must be consistent across processes; otherwise cluster building will fail due to the inconsistent configuration of the processes.
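For a quick sanity check, the required variables can be validated inside the training script before `init()` is called. The following is a minimal sketch, not part of the official tutorial scripts; the helper name `check_dynamic_cluster_env` is hypothetical, and the checks simply mirror the value ranges listed in the table above.

```python
import os

def check_dynamic_cluster_env():
    """Hypothetical helper: verify the dynamic cluster environment variables
    described in the table above before mindspore.communication.init() is called."""
    role = os.environ.get("MS_ROLE")
    assert role in ("MS_SCHED", "MS_WORKER", "MS_PSERVER"), f"Unexpected MS_ROLE: {role}"

    # The Scheduler address and port must be set for every process.
    assert os.environ.get("MS_SCHED_HOST"), "MS_SCHED_HOST is not set"
    port = int(os.environ.get("MS_SCHED_PORT", "0"))
    assert 1024 <= port <= 65535, f"MS_SCHED_PORT must be in [1024, 65535], got {port}"

    # MS_WORKER_NUM must be a positive integer and consistent across all processes.
    worker_num = int(os.environ.get("MS_WORKER_NUM", "0"))
    assert worker_num > 0, "MS_WORKER_NUM must be an integer greater than 0"

if __name__ == "__main__":
    check_dynamic_cluster_env()
    print("Dynamic cluster environment variables look consistent on this node.")
```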
## Executing Training Tasks

Since **dynamic cluster** startup scripts can be kept consistent across hardware platforms, the following only shows how to write the startup script for 8-card distributed training on the GPU hardware platform:

> The running directory of the sample: [distributed_training](https://gitee.com/mindspore/docs/tree/r2.1/docs/sample_code/distributed_training)

### 1. Preparing Python Training Scripts

```python
import mindspore as ms
from mindspore.train import CheckpointConfig, ModelCheckpoint
from mindspore.communication import init

if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
    init()
    ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
    ...
```

where

- `mode=GRAPH_MODE`: To use distributed training, the running mode must be set to graph mode (the current version of the **dynamic cluster** feature does not support PyNative mode).
- `init()`: Initializes the cluster. The collective communication library (NCCL in this case) is initialized according to the backend specified in the `set_context` interface, and the distributed training initialization is completed.
- `ms.ParallelMode.DATA_PARALLEL`: Sets the training mode to data parallel mode.

Dynamic cluster also supports the **secure encrypted channel** feature and the `TLS/SSL` protocol to meet users' security needs. The secure encrypted channel is off by default. If you need to turn it on, call `init()` only after the secure encrypted channel has been configured correctly through `set_ps_context`; otherwise the cluster initialization will fail. To use the secure encrypted channel, configure:

`set_ps_context(config_file_path="/path/to/config_file.json", enable_ssl=True, client_password="123456", server_password="123456")`

> For the detailed parameter configuration descriptions, refer to [mindspore.set_ps_context](https://www.mindspore.cn/docs/en/r2.1/api_python/mindspore/mindspore.set_ps_context.html#mindspore.set_ps_context) and the [Security Authentication](#security-authentication) section of this document.
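As a minimal sketch of the required ordering (configure the channel first, then initialize the cluster), the snippet below simply combines the calls already shown above; the certificate path and passwords are placeholders.

```python
import mindspore as ms
from mindspore.communication import init

if __name__ == "__main__":
    ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
    # Configure the secure encrypted channel BEFORE initializing the cluster;
    # otherwise cluster initialization will fail.
    ms.set_ps_context(config_file_path="/path/to/config_file.json",
                      enable_ssl=True,
                      client_password="123456",
                      server_password="123456")
    init()  # safe to initialize the cluster now that TLS/SSL is configured
```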
echo "===========================================" DATA_PATH=$1 export DATA_PATH=${DATA_PATH} rm -rf device mkdir device cp ./resnet50_distributed_training_gpu.py ./resnet.py ./device cd ./device echo "start training" # Start 8 Worker training processes in a loop for((i=0;i<8;i++)); do export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8 export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address export MS_SCHED_PORT=8118 # Set Scheduler port export MS_ROLE=MS_WORKER # Set the started process to the MS_WORKER role export MS_NODE_ID=$i # Set process id, optional pytest -s -v ./resnet50_distributed_training_gpu.py > worker_$i.log 2>&1 & # Start training script done # Start 1 Scheduler process export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8 export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address export MS_SCHED_PORT=8118 # Set Scheduler port export MS_ROLE=MS_SCHED # Set the started process to the MS_SCHED role pytest -s -v ./resnet50_distributed_training_gpu.py > scheduler.log 2>&1 & # Start training script ``` > The training scripts for the Scheduler and Worker processes are identical in content and startup method, because the internal processes of the two roles are handled differently in MindSpore. Users simply pull up the process in the normal training manner, without modifying the Python code by role. This is one of the reasons why dynamic cluster startup scripts can be consistent across multiple hardware platforms. A single-machine 8-card distributed training can be executed by executing the following command: ```bash ./run_gpu_cluster.sh /path/to/dataset/ ``` #### Multi-Machine Multi-Card The startup script needs to be split in the multi-machine training scenario. The following is an example of performing 2-machine 8-card training, with each machine executing the startup 4 Worker: The script `run_gpu_cluster_1.sh` starts 1 `Scheduler` and `Worker1` to `Worker4` on node 1: ```bash #!/bin/bash echo "==========================================" echo "Please run the script as: " echo "bash run_gpu_cluster.sh DATA_PATH" echo "For example: bash run_gpu_cluster.sh /path/dataset" echo "It is better to use the absolute path." 
echo "===========================================" DATA_PATH=$1 export DATA_PATH=${DATA_PATH} rm -rf device mkdir device cp ./resnet50_distributed_training_gpu.py ./resnet.py ./device cd ./device echo "start training" # Start Worker1 to Worker4, 4 Worker training processes in a loop for((i=0;i<4;i++)); do export MS_WORKER_NUM=8 # Set the total number of Worker processes in the cluster to 8 (including other node processes) export MS_SCHED_HOST= # Set the Scheduler IP address to the Node 1 IP address export MS_SCHED_PORT=8118 # Set the Scheduler port export MS_ROLE=MS_WORKER # Set the startup process to the MS_WORKER role export MS_NODE_ID=$i # Set process id, optional pytest -s -v ./resnet50_distributed_training_gpu.py > worker_$i.log 2>&1 & # Start training script done # Start 1 Scheduler process on node 1 export MS_WORKER_NUM=8 # Set the total number of Worker processes in the cluster to 8 (including other node processes) export MS_SCHED_HOST= # Set the Scheduler IP address to the Node 1 IP address export MS_SCHED_PORT=8118 # Set the Scheduler port export MS_ROLE=MS_SCHED # Set the startup process to the MS_SCHED role pytest -s -v ./resnet50_distributed_training_gpu.py > scheduler.log 2>&1 & # Start training script ``` The script `run_gpu_cluster_2.sh` starts `Worker5` to `Worker8` on node 2 (without executing Scheduler): ```bash #!/bin/bash echo "==========================================" echo "Please run the script as: " echo "bash run_gpu_cluster.sh DATA_PATH" echo "For example: bash run_gpu_cluster.sh /path/dataset" echo "It is better to use the absolute path." echo "===========================================" DATA_PATH=$1 export DATA_PATH=${DATA_PATH} rm -rf device mkdir device cp ./resnet50_distributed_training_gpu.py ./resnet.py ./device cd ./device echo "start training" # Start Worker5 to Worker8, 4 Worker training processes in a loop for((i=4;i<8;i++)); do export MS_WORKER_NUM=8 # Set the total number of Worker processes in the cluster to 8 (including other node processes) export MS_SCHED_HOST= # Set the Scheduler IP address to the Node 1 IP address export MS_SCHED_PORT=8118 # Set the Scheduler port export MS_ROLE=MS_WORKER # Set the startup process to the MS_WORKER role export MS_NODE_ID=$i # Set process id, optional pytest -s -v ./resnet50_distributed_training_gpu.py > worker_$i.log 2>&1 & # Start training script done ``` > The multi-machine task `MS_WORKER_NUM` should be the total number of Worker nodes in the cluster. > To keep the inter-node network connected, use the `telnet ` command to test whether this node is connected to the started Scheduler node. Execute on Node 1: ```bash ./run_gpu_cluster_1.sh /path/to/dataset/ ``` Execute on Node 2: ```bash ./run_gpu_cluster_2.sh /path/to/dataset/ ``` That is, you can perform 2-machine 8-card distributed training tasks. > The above startup scripts are consistent across `Ascend` and `CPU` hardware platforms, and only hardware-related code modifications such as `device_target` in the Python training scripts is performed, and we can execute dynamic cluster distributed training. ### 3. Execution Results The script will run in the background and the log file will be saved to the current directory. A total of 10 epochs are run, each of which has 234 steps. The results about the Loss part are saved in worker_*.log. 
After grepping out the loss values, the output looks as follows:

```text
epoch: 1 step: 234, loss is 2.0084016
epoch: 2 step: 234, loss is 1.6407638
epoch: 3 step: 234, loss is 1.6164391
epoch: 4 step: 234, loss is 1.6838071
epoch: 5 step: 234, loss is 1.6320667
epoch: 6 step: 234, loss is 1.3098773
epoch: 7 step: 234, loss is 1.3515002
epoch: 8 step: 234, loss is 1.2943741
epoch: 9 step: 234, loss is 1.2316195
epoch: 10 step: 234, loss is 1.1533381
```

## Disaster Recovery

Model training places high requirements on the reliability and serviceability of the distributed training architecture. MindSpore supports disaster recovery under data parallelism: in a multi-card data parallel training scenario, after processes in the cluster (multiple Workers and 1 Scheduler) exit abnormally and are pulled up again, the training task continues to execute normally.

Scenario constraints: in graph mode, `MindData` is used for training in data sink mode, data parallel mode is turned on, and the Worker processes are started using the non-`OpenMPI` approach described above.

In this scenario, if a node is interrupted during training, then as long as the corresponding process is relaunched with the same environment variables (`MS_ENABLE_RECOVERY` and `MS_RECOVERY_PATH`) and the corresponding script, training is guaranteed to continue and accuracy convergence is not affected.

1. Enable disaster recovery through environment variables:

    ```bash
    export MS_ENABLE_RECOVERY=1                 # Enable disaster recovery
    export MS_RECOVERY_PATH=/path/to/recovery/  # Configure the persistence path
    ```

2. Configure the checkpoint save interval, for example:

    ```python
    from mindspore.train import ModelCheckpoint, CheckpointConfig
    from mindspore.communication import get_rank

    ckptconfig = CheckpointConfig(save_checkpoint_steps=100, keep_checkpoint_max=5)
    ckpoint_cb = ModelCheckpoint(prefix='train', directory="./ckpt_of_rank_/"+str(get_rank()), config=ckptconfig)
    ```

Each Worker saves checkpoints and uses a different path (for example, the directory in the sample above is set using the rank id) to prevent conflicts between checkpoints saved with the same name. Checkpoints are used for abnormal-process recovery and normal-process rollback. The rollback of training means that each Worker in the cluster is restored to the state corresponding to the latest checkpoint, while the data side is also rolled back to the corresponding step, and then training continues.

The interval between checkpoint saves is configurable and determines the granularity of disaster recovery. The smaller the interval, the fewer steps are rolled back to the last saved checkpoint, but frequent checkpoint saving may affect training efficiency; a larger interval has the opposite effect. `keep_checkpoint_max` should be set to at least 2 (to guard against checkpoint save failures).
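As a sketch of how the pieces above fit together in the data sink scenario, the snippet below attaches the checkpoint callback to `Model.train` with `dataset_sink_mode=True`. `net`, `loss_fn`, `opt`, and `create_dataset` are placeholders for the user's own network, loss, optimizer, and `MindData` dataset; they are not part of the sample code.

```python
from mindspore.train import Model, ModelCheckpoint, CheckpointConfig
from mindspore.communication import get_rank

# Placeholders: replace with your own network, loss, optimizer, and MindData dataset.
net, loss_fn, opt = ..., ..., ...
train_dataset = create_dataset()  # hypothetical helper returning a MindData dataset

# Save a checkpoint every 100 steps; keep at least 2 to guard against save failures.
ckptconfig = CheckpointConfig(save_checkpoint_steps=100, keep_checkpoint_max=5)
ckpoint_cb = ModelCheckpoint(prefix='train',
                             directory="./ckpt_of_rank_/" + str(get_rank()),
                             config=ckptconfig)

model = Model(net, loss_fn=loss_fn, optimizer=opt)
# Data sink mode is required by the disaster recovery scenario constraints above.
model.train(10, train_dataset, callbacks=[ckpoint_cb], dataset_sink_mode=True)
```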
echo "===========================================" DATA_PATH=$1 export DATA_PATH=${DATA_PATH} export MS_ENABLE_RECOVERY=1 # Enable disaster recovery export MS_RECOVERY_PATH=/path/to/recovery/ # Configure the persistence path file rm -rf device mkdir device cp ./resnet50_distributed_training_gpu_recovery.py ./resnet.py ./device cd ./device echo "start training" # Start 1 Scheduler process export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8 export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address export MS_SCHED_PORT=8118 # Set Scheduler port export MS_ROLE=MS_SCHED # Set the started process to the MS_SCHED role export MS_NODE_ID=sched # Set Node ID as 'sched' pytest -s -v ./resnet50_distributed_training_gpu_recovery.py > scheduler.log 2>&1 & # Start 8 Worker training process in a loop for((i=0;i<8;i++)); do export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8 export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address export MS_SCHED_PORT=8118 # Set Scheduler port export MS_ROLE=MS_WORKER # Set the started process to the MS_WORKER role export MS_NODE_ID=worker_$i # Set Node ID as 'worker_$i' pytest -s -v ./resnet50_distributed_training_gpu_recovery.py > worker_$i.log 2>&1 & done ``` Before starting Worker and Scheduler, you need to add relevant environment variables settings, such as IP and Port of Scheduler, and whether the role of the current process is Worker or Scheduler. Execute the following command to start a single-machine 8-card data parallel training ```bash bash run_gpu_cluster_recovery.sh /path/to/recovery/ ``` Distributed training starts, and if an abnormal case is encountered during training, such as a process quitting abnormally, and then restarting the corresponding process, the training process can be recovered: For example, if the Scheduler process abnormally exits during training, the following command can be executed to restart the Scheduler: ```bash export DATA_PATH=YOUR_DATA_PATH export MS_ENABLE_RECOVERY=1 # Enable disaster recovery export MS_RECOVERY_PATH=/path/to/recovery/ # Configure the persistence path file cd ./device # 启动1个Scheduler进程 export MS_WORKER_NUM=8 # Set the number of Worker processes in the cluster to 8 export MS_SCHED_HOST=127.0.0.1 # Set the Scheduler IP address to the local loop address export MS_SCHED_PORT=8118 # Set Scheduler port export MS_ROLE=MS_SCHED # Set the started process to the MS_SCHED role export MS_NODE_ID=sched # Set Node ID as 'sched' pytest -s -v ./resnet50_distributed_training_gpu_recovery.py > scheduler.log 2>&1 & ``` Worker and Scheduler cluster is automatically restored. Worker processes with abnormal exit are handled in a similar way (Note: Worker processes with abnormal exit need to wait 30s before pulling up to resume training. Before that, Scheduler rejects Worker with the same node id to register again in order to prevent network jitter and malicious registration). ## Security Authentication To support SSL security authentication between nodes/processes, to enable security authentication, configure `enable_ssl=True` via the Python API `mindspore.set_ps_context` (defaults to False when not passed in, indicating that SSL security authentication is not enabled). 
The `config.json` configuration file specified by `config_file_path` needs to add the following fields:

```json
{
    "server_cert_path": "server.p12",
    "crl_path": "",
    "client_cert_path": "client.p12",
    "ca_cert_path": "ca.crt",
    "cipher_list": "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-DSS-AES256-GCM-SHA384:DHE-PSK-AES128-GCM-SHA256:DHE-PSK-AES256-GCM-SHA384:DHE-PSK-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-PSK-CHACHA20-POLY1305:DHE-RSA-AES128-CCM:DHE-RSA-AES256-CCM:DHE-RSA-CHACHA20-POLY1305:DHE-PSK-AES128-CCM:DHE-PSK-AES256-CCM:ECDHE-ECDSA-AES128-CCM:ECDHE-ECDSA-AES256-CCM:ECDHE-ECDSA-CHACHA20-POLY1305",
    "cert_expire_warning_time_in_day": 90
}
```

- `server_cert_path`: The path to the p12 file (SSL-specific certificate file) on the server side, which contains the cipher text of the certificate and the secret key.
- `crl_path`: The path to the certificate revocation list file (used to distinguish invalid, untrusted certificates from valid, trusted ones).
- `client_cert_path`: The path to the p12 file (SSL-specific certificate file) on the client side, which contains the cipher text of the certificate and the secret key.
- `ca_cert_path`: The path to the root certificate.
- `cipher_list`: The cipher suite (list of supported SSL cipher types).
- `cert_expire_warning_time_in_day`: The warning time before certificate expiration, in days.

The secret key in the p12 file is stored as cipher text, and the password needs to be passed in at startup. Please refer to the Python API [mindspore.set_ps_context](https://www.mindspore.cn/docs/en/r2.1/api_python/mindspore/mindspore.set_ps_context.html#mindspore.set_ps_context) for the `client_password` and `server_password` fields.
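As an illustrative, stdlib-only sketch (not part of the tutorial scripts), the following check verifies that a `config.json` contains the fields listed above before it is passed to `set_ps_context`; the file path is a placeholder.

```python
import json

# Placeholder path; use the same file passed to set_ps_context(config_file_path=...).
CONFIG_PATH = "/path/to/config_file.json"

# Fields described in the Security Authentication section above.
REQUIRED_FIELDS = (
    "server_cert_path",
    "crl_path",
    "client_cert_path",
    "ca_cert_path",
    "cipher_list",
    "cert_expire_warning_time_in_day",
)

with open(CONFIG_PATH, "r") as f:
    config = json.load(f)

missing = [field for field in REQUIRED_FIELDS if field not in config]
if missing:
    raise ValueError(f"config.json is missing SSL fields: {missing}")
print("All SSL fields listed in the Security Authentication section are present.")
```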