Source code for mindspore.nn.layer.normalization

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""normalization"""
from mindspore.ops import operations as P
from mindspore.ops import functional as F
from mindspore.common.parameter import Parameter
from mindspore.common.initializer import initializer
from mindspore.ops.primitive import constexpr
from mindspore.common.tensor import Tensor
import mindspore.common.dtype as mstype
import mindspore.context as context
from mindspore._checkparam import check_bool, check_typename
from mindspore._extends import cell_attr_register
from mindspore.communication.management import get_group_size, get_rank
from mindspore.communication import management
from mindspore._checkparam import check_int_positive
from ..cell import Cell


__all__ = ['BatchNorm1d', 'BatchNorm2d', 'LayerNorm', 'GroupNorm', 'GlobalBatchNorm']

class _BatchNorm(Cell):
    """Batch Normalization base class."""
    @cell_attr_register
    def __init__(self,
                 num_features,
                 eps=1e-5,
                 momentum=0.9,
                 affine=True,
                 gamma_init='ones',
                 beta_init='zeros',
                 moving_mean_init='zeros',
                 moving_var_init='ones',
                 use_batch_statistics=True,
                 device_num_each_group=1):
        super(_BatchNorm, self).__init__()
        if num_features < 1:
            raise ValueError("num_features must be at least 1")

        if momentum < 0 or momentum > 1:
            raise ValueError("momentum should be a number in range [0, 1], but got {}".format(momentum))

        self.use_batch_statistics = use_batch_statistics
        self.num_features = num_features
        self.eps = eps
        self.moving_mean = Parameter(initializer(
            moving_mean_init, num_features), name="mean", requires_grad=False)
        self.moving_variance = Parameter(initializer(
            moving_var_init, num_features), name="variance", requires_grad=False)
        self.gamma = Parameter(initializer(
            gamma_init, num_features), name="gamma", requires_grad=affine)
        self.beta = Parameter(initializer(
            beta_init, num_features), name="beta", requires_grad=affine)
        self.group = check_int_positive(device_num_each_group)
        self.is_global = False
        if self.group != 1:
            self.rank_id = get_rank()
            self.rank_size = get_group_size()
            self.device_list = [i for i in range(0, self.rank_size)]
            self.rank_list = self.list_group(self.device_list, self.group)
            self.rank_list_idx = len(self.rank_list)
            for i in range(self.rank_list_idx):
                if self.rank_id in self.rank_list[i] and self.group != 1:
                    self.is_global = True
                    management.create_group('group' + str(i), self.rank_list[i])
                    self.all_reduce = P.AllReduce(P.ReduceOp.SUM, 'group' + str(i)).add_prim_attr('fusion', 1)
        self.shape = P.Shape()
        self.reduce_mean = P.ReduceMean(keep_dims=True)
        self.square = P.Square()
        self.sqrt = P.Sqrt()
        self.cast = P.Cast()
        self.dtype = P.DType()
        self.reshape = P.Reshape()
        self.is_ascend = context.get_context("device_target") == "Ascend"
        self.is_graph_mode = context.get_context("mode") == context.GRAPH_MODE

        if context.get_context("enable_ge"):
            self.is_ge_backend = True
            self.momentum = Tensor(1.0 - momentum, mstype.float32)
        else:
            self.is_ge_backend = False
            self.momentum = 1.0 - momentum
        if self.is_graph_mode and (self.is_ge_backend or self.is_ascend):
            self.bn_train = P.BatchNorm(is_training=True,
                                        epsilon=self.eps)
        else:
            self.bn_train = P.FusedBatchNorm(mode=1,
                                             epsilon=self.eps,
                                             momentum=self.momentum)
        self.bn_infer = P.BatchNorm(is_training=False, epsilon=self.eps)

        data_parallel_strategy = ((1,), (1,))
        data_parallel_strategy_one = ((1,), ())
        self.sub_mean = P.Sub().set_strategy(data_parallel_strategy)
        self.sub_var = P.Sub().set_strategy(data_parallel_strategy)
        self.mul_mean = P.Mul().set_strategy(data_parallel_strategy_one)
        self.mul_var = P.Mul().set_strategy(data_parallel_strategy_one)
        self.assign_sub_mean = P.AssignSub().set_strategy(data_parallel_strategy)
        self.assign_sub_var = P.AssignSub().set_strategy(data_parallel_strategy)

    def _check_data_dim(self, x):
        raise NotImplementedError

    def list_group(self, world_rank, group_size):
        if group_size > get_group_size():
            raise ValueError("group size can not be greater than local rank size, group size is {}, "
                             "local_rank_size is {}".format(group_size, get_group_size()))
        if len(world_rank) % group_size != 0:
            raise ValueError("please make your group size correct.")
        world_rank_list = zip(*(iter(world_rank),) *group_size)
        group_list = [list(i) for i in world_rank_list]
        return group_list



    def _global_sync(self, x, axes, re_shape):
        """calculate global batch normalization output"""
        x_mean = self.reduce_mean(x, axes)
        x_mean_square = self.reduce_mean(self.square(x), axes)
        global_batch_mean = self.all_reduce(x_mean) / self.group
        global_batch_mean_square = self.all_reduce(x_mean_square) / self.group
        global_mean = global_batch_mean
        global_var = global_batch_mean_square - self.square(global_mean)
        var_sqrt = self.sqrt(global_var + self.eps)
        mean_first = (x - global_mean) / var_sqrt
        y = mean_first * self.reshape(self.gamma, re_shape) + self.reshape(self.beta, re_shape)

        mean_sub = self.sub_mean(self.reshape(self.moving_mean, re_shape), global_mean)
        tmp_mean = self.mul_mean(mean_sub, self.cast(self.momentum, self.dtype(mean_sub)))
        mean_sub2 = self.sub_var(self.reshape(self.moving_mean, re_shape), global_var)
        tmp_variance = self.mul_var(mean_sub2, self.cast(self.momentum, self.dtype(mean_sub2)))
        y = F.depend(y, self.assign_sub_mean(self.moving_mean, self.reshape(tmp_mean, self.shape(self.moving_mean))))
        y = F.depend(y, self.assign_sub_var(self.moving_variance,
                                            self.reshape(tmp_variance, self.shape(self.moving_variance))))
        return y

    def construct(self, x):
        if self.training and self.use_batch_statistics:
            if self.is_ge_backend and self.is_global:
                axes, re_shape = _shape_infer(F.shape(x), self.num_features)
                y = self._global_sync(x, axes, re_shape)
            elif self.is_graph_mode and (self.is_ge_backend or self.is_ascend):
                y, batch_mean, batch_var, _, _ = \
                    self.bn_train(x,
                                  self.gamma,
                                  self.beta,
                                  None,
                                  None)

                mean_sub = self.sub_mean(self.moving_mean, batch_mean)
                temp_mean = self.mul_mean(mean_sub, self.momentum)
                mean_sub2 = self.sub_var(self.moving_variance, batch_var)
                temp_variance = self.mul_var(mean_sub2, self.momentum)
                y = F.depend(y, self.assign_sub_mean(self.moving_mean, temp_mean))
                y = F.depend(y, self.assign_sub_var(self.moving_variance, temp_variance))
            else:
                y = self.bn_train(x,
                                  self.gamma,
                                  self.beta,
                                  self.moving_mean,
                                  self.moving_variance)[0]
        else:
            y = self.bn_infer(x,
                              self.gamma,
                              self.beta,
                              self.moving_mean,
                              self.moving_variance)[0]
        return y

    def extend_repr(self):
        return 'num_features={}, eps={}, momentum={}, gamma={}, beta={}, moving_mean={}, moving_variance={}'.format(
            self.num_features, self.eps, self.momentum, self.gamma, self.beta, self.moving_mean, self.moving_variance)

@constexpr
def _channel_check(channel, num_channel):
    if channel != num_channel:
        raise ValueError("the input channel is not equal with num_channel")

@constexpr
def _shape_check(in_shape):
    if len(in_shape) != 4:
        raise ValueError("The input must has 4 dims")
@constexpr
def _shape_infer(x_shape, num_feature):
    """global batch normalization shape and axes infer"""
    if len(x_shape) == 4:
        axes = (0, 2, 3)
        re_shape = (1, num_feature, 1, 1)
    else:
        axes = (0,)
        re_shape = (1, num_feature)
    return axes, re_shape

[docs]class BatchNorm1d(_BatchNorm): r""" Batch normalization layer over a 2D input. Batch Normalization is widely used in convolutional networks. This layer applies Batch Normalization over a 2D input (a mini-batch of 1D inputs) to reduce internal covariate shift as described in the paper `Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift <https://arxiv.org/abs/1502.03167>`_. It rescales and recenters the feature using a mini-batch of data and the learned parameters which can be described in the following formula. .. math:: y = \frac{x - \mathrm{E}[x]}{\sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta Args: num_features (int): `C` from an expected input of size (N, C). eps (float): A value added to the denominator for numerical stability. Default: 1e-5. momentum (float): A floating hyperparameter of the momentum for the running_mean and running_var computation. Default: 0.9. affine (bool): A bool value when set to True, gamma and beta can be learnable. Default: True. gamma_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the gamma weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. beta_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the beta weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. moving_mean_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the moving mean. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. moving_var_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the moving variance. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. use_batch_statistics (bool): If true, use the mean value and variance value of current batch data, else use the mean value and variance value of specified value. Default: True. Inputs: - **input** (Tensor) - Tensor of shape :math:`(N, C_{in}, H_{in}, W_{in})`. Outputs: Tensor, the normalized, scaled, offset tensor, of shape :math:`(N, C_{out}, H_{out}, W_{out})`. Examples: >>> net = nn.BatchNorm1d(num_features=16) >>> input = Tensor(np.random.randint(0, 255, [3, 16]), mindspore.float32) >>> net(input) """ def __init__(self, num_features, eps=1e-5, momentum=0.9, affine=True, gamma_init='ones', beta_init='zeros', moving_mean_init='zeros', moving_var_init='ones', use_batch_statistics=True): super(BatchNorm1d, self).__init__(num_features, eps, momentum, affine, gamma_init, beta_init, moving_mean_init, moving_var_init, use_batch_statistics) def _check_data_dim(self, x): if x.dim() != 2: pass
[docs]class BatchNorm2d(_BatchNorm): r""" Batch normalization layer over a 4D input. Batch Normalization is widely used in convolutional networks. This layer applies Batch Normalization over a 4D input (a mini-batch of 2D inputs with additional channel dimension) to avoid internal covariate shift as described in the paper `Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift <https://arxiv.org/abs/1502.03167>`_. It rescales and recenters the feature using a mini-batch of data and the learned parameters which can be described in the following formula. .. math:: y = \frac{x - \mathrm{E}[x]}{\sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta Args: num_features (int): `C` from an expected input of size (N, C, H, W). eps (float): A value added to the denominator for numerical stability. Default: 1e-5. momentum (float): A floating hyperparameter of the momentum for the running_mean and running_var computation. Default: 0.9. affine (bool): A bool value when set to True, gamma and beta can be learnable. Default: True. gamma_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the gamma weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. beta_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the beta weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. moving_mean_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the moving mean. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. moving_var_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the moving variance. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. use_batch_statistics (bool): If true, use the mean value and variance value of current batch data, else use the mean value and variance value of specified value. Default: True. Inputs: - **input** (Tensor) - Tensor of shape :math:`(N, C_{in}, H_{in}, W_{in})`. Outputs: Tensor, the normalized, scaled, offset tensor, of shape :math:`(N, C_{out}, H_{out}, W_{out})`. Examples: >>> net = nn.BatchNorm2d(num_features=3) >>> input = Tensor(np.random.randint(0, 255, [1, 3, 224, 224]), mindspore.float32) >>> net(input) """ def __init__(self, num_features, eps=1e-5, momentum=0.9, affine=True, gamma_init='ones', beta_init='zeros', moving_mean_init='zeros', moving_var_init='ones', use_batch_statistics=True): super(BatchNorm2d, self).__init__(num_features, eps, momentum, affine, gamma_init, beta_init, moving_mean_init, moving_var_init, use_batch_statistics) def _check_data_dim(self, x): if x.dim() != 4: pass
[docs]class GlobalBatchNorm(_BatchNorm): r""" Global normalization layer over a N-dimension input. Global Normalization is cross device synchronized batch normalization. Batch Normalization implementation only normalize the data within each device. Global normalization will normalize the input within the group. It has been described in the paper `Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift <https://arxiv.org/abs/1502.03167>`_. It rescales and recenters the feature using a mini-batch of data and the learned parameters which can be described in the following formula. .. math:: y = \frac{x - \mathrm{E}[x]}{\sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta Note: Currently, GlobalBatchNorm only supports 2D and 4D inputs. Args: num_features (int): `C` from an expected input of size (N, C, H, W). device_num_each_group (int): The number of devices in each group. eps (float): A value added to the denominator for numerical stability. Default: 1e-5. momentum (float): A floating hyperparameter of the momentum for the running_mean and running_var computation. Default: 0.9. gamma_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the gamma weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. beta_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the beta weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. moving_mean_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the moving mean. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. moving_var_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the moving variance. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. use_batch_statistics (bool): If true, use the mean value and variance value of current batch data, else use the mean value and variance value of specified value. Default: True. Inputs: - **input** (Tensor) - Tensor of shape :math:`(N, C_{in}, H_{in}, W_{in})`. Outputs: Tensor, the normalized, scaled, offset tensor, of shape :math:`(N, C_{out}, H_{out}, W_{out})`. Examples: >>> global_bn_op = nn.GlobalBatchNorm(num_features=3, device_num_each_group=4) >>> input = Tensor(np.random.randint(0, 255, [1, 3, 224, 224]), mindspore.float32) >>> global_bn_op(input) """ def __init__(self, num_features, eps=1e-5, momentum=0.9, affine=True, gamma_init='ones', beta_init='zeros', moving_mean_init='zeros', moving_var_init='ones', use_batch_statistics=True, device_num_each_group=1): super(GlobalBatchNorm, self).__init__(num_features, eps, momentum, affine, gamma_init, beta_init, moving_mean_init, moving_var_init, use_batch_statistics, device_num_each_group) self.group = check_int_positive(device_num_each_group) if self.group <= 1: raise ValueError("the number of group must be greater than 1.") def _check_data_dim(self, x): if x.dim == 0: pass
[docs]class LayerNorm(Cell): r""" Applies Layer Normalization over a mini-batch of inputs. Layer normalization is widely used in recurrent neural networks. It applies normalization over a mini-batch of inputs for each single training case as described in the paper `Layer Normalization <https://arxiv.org/pdf/1607.06450.pdf>`_. Unlike batch normalization, layer normalization performs exactly the same computation at training and testing times. It can be described using the following formula. It is applied across all channels and pixel but only one batch size. .. math:: y = \frac{x - \mathrm{E}[x]}{\sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta Args: normalized_shape (Union(tuple[int], list[int]): The normalization is performed over axis `begin_norm_axis ... R - 1`. begin_norm_axis (int): It first normalization dimension: normalization will be performed along dimensions `begin_norm_axis: rank(inputs)`, the value should be in [-1, rank(input)). Default: -1. begin_params_axis (int): The first parameter(beta, gamma)dimension: scale and centering parameters will have dimensions `begin_params_axis: rank(inputs)` and will be broadcast with the normalized inputs accordingly, the value should be in [-1, rank(input)). Default: -1. gamma_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the gamma weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. beta_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the beta weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. epsilon (float): A value added to the denominator for numerical stability. Default: 1e-7. Inputs: - **input_x** (Tensor) - The shape of 'input_x' is :math:`(x_1, x_2, ..., x_R)`, and `input_shape[begin_norm_axis:]` is equal to `normalized_shape`. Outputs: Tensor, the normalized and scaled offset tensor, has the same shape and data type as the `input_x`. Examples: >>> x = Tensor(np.ones([20, 5, 10, 10]), mindspore.float32) >>> shape1 = x.shape()[1:] >>> m = nn.LayerNorm(shape1, begin_norm_axis=1, begin_params_axis=1) >>> m(x) """ def __init__(self, normalized_shape, begin_norm_axis=-1, begin_params_axis=-1, gamma_init='ones', beta_init='zeros', epsilon=1e-7 ): super(LayerNorm, self).__init__() if not isinstance(normalized_shape, (tuple, list)): raise TypeError("The type of 'normalized_shape' should be tuple[int] or list[int], but '{}' type is {}." .format(normalized_shape, type(normalized_shape))) self.normalized_shape = normalized_shape self.begin_norm_axis = begin_norm_axis self.begin_params_axis = begin_params_axis self.epsilon = epsilon self.gamma = Parameter(initializer( gamma_init, normalized_shape), name="gamma") self.beta = Parameter(initializer( beta_init, normalized_shape), name="beta") self.layer_norm = P.LayerNorm(begin_norm_axis=self.begin_norm_axis, begin_params_axis=self.begin_params_axis, epsilon=self.epsilon) def construct(self, input_x): y, _, _ = self.layer_norm(input_x, self.gamma, self.beta) return y
[docs] def extend_repr(self): """Display instance object as string.""" s = 'normalized_shape={}, begin_norm_axis={}, begin_params_axis={}, gamma{}, beta={}'.format( self.normalized_shape, self.begin_norm_axis, self.begin_params_axis, self.gamma, self.beta) return s
[docs]class GroupNorm(Cell): r""" Group Normalization over a mini-batch of inputs. Group normalization is widely used in recurrent neural networks. It applies normalization over a mini-batch of inputs for each single training case as described in the paper `Group Normalization <https://arxiv.org/pdf/1803.08494.pdf>`_. Group normalization divides the channels into groups and computes within each group the mean and variance for normalization, and it performs very stable over a wide range of batch size. It can be described using the following formula. .. math:: y = \frac{x - \mathrm{E}[x]}{\sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta Args: num_groups (int): The number of groups to be divided along the channel dimension. num_channels (int): The number of channels per group. eps (float): A value added to the denominator for numerical stability. Default: 1e-5. affine (bool): A bool value, this layer will has learnable affine parameters when set to true. Default: True. gamma_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the gamma weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'ones'. beta_init (Union[Tensor, str, Initializer, numbers.Number]): Initializer for the beta weight. The values of str refer to the function `initializer` including 'zeros', 'ones', 'xavier_uniform', 'he_uniform', etc. Default: 'zeros'. Inputs: - **input_x** (Tensor) - The input feature with shape [N, C, H, W]. Outputs: Tensor, the normalized and scaled offset tensor, has the same shape and data type as the `input_x`. Examples: >>> goup_norm_op = nn.GroupNorm(16, 64) >>> x = Tensor(np.ones([1, 64, 256, 256], np.float32)) >>> goup_norm_op(x) """ def __init__(self, num_groups, num_channels, eps=1e-05, affine=True, gamma_init='ones', beta_init='zeros'): super(GroupNorm, self).__init__() self.num_groups = check_int_positive(num_groups) self.num_channels = check_int_positive(num_channels) if num_channels % num_groups != 0: raise ValueError("num_channels should be divided by num_groups") self.eps = check_typename('eps', eps, (float,)) self.affine = check_bool(affine) gamma = initializer(gamma_init, [num_channels, 1, 1]) beta = initializer(beta_init, [num_channels, 1, 1]) if self.affine: self.gamma = Parameter(gamma, name='gamma') self.beta = Parameter(beta, name='beta') else: self.gamma = gamma self.beta = beta self.shape = F.shape self.reshape = F.reshape self.reduce_mean = P.ReduceMean(keep_dims=True) self.square = F.square self.reduce_sum = P.ReduceSum(keep_dims=True) self.sqrt = P.Sqrt() def _cal_output(self, x): """calculate groupnorm output""" batch, channel, height, width = self.shape(x) _channel_check(channel, self.num_channels) x = self.reshape(x, (batch, self.num_groups, channel*height*width/self.num_groups)) mean = self.reduce_mean(x, 2) var = self.reduce_sum(self.square(x - mean), 2) / (channel * height * width / self.num_groups - 1) std = self.sqrt(var + self.eps) x = (x - mean) / std x = self.reshape(x, (batch, channel, height, width)) output = x * self.gamma + self.beta return output def construct(self, x): _shape_check(self.shape(x)) output = self._cal_output(x) return output
[docs] def extend_repr(self): """Display instance object as string.""" s = 'num_groups={}, num_channels={}'.format(self.num_groups, self.num_channels) return s