代码
AI数据框架大横评(3)

AI数据框架大横评(3)

AI数据框架大横评(3)

前言

开始本文的内容前,先简单回顾一下本系列的前两篇文章:

首先,我们简要对比了当前主流AI数据框架的架构设计,从中可以略微看出各家框架的主要设计理念和应用场景。

AI数据框架大横评:https://www.hiascend.com/developer/blog/details/0222150949518615069然后,我们简要对比了当前主流AI数据框架的数据加载方式。

AI数据框架大横评(2):https://www.hiascend.com/developer/blog/details/0286156408632279318

建议大家先阅读以上两篇文章,再开始下面的阅读。

数据处理

当原始数据从存储设备加载到内存后,往往还需要进行一系列的处理,才能传递到网络进行训练。

数据处理是对已加载的数据进行加工整理,形成适合网络学习的样式,它是模型训练前必不可少的阶段。

针对不同的目的,数据处理的手段各不一样。当原始数据杂乱无章时,需要对其进行去重、清洗;当原始数据量偏少,或是分布不均匀时,需要对齐进行增广;当原始数据特征难以学习时,需要对其进行特征变换、归一化。

除此之外,不同领域的任务,如CV、NLP、Audio等,也都具有各自不同的经典处理方法,如图像的放缩、裁切,文本的分词、向量化,音频的频谱变换、滤波等。

作为AI数据框架,很难全部满足用户差异化的处理需求,所以往往只提供了部分基础的数据处理接口,并开放自定义的能力,方便用户根据自己的需要编写个性化的数据处理方法。

MindSpore

MindSpore提供了一系列通用的数据处理API,本文将主要介绍较为常用的Map和Batch操作,其余API可自行参考官方文档。

1)Map

Map操作用于将指定的一系列数据变换(Transforms)依次作用于每个样本。

MindSpore提供了一些常用的数据变换API,方便用户一键式使用,避免书写冗长的处理逻辑。

以处理MNIST手写字识别数据集为例,首先按照上一篇文章的方法加载数据集。

import mindspore.dataset as ds

mnist_dataset_dir = "/path/to/mnist_dataset_directory"

dataset = ds.MnistDataset(dataset_dir=mnist_dataset_dir)

然后即可使用Map指定想要对数据集中图像执行的数据变换。

下列代码首先定义了一个随机裁切变换,并指定输出的图像大小为(5,5)。然后使用Map操作将该变换作用于数据集中的“image”列。这样一来,MNIST数据集中的每张图像都将在随机位置被裁切出一个(5,5)的子图像,用于后续处理。

import mindspore.dataset.vision as vision

random_crop = vision.RandomCrop(5)

dataset = dataset.map(random_crop, input_columns=["image"])

如果官方提供的API无法满足需要,用户也可编写自定义处理逻辑,并通过Map执行。

下列代码自定义了一个归一化变换,用于将MNIST数据集图像的像素值从 [0, 255) 归一化到 [0, 1) ,并通过Map执行。

def normalize(image):

return image / 255

dataset = dataset.map(normalize, input_columns=["image"])

2)Batch

Batch操作用于将连续batch_size个样本组合为1个批数据,有利于更好地利用Device的并行运算能力。

下列使用Batch操作将MNIST数据集中连续的32个样本组合为1个批数据。若原始输入数据的shape为(28,28,3),则Batch后的数据的shape将变为(32,28,28,3)。

dataset = dataset.batch(32)

需要注意的是,若输入数据的shape不固定,则无法直接拼接,此时需要用户自行定义逻辑进行填充或截断,使得同一批内的样本的shape保持一致。

下列自定义了一个截断函数,遍历获取到的连续batch_size个样本组成的列表,将其中的每个图片截取为(5,5,3)大小,再将样本的列表组装成numpy类型并返回。

def truncate(images, labels, batch_info):

truncate_images = []

for image in images:

truncate_images.append(image[:5, :5, :])

return np.array(truncate_images), np.array(labels)

dataset = dataset.batch(32, per_batch_map=truncate)

PyTorch

PyTorch将所有数据处理功能都集成到DataLoader接口之上,对于未提供的功能,则需用户编写自定义逻辑实现。但为了对比方便,下文依旧从功能的角度讲解PyTorch的用法。

Transforms

PyTorch并未将数据变换视为一个单独的步骤,而是将其融合到了数据加载中,即在用户自定义数据加载类时,应先完成对数据的变换,再通过 __getitem__ 或 __next__ 返回。

下列代码在编写自定义数据加载类时,除了完成对图片文件的读取外,还进行了随机裁切和归一化变换。

其中随机裁切使用了torchvision官方提供的接口,这些接口底层基于torch的Tensor运算实现,具有更好的性能,更为推荐使用。

如果官方提供的API无法满足需要,用户也可参考其中归一化函数的写法,自行编写处理逻辑。

import torch

from torchvision.io import decode_image

from torchvision.transforms import v2

def normalize(image):

return image / 255

class ImageDataset(torch.utils.data.Dataset):

def __init__(self, image_dir):

self.files = [os.path.join(image_dir, file) for file in os.listdir(image_dir)]

def __getitem__(self, index):

image = decode_image(self.files[index], mode="RGB")

croped_image = v2.RandomCrop(5)(image)

normalized_image = normalize(croped_image)

return normalized_image

def __len__(self):

return len(self.files)

需要特殊说明的是,在使用torch官方预封装的数据加载API时,由于加载逻辑无法修改,需要通过 transform 和 target_transform 参数传递需要执行的数据变换。

