AI数据框架大横评之并行处理
AI数据框架大横评之并行处理
前言
开始本文的内容前,先简单回顾一下本系列的前几篇文章:
第一篇,我们简要对比了当前主流AI数据框架的架构设计,从中可以略微看出各家框架的主要设计理念和应用场景。
第二篇,我们简要对比了当前主流AI数据框架的数据加载方式。
第三篇,我们简要对比了当前主流AI数据框架的数据处理方式。
第四篇,我们简要对比了当前主流AI数据框架的采样器。
建议大家先阅读以上几篇文章,再开始下面的阅读。
并行处理
随着AI芯片算力的不断提高,网络对数据预处理效率的要求也随之升高,如果数据生产的速度跟不上网络消耗的速率,数据处理将成为整个网络训练性能的瓶颈。
为了充分利用Host资源,各家AI数据框架都提供了并行处理的功能,能够充分利用CPU多核多线程的能力,提高数据处理的整体速度。
MindSpore
前文已经提到,MindSpore数据框架整体是一个多节点异步流水线的结构,这里的异步指的就是并发处理。MindSpore为数据加载、Map和Batch等比较耗时的操作提供了并发处理的能力。
数据加载接口以 mindspore.dataset.GeneratorDataset 为例:
class mindspore.dataset.GeneratorDataset(source, column_names=None, column_types=None, schema=None, num_samples=None,
num_parallel_workers=1, shuffle=None, sampler=None, num_shards=None,
shard_id=None, python_multiprocessing=True, max_rowsize=None)
Map接口:
mindspore.dataset.Dataset.map(operations, input_columns=None, output_columns=None, column_order=None,
num_parallel_workers=None, python_multiprocessing=False, **kwargs)
Batch接口:
mindspore.dataset.Dataset.batch(batch_size, drop_remainder=False, num_parallel_workers=None,
python_multiprocessing=False, **kwargs)
可通过以上接口的 num_parallel_workers 参数设置并行度,默认值为8。此时将会创建对应数量的线程,并发执行加载或处理逻辑,提升吞吐率。
我们知道基于Cython的Python内部存在全局解释器锁(GIL),所以即使创建了多个线程,同一时刻也只会有一个线程能够持有GIL而真正运行。所以当使用上述接口执行Python自定义逻辑时,最好通过 python_multiprocessing 参数打开多进程的功能,此时将会创建进程而不是线程来并发执行加载或处理逻辑,从而规避GIL的影响。
下述代码编写了自定义数据加载逻辑和处理逻辑,并启用了多进程并发处理:
import mindspore.dataset as ds
dataset = ds.GeneratorDataset(MyDataset(), column_names=["data", "lable"],
num_parallel_workers=8, python_multiprocessing=True)
dataset = dataset.map(process_data, input_columns=["data"], num_parallel_workers=8, python_multiprocessing=True)
PyTorch
PyTorch也具有并发处理的能力,但与MindSpore不同,由于其不存在多节点流水线的结构,数据的加载和处理都在自定义类中完成,所以其并发的粒度不再是节点,而是整个处理过程。
用户可以通过 torch.utils.data.DataLoader 接口的 num_workers 参数设置数据处理的并发度,默认值为0,表示不使用并发加载,数据处理全部由主进程执行。
class torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=None, sampler=None, batch_sampler=None, num_workers=0,
collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None,
multiprocessing_context=None, generator=None, *, prefetch_factor=None,
persistent_workers=False, pin_memory_device='')
由于整个 torch.utils.data.DataLoader 执行的都为Python逻辑,所以其并发全部基于多进程,不存在多线程处理。
下述代码使用 DataLoader 并发加载数据集:
from torch.utils.data import DataLoader
ataloader = DataLoader(MyDataset(), batch_size=32, num_workers=8)
TensorFlow
TensorFlow与MindSpore一样都为多节点异步流水线的结构,其对数据加载、Map、Batch和Interleave接口提供了并发处理的能力。
数据加载接口以 tf.data.TFRecordDataset 为例:
class tf.data.TFRecordDataset(
filenames,
compression_type=None,
buffer_size=None,
num_parallel_reads=None,
name=None
)
Map接口:
tf.data.Dataset.map(
map_func, num_parallel_calls=None, deterministic=None, name=None
)
Batch接口:
tf.data.Dataset.batch(
batch_size,
drop_remainder=False,
num_parallel_calls=None,
deterministic=None,
name=None
)
Interleave接口:
tf.data.Dataset.interleave(
map_func,
cycle_length=None,
block_length=None,
num_parallel_calls=None,
deterministic=None,
name=None
)
可通过数据加载接口的 num_parallel_reads 参数以及Map、Batch、Interleave接口的 num_parallel_calls 参数设置并行度,默认值为None,表示不使能并发。此时将会创建对应数量的线程,并发执行加载或处理逻辑,提升吞吐率。
可能由于TensorFlow中自定义的Python逻辑并不多,所以其未提供多进程处理的能力,如果用户通过 tf.data.Dataset.from_generator 或 tf.data.Dataset.map 执行Python函数,其效果可能会受GIL的影响。
下述代码使用 TFRecordDataset 并发加载数据集并处理:
import tensorflow as tf
dataset = tf.data.TFRecordDataset([example_path], num_parallel_reads=8)
dataset = dataset.map(decode_fn, num_parallel_reads=8)
值得一提的是,TensorFlow还提供了自动参数调优的功能,用户可以直接将并发度设置为 tf.data.AUTOTUNE ,此时并发度将在运行中动态调整为最佳值,免除了用户调优的麻烦。