# Source code for mindflow.data.boundary

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#pylint: disable=W0223
#pylint: disable=W0221
"""
Sampling data of boundary condition and initial condition.
"""
from __future__ import absolute_import
import numpy as np

from mindspore import log as logger
from .data_base import Data
from ..geometry.geometry_base import Geometry, SamplingConfig
from ..utils.check_func import check_param_type


class Boundary(Data):
    """
    Base class of boundary condition and initial condition.

    The class samples points from `geometry` and serves them column by column, either one sample per item
    (``batch_size == 1``) or as pre-batched slices. `_random_merge`, which `_get_index_when_sample_all`
    reads, is not assigned here; the subclasses in this module (`BoundaryBC`, `BoundaryIC`) assign it.

    Args:
        geometry (Geometry): specifies geometry information of boundary condition and initial condition.

    Raises:
        ValueError: if `sampling_config` of geometry is ``None``.
        TypeError: if `geometry` is not an instance of Geometry.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindflow.geometry import generate_sampling_config, Geometry
        >>> from mindflow.data import Boundary
        >>> geometry_config = dict({'BC' : dict({'random_sampling' : True, 'size' : 100, 'sampler' : 'uniform',})})
        >>> sampling_config = generate_sampling_config(geometry_config)
        >>> geom = Geometry("geom", 1, 0.0, 1.0, sampling_config=sampling_config)
        >>> boundary = Boundary(geometry=geom)
    """
    def __init__(self, geometry):
        check_param_type(geometry, "geometry", data_type=Geometry)
        self.geometry = geometry
        check_param_type(geometry.sampling_config, geometry.name, data_type=SamplingConfig)

        # Sampling state; everything below is filled in by `_initialize`.
        self.data = None                # tuple of sampled columns
        self.data_size = None           # number of samples in the first column
        self.batch_size = 1
        self.shuffle = False
        self.batched_data_size = None   # data_size // batch_size, reported by __len__
        self.columns_list = None
        self._bound_index = None        # index array used to address (and optionally permute) the samples
        self._bound_index_num = 0       # number of items already served from the current sampling round
        super(Boundary, self).__init__()

    def _get_sampling_data(self, geom_type="BC"):
        """
        Sample the geometry for `geom_type`.

        Returns:
            tuple, ``(sample_data, columns)`` where `sample_data` is always a tuple (a single sampling
            result is wrapped) and `columns` is ``self.geometry.columns_dict[geom_type]``.
        """
        sample_data = self.geometry.sampling(geom_type=geom_type)
        if not isinstance(sample_data, tuple):
            sample_data = (sample_data,)
        logger.info("Get {} sample_data size: {}".format(geom_type, len(sample_data[0])))
        return sample_data, self.geometry.columns_dict[geom_type]

    def _initialize(self, batch_size=1, shuffle=False, geom_type="BC"):
        """
        Sample the geometry and set the batching attributes.

        All validation happens before any attribute is written, so a rejected call leaves the previous
        state of the object untouched.

        Args:
            batch_size (int): number of samples per item. Default: 1.
            shuffle (bool): stored on the instance; read by `_get_index_when_sample_all`. Default: False.
            geom_type (str): key passed to the geometry sampler, e.g. ``"BC"`` or ``"IC"``. Default: "BC".

        Returns:
            tuple, the sampled columns (also stored in ``self.data``).

        Raises:
            ValueError: if `batch_size` is not positive or is larger than the number of sampled points.
        """
        # A zero batch size would divide by zero below and a negative one would give a negative length.
        if batch_size <= 0:
            raise ValueError("batch_size should be a positive integer, but got: {}.".format(batch_size))

        data, columns_list = self._get_sampling_data(geom_type=geom_type)
        # Defensive normalisation in case `_get_sampling_data` is overridden to return a bare array.
        if not isinstance(data, tuple):
            data = (data,)
        data_size = len(data[0])
        if batch_size > data_size:
            raise ValueError("If prebatch data, batch_size: {} should not be larger than data size: {}.".format(
                batch_size, data_size
            ))

        self.data = data
        self.columns_list = columns_list
        self.data_size = data_size
        self.batch_size = batch_size
        # Floor division: a trailing partial batch is dropped.
        self.batched_data_size = data_size // batch_size
        self.shuffle = shuffle
        self._bound_index = np.arange(data_size)
        logger.info("Get {} dataset: {}, columns: {}, size: {}, batched_size: {}, shuffle: {}".format(
            geom_type, self.name, self.columns_list, self.data_size, self.batched_data_size, self.shuffle))
        return data

    def _get_index_when_sample_iter(self, index, geom_type="BC"):
        """
        Return the next sequential item index, re-sampling once the current round is used up.

        The incoming `index` is ignored: items are served in order from an internal counter, and when
        the counter reaches ``batched_data_size`` the geometry is sampled again and the counter resets.
        """
        if self._bound_index_num == self.batched_data_size:
            self.data = self._initialize(self.batch_size, self.shuffle, geom_type=geom_type)
            self._bound_index_num = 0
        index = self._bound_index_num
        self._bound_index_num += 1
        return index

    def _get_index_when_sample_all(self, index):
        """
        Map `index` onto ``[0, len(self))`` and reshuffle at the start of every pass.

        When `_random_merge` or `shuffle` is set, a fresh permutation of the sample indices is drawn
        each time `index` is a multiple of the dataset length.
        """
        data_size = len(self)
        if (self._random_merge or self.shuffle) and index % data_size == 0:
            self._bound_index = np.random.permutation(self.data_size)
        # Unconditional modulo also wraps negative indices to a valid batch; with batch_size > 1 an
        # unwrapped -1 would build the slice [-bs:0] in `_get_item` and return empty arrays.
        return index % data_size

    def __len__(self):
        """Return the number of items (batches) available per pass."""
        return self.batched_data_size

    def _get_item(self, index):
        """
        Gather item `index` from every column.

        Returns:
            tuple, one entry per name in ``columns_list``; a single sample when ``batch_size == 1``,
            otherwise a slice of `batch_size` samples. ``None`` if ``columns_list`` is empty.
        """
        # The selector is identical for every column, so build it once.
        if self.batch_size == 1:
            selector = self._bound_index[index]
        else:
            start = index * self.batch_size
            selector = self._bound_index[start:start + self.batch_size]
        col_data = tuple(self.data[col][selector] for col in range(len(self.columns_list)))
        # Keep the historical return value for an empty column list.
        return col_data or None