下列代码使用torchvision官方提供的API加载MNIST数据集,并通过 transform 参数指定对图像执行随机裁切变换,通过 target_transform 参数指定对标签执行类型转换变换。

import torchvision

from torchvision.transforms import v2

mnist_dataset_dir = "/path/to/mnist_dataset_directory"

image_transform = v2.RandomCrop(5)

label_transform = v2.ToDtype(torch.uint8)

dataset = torchvision.datasets.MNIST(root=mnist_dataset_dir,

transform=image_transform,

target_transform=label_transform)

Batch

PyTorch的Batch操作通过 DataLoader 实现,当前有两种执行Batch的方式。

下列代码通过 DataLoader 的 batch_size 参数指定了将连续的32个样本组合为1个批数据,这也是Batch操作最为常见的用法。

from torch.utils.data import DataLoader

from torchvision.datasets import MNIST

dataset = MNIST(root="/path/to/mnist_dataset_directory")

dataloader = DataLoader(dataset, batch_size=32)

除此之外,用户也可以通过自行编写采样器逻辑,来指定如何组合批数据。

下列代码编写了一个Batch Sampler,通过 __iter__ 函数一次返回一批索引值,DataLoader 将依次读取对应样本,然后将其组合为1个批数据。

from torch.utils.data import DataLoader, Sampler

from torchvision.datasets import MNIST

class BatchSampler(Sampler):

def __init__(self, data_size, batch_size):

self.data_size = data_size

self.batch_size = batch_size

def __len__(self):

return self.data_size // self.batch_size

def __iter__(self):

batch_indices = []

for index in range(self.data_size):

batch_indices.append(index)

if len(batch_indices) == self.batch_size:

yield batch_indices

batch_indices = []

dataset = MNIST(root="/path/to/mnist_dataset_directory")

batch_sampler = BatchSampler(data_size=60000, batch_size=32)

dataloader = DataLoader(dataset, batch_sampler=batch_sampler)

同样,若输入数据间的shape不统一,是无法直接组合的,此时需要用户自行编写组合批数据的逻辑。

下列代码加载了一批具有动态shape的数据,然后通过自定义 collate_fn ,将特征列沿着最长维进行填充,并组合为1个批数据返回。

from torch.utils.data import DataLoader

from torch.nn.utils.rnn import pad_sequence

def collate_fn(data):

features, targets = zip(*data)

features = pad_sequence(features, batch_first=True)

targets = torch.stack(targets)

return features, targets

dataset = DynamicShapeDataset()

dataloader = DataLoader(dataset, batch_size=32, collate_fn=collate_fn)

TensorFlow

TensorFlow的设计与MindSpore较为相似,当用户定义好数据集对象后,即可通过对象的方法执行各种数据处理操作。

Map

TensorFlow通过Map将指定的数据变换作用于每个样本。但其并未提供预置的数据变换,需要用户自行编写自定义逻辑。

下列代码定义了一个包含两个字符串的数据集,然后通过自定义函数将字符串中的字符转为大写。

import tensorflow as tf

def upper_case_fn(data):

return data.numpy().decode('utf-8').upper()

dataset = tf.data.Dataset.from_tensor_slices(['hello', 'world'])

dataset = dataset.map(lambda data: tf.py_function(func=upper_case_fn,

inp=[data], Tout=tf.string))

需要注意的是,由于TensorFlow会将自定义函数作为图来执行,当前并非所有的Python语法都支持自动转换为图,所以往往需要通过 tf.numpy_function 和 tf.py_function 来包装Python代码。

Batch

TensorFlow提供了Batch操作来执行基础的批处理。

下列代码实现了一个简单的数据集,然后通过Batch操作将连续的3个样本组合为1个批数据。

import tensorflow as tf

dataset = tf.data.Dataset.range(8)

dataset = dataset.batch(3)

但当用户输入数据的shape不固定时,则需要改用 padded_batch 或 ragged_batch 操作。

下列代码定义了一个变长数据集,然后通过 padded_batch 操作,首先使用指定的 padding_values 将每条数据的长度填充至 padded_shapes,然后再组合为批数据。

import tensorflow as tf

dataset = tf.data.Dataset.range(1, 5, output_type=tf.int32) # [[1], [2], [3], [4]]

dataset = dataset.map(lambda x: tf.fill([x], x))) # [[1], [2, 2], [3, 3, 3], [4, 4, 4, 4]]

dataset = dataset.padded_batch(2, padded_shapes=5, # [[1, 0, 0, 0, 0], [2, 2, 0, 0, 0]]

padding_values=0) # [[3, 3, 3, 0, 0], [4, 4, 4, 4, 0]]

此外,TensorFlow还提供了一种特殊的Tensor结构,来表示变长数据,即 RaggedTensor。

下列代码定义了一个变长数据集,然后通过 ragged_batch 操作将连续的 batch_size 条样本构造为1个RaggedTensor。

import tensorflow as tf

dataset = tf.data.Dataset.range(1, 5) # [[1], [2], [3], [4]]

dataset = dataset.map(lambda x: tf.range(x)) # [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3]]

dataset = dataset.ragged_batch(2) #

除了上述列举的操作外,TensorFlow还提供了以下几种Batch,本文将不再详细说明。

sparse_batch:将变长数据组合为另一种稀疏的Tensor结构,即 SparseTensor。

unbatch:将已经组装的批数据重新拆分为单个样本。

rebatch: 将已经组装的批数据先拆分为单个样本,再根据新指定的参数重新组合。