mindpandas.channel

class mindpandas.channel.DataSender(address, namespace='default', num_shards=1, dataset_name='dataset', full_batch=False, max_queue_size=10)

channel的发送方(输入端),通过channel发送新对象。

参数:
  • address (str) - 当前sender运行的节点的ip地址。

  • namespace (str, 可选) - channel所属的命名空间。默认值:'default' ,sender将在命名空间 default 中运行。不同命名空间的DataSender和DataReceiver不能相互连接。

  • num_shards (int, 可选) - 指定将数据划分为多少个分片。默认值:1

  • dataset_name (str, 可选) - 数据集的名称。默认值:'dateset'

  • full_batch (bool, 可选) - 如果为 True,则每个分片将获得sender发送的完整数据。否则,每个分片只能获取部分数据。默认值:False

  • max_queue_size (int, 可选) - 队列中能够缓存的最大元素数量。默认值:10

异常:
  • ValueError - 当 num_shards 为无效值时。

说明

分布式执行引擎必须提前启动。

send(obj)

通过channel发送对象。

参数:
  • obj (Union[numpy.ndarray, list, mindpandas.DataFrame]) - 要发送的对象。

异常:
  • TypeError - 如果 obj 的类型不合法。

  • ValueError - 如果 obj 的长度不是正整数或不能被分片数整除。

property num_shards

返回当前channel的 num_shards

property full_batch

返回 full_batch 的值。

get_queue(shard_id=None)

返回与指定的 shard_id 对应的数据集中尚未消费的对象引用。

参数:
  • shard_id (int, 可选) - 请求分片的id。默认值:None,将返回所有分片。

返回:

List,存储分片中数据的引用的列表。

class mindpandas.channel.DataReceiver(address, namespace='default', shard_id=0, dataset_name='dataset')

从channel接收数据的类。负责接收来自channel的新对象。

参数:
  • address (str) - 当前receiver运行的节点的ip地址。

  • namespace (str, 可选) - channel所属的命名空间。默认值:'default' ,receiver将在命名空间 default 中运行。不同命名空间的DataSender和DataReceiver不能相互连接。

  • shard_id (int, 可选) - 指定当前receiver接收数据集的哪个分片。默认值:0,receiver将从id为0的分片获取数据。

  • dataset_name (str, 可选) - 数据集的名称。默认值:'dataset'

说明

必须提前启动分布式执行引擎,并且提前初始化DataSender。要与正确的DataSender配对,namespacedataset_name 必须与DataSender相同。

recv()

通过channel获取数据。

返回:

object,分片中最近没有被消费的对象。

异常:
  • ValueError - 当前receiver的 shard_id 无效时。

property shard_id

返回当前receiver的 shard_id

property num_shards

返回当前channel的 num_shards