class BoundaryBC(Boundary):
    """
    Sampling data of boundary condition.

    Args:
        geometry (Geometry): specifies geometry information of boundary condition.
            Derived from mindflow.geometry, the geometry could be cube or disk, etc.

    Raises:
        ValueError: if sampling_config.bc of `geometry` is ``None``.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindflow.geometry import generate_sampling_config, Geometry
        >>> from mindflow.data import BoundaryBC
        >>> geometry_config = dict({'BC' : dict({'random_sampling' : True, 'size' : 100, 'sampler' : 'uniform',})})
        >>> sampling_config = generate_sampling_config(geometry_config)
        >>> geom = Geometry("geom", 1, 0.0, 1.0, sampling_config=sampling_config)
        >>> boundary_bc = BoundaryBC(geometry=geom)
    """
    def __init__(self, geometry):
        super(BoundaryBC, self).__init__(geometry)
        bc_config = self.geometry.sampling_config.bc
        self.sampling_config = bc_config
        # A missing (falsy) BC section means there is nothing to sample for this constraint.
        if not bc_config:
            raise ValueError("BC info for the current geometry: {} should not be None".format(geometry.name))
        self.constraint_type = "BC"
        self.name = geometry.name + "_BC"
        self._random_merge = bc_config.random_merge

    def _initialization(self, batch_size=1, shuffle=False):
        """Sample the BC points of the geometry and set the batching attributes."""
        return self._initialize(batch_size=batch_size, shuffle=shuffle, geom_type="BC")

    def __getitem__(self, bc_index):
        """Return the columns for item `bc_index`, sampling lazily on first access."""
        if not self.data:
            self._initialization()
        if not self.sampling_config.random_sampling:
            # Fixed sample set: address it by (wrapped) index.
            return self._get_item(self._get_index_when_sample_all(bc_index))
        # Random sampling: items are served sequentially and re-sampled when exhausted.
        return self._get_item(self._get_index_when_sample_iter(bc_index, geom_type="BC"))
class BoundaryIC(Boundary):
    """
    Sampling data of initial condition.

    Args:
        geometry (Geometry): specifies geometry information of initial condition.

    Raises:
        ValueError: if sampling_config.ic of geometry is ``None``.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindflow.geometry import generate_sampling_config, Geometry
        >>> from mindflow.data import BoundaryIC
        >>> geometry_config = dict({'IC' : dict({'random_sampling' : True, 'size' : 100, 'sampler' : 'uniform',})})
        >>> sampling_config = generate_sampling_config(geometry_config)
        >>> geom = Geometry("geom", 1, 0.0, 1.0, sampling_config=sampling_config)
        >>> boundary_ic = BoundaryIC(geometry=geom)
    """
    def __init__(self, geometry):
        super(BoundaryIC, self).__init__(geometry)
        ic_config = self.geometry.sampling_config.ic
        self.sampling_config = ic_config
        # A missing (falsy) IC section means there is nothing to sample for this constraint.
        if not ic_config:
            raise ValueError("IC info for the current geometry: {} should not be None".format(geometry.name))
        self.constraint_type = "IC"
        self.name = geometry.name + "_IC"
        self._random_merge = ic_config.random_merge

    def _initialization(self, batch_size=1, shuffle=False):
        """Sample the IC points of the geometry and set the batching attributes."""
        return self._initialize(batch_size=batch_size, shuffle=shuffle, geom_type="IC")

    def __getitem__(self, ic_index):
        """Return the columns for item `ic_index`, sampling lazily on first access."""
        if not self.data:
            self._initialization()
        if not self.sampling_config.random_sampling:
            # Fixed sample set: address it by (wrapped) index.
            return self._get_item(self._get_index_when_sample_all(ic_index))
        # Random sampling: items are served sequentially and re-sampled when exhausted.
        return self._get_item(self._get_index_when_sample_iter(ic_index, geom_type="IC"))