代码
AI数据框架大横评之并行处理

AI数据框架大横评之并行处理

AI数据框架大横评之并行处理

前言

开始本文的内容前,先简单回顾一下本系列的前几篇文章:

第一篇,我们简要对比了当前主流AI数据框架的架构设计,从中可以略微看出各家框架的主要设计理念和应用场景。

AI数据框架大横评之架构设计

第二篇,我们简要对比了当前主流AI数据框架的数据加载方式。

AI数据框架大横评之数据加载

第三篇,我们简要对比了当前主流AI数据框架的数据处理方式。

AI数据框架大横评之数据处理

第四篇,我们简要对比了当前主流AI数据框架的采样器。

AI数据框架大横评之采样器

建议大家先阅读以上几篇文章,再开始下面的阅读。

并行处理

随着AI芯片算力的不断提高,网络对数据预处理效率的要求也随之升高,如果数据生产的速度跟不上网络消耗的速率,数据处理将成为整个网络训练性能的瓶颈。

为了充分利用Host资源,各家AI数据框架都提供了并行处理的功能,能够充分利用CPU多核多线程的能力,提高数据处理的整体速度。

MindSpore

前文已经提到,MindSpore数据框架整体是一个多节点异步流水线的结构,这里的异步指的就是并发处理。MindSpore为数据加载、Map和Batch等比较耗时的操作提供了并发处理的能力。

数据加载接口以 mindspore.dataset.GeneratorDataset 为例:

class mindspore.dataset.GeneratorDataset(source, column_names=None, column_types=None, schema=None, num_samples=None,
                                         num_parallel_workers=1, shuffle=None, sampler=None, num_shards=None,
                                         shard_id=None, python_multiprocessing=True, max_rowsize=None)

Map接口:

mindspore.dataset.Dataset.map(operations, input_columns=None, output_columns=None, column_order=None,
                              num_parallel_workers=None, python_multiprocessing=False, **kwargs)

Batch接口:

mindspore.dataset.Dataset.batch(batch_size, drop_remainder=False, num_parallel_workers=None, 
                                python_multiprocessing=False, **kwargs)

可通过以上接口的 num_parallel_workers 参数设置并行度,默认值为8。此时将会创建对应数量的线程,并发执行加载或处理逻辑,提升吞吐率。

我们知道基于Cython的Python内部存在全局解释器锁(GIL),所以即使创建了多个线程,同一时刻也只会有一个线程能够持有GIL而真正运行。所以当使用上述接口执行Python自定义逻辑时,最好通过 python_multiprocessing 参数打开多进程的功能,此时将会创建进程而不是线程来并发执行加载或处理逻辑,从而规避GIL的影响。

下述代码编写了自定义数据加载逻辑和处理逻辑,并启用了多进程并发处理:

import mindspore.dataset as ds

dataset = ds.GeneratorDataset(MyDataset(), column_names=["data", "lable"],
                              num_parallel_workers=8, python_multiprocessing=True)
dataset = dataset.map(process_data, input_columns=["data"], num_parallel_workers=8, python_multiprocessing=True)

PyTorch

PyTorch也具有并发处理的能力,但与MindSpore不同,由于其不存在多节点流水线的结构,数据的加载和处理都在自定义类中完成,所以其并发的粒度不再是节点,而是整个处理过程。

用户可以通过 torch.utils.data.DataLoader 接口的 num_workers 参数设置数据处理的并发度,默认值为0,表示不使用并发加载,数据处理全部由主进程执行。

class torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=None, sampler=None, batch_sampler=None, num_workers=0,
                                  collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None,
                                  multiprocessing_context=None, generator=None, *, prefetch_factor=None,
                                  persistent_workers=False, pin_memory_device='')

由于整个 torch.utils.data.DataLoader 执行的都为Python逻辑,所以其并发全部基于多进程,不存在多线程处理。

下述代码使用 DataLoader 并发加载数据集:

from torch.utils.data import DataLoader

ataloader = DataLoader(MyDataset(), batch_size=32, num_workers=8)

TensorFlow

TensorFlow与MindSpore一样都为多节点异步流水线的结构,其对数据加载、Map、Batch和Interleave接口提供了并发处理的能力。

数据加载接口以 tf.data.TFRecordDataset 为例:

class tf.data.TFRecordDataset(
    filenames,
    compression_type=None,
    buffer_size=None,
    num_parallel_reads=None,
    name=None
)

Map接口:

tf.data.Dataset.map(
    map_func, num_parallel_calls=None, deterministic=None, name=None
)

Batch接口:

tf.data.Dataset.batch(
    batch_size,
    drop_remainder=False,
    num_parallel_calls=None,
    deterministic=None,
    name=None
)

Interleave接口:

tf.data.Dataset.interleave(
    map_func,
    cycle_length=None,
    block_length=None,
    num_parallel_calls=None,
    deterministic=None,
    name=None
)

可通过数据加载接口的 num_parallel_reads 参数以及Map、Batch、Interleave接口的 num_parallel_calls 参数设置并行度,默认值为None,表示不使能并发。此时将会创建对应数量的线程,并发执行加载或处理逻辑,提升吞吐率。

可能由于TensorFlow中自定义的Python逻辑并不多,所以其未提供多进程处理的能力,如果用户通过 tf.data.Dataset.from_generatortf.data.Dataset.map 执行Python函数,其效果可能会受GIL的影响。

下述代码使用 TFRecordDataset 并发加载数据集并处理:

import tensorflow as tf

dataset = tf.data.TFRecordDataset([example_path], num_parallel_reads=8)
dataset = dataset.map(decode_fn, num_parallel_reads=8)

值得一提的是,TensorFlow还提供了自动参数调优的功能,用户可以直接将并发度设置为 tf.data.AUTOTUNE ,此时并发度将在运行中动态调整为最佳值,免除了用户调优的麻烦。