Source code for mindspore.nn.probability.distribution.bernoulli

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Bernoulli Distribution"""
from mindspore.common import dtype as mstype
from mindspore.ops import operations as P
from mindspore.ops import composite as C
from .distribution import Distribution
from ._utils.utils import cast_to_tensor, check_prob, check_type, check_distribution_name, raise_none_error
from ._utils.custom_ops import exp_generic, log_generic, erf_generic


[docs]class Bernoulli(Distribution): """ Bernoulli Distribution. Args: probs (float, list, numpy.ndarray, Tensor, Parameter): probability of 1 as outcome. seed (int): seed to use in sampling. Default: 0. dtype (mindspore.dtype): type of the distribution. Default: mstype.int32. name (str): name of the distribution. Default: Bernoulli. Note: probs should be proper probabilities (0 < p < 1). Dist_spec_args is probs. Examples: >>> # To initialize a Bernoulli distribution of prob 0.5 >>> import mindspore.nn.probability.distribution as msd >>> b = msd.Bernoulli(0.5, dtype=mstype.int32) >>> >>> # The following creates two independent Bernoulli distributions >>> b = msd.Bernoulli([0.5, 0.5], dtype=mstype.int32) >>> >>> # A Bernoulli distribution can be initilized without arguments >>> # In this case, probs must be passed in through args during function calls. >>> b = msd.Bernoulli(dtype=mstype.int32) >>> >>> # To use Bernoulli in a network >>> class net(Cell): >>> def __init__(self): >>> super(net, self).__init__(): >>> self.b1 = msd.Bernoulli(0.5, dtype=mstype.int32) >>> self.b2 = msd.Bernoulli(dtype=mstype.int32) >>> >>> # All the following calls in construct are valid >>> def construct(self, value, probs_b, probs_a): >>> >>> # Similar calls can be made to other probability functions >>> # by replacing 'prob' with the name of the function >>> ans = self.b1.prob(value) >>> # Evaluate with the respect to distribution b >>> ans = self.b1.prob(value, probs_b) >>> >>> # probs must be passed in during function calls >>> ans = self.b2.prob(value, probs_a) >>> >>> # Functions 'sd', 'var', 'entropy' have the same usage as 'mean' >>> # Will return 0.5 >>> ans = self.b1.mean() >>> # Will return probs_b >>> ans = self.b1.mean(probs_b) >>> >>> # probs must be passed in during function calls >>> ans = self.b2.mean(probs_a) >>> >>> # Usage of 'kl_loss' and 'cross_entropy' are similar >>> ans = self.b1.kl_loss('Bernoulli', probs_b) >>> ans = self.b1.kl_loss('Bernoulli', probs_b, probs_a) >>> >>> # Additional probs_a must be passed in through >>> ans = self.b2.kl_loss('Bernoulli', probs_b, probs_a) >>> >>> # Sample >>> ans = self.b1.sample() >>> ans = self.b1.sample((2,3)) >>> ans = self.b1.sample((2,3), probs_b) >>> ans = self.b2.sample((2,3), probs_a) """ def __init__(self, probs=None, seed=0, dtype=mstype.int32, name="Bernoulli"): """ Constructor of Bernoulli distribution. """ param = dict(locals()) valid_dtype = mstype.int_type + mstype.uint_type + mstype.float_type check_type(dtype, valid_dtype, type(self).__name__) super(Bernoulli, self).__init__(seed, dtype, name, param) self.parameter_type = mstype.float32 if probs is not None: self._probs = cast_to_tensor(probs, mstype.float32) check_prob(self.probs) else: self._probs = probs # ops needed for the class self.exp = exp_generic self.log = log_generic self.erf = erf_generic self.squeeze = P.Squeeze(0) self.cast = P.Cast() self.const = P.ScalarToArray() self.dtypeop = P.DType() self.floor = P.Floor() self.fill = P.Fill() self.less = P.Less() self.shape = P.Shape() self.select = P.Select() self.sq = P.Square() self.sqrt = P.Sqrt() self.uniform = C.uniform def extend_repr(self): if self.is_scalar_batch: str_info = f'probs = {self.probs}' else: str_info = f'batch_shape = {self._broadcast_shape}' return str_info @property def probs(self): """ Returns the probability for the outcome is 1. """ return self._probs def _check_param(self, probs1): """ Check availablity of distribution specific args probs1. """ if probs1 is not None: if self.context_mode == 0: self.checktensor(probs1, 'probs1') else: probs1 = self.checktensor(probs1, 'probs1') return self.cast(probs1, self.parameter_type) return self.probs if self.probs is not None else raise_none_error('probs1') def _mean(self, probs1=None): r""" .. math:: MEAN(B) = probs1 """ probs1 = self._check_param(probs1) return probs1 def _mode(self, probs1=None): r""" .. math:: MODE(B) = 1 if probs1 > 0.5 else = 0 """ probs1 = self._check_param(probs1) prob_type = self.dtypeop(probs1) zeros = self.fill(prob_type, self.shape(probs1), 0.0) ones = self.fill(prob_type, self.shape(probs1), 1.0) comp = self.less(0.5, probs1) return self.select(comp, ones, zeros) def _var(self, probs1=None): r""" .. math:: VAR(B) = probs1 * probs0 """ probs1 = self._check_param(probs1) probs0 = 1.0 - probs1 return self.exp(self.log(probs0) + self.log(probs1)) def _entropy(self, probs1=None): r""" .. math:: H(B) = -probs0 * \log(probs0) - probs1 * \log(probs1) """ probs1 = self._check_param(probs1) probs0 = 1 - probs1 return -1 * (probs0 * self.log(probs0)) - (probs1 * self.log(probs1)) def _cross_entropy(self, dist, probs1_b, probs1=None): """ Evaluate cross_entropy between Bernoulli distributions. Args: dist (str): type of the distributions. Should be "Bernoulli" in this case. probs1_b (Tensor): probs1 of distribution b. probs1_a (Tensor): probs1 of distribution a. Default: self.probs. """ check_distribution_name(dist, 'Bernoulli') return self._entropy(probs1) + self._kl_loss(dist, probs1_b, probs1) def _log_prob(self, value, probs1=None): r""" pmf of Bernoulli distribution. Args: value (Tensor): a Tensor composed of only zeros and ones. probs (Tensor): probability of outcome is 1. Default: self.probs. .. math:: pmf(k) = probs1 if k = 1; pmf(k) = probs0 if k = 0; """ value = self._check_value(value, 'value') value = self.cast(value, mstype.float32) probs1 = self._check_param(probs1) probs0 = 1.0 - probs1 return self.log(probs1) * value + self.log(probs0) * (1.0 - value) def _cdf(self, value, probs1=None): r""" cdf of Bernoulli distribution. Args: value (Tensor): value to be evaluated. probs (Tensor): probability of outcome is 1. Default: self.probs. .. math:: cdf(k) = 0 if k < 0; cdf(k) = probs0 if 0 <= k <1; cdf(k) = 1 if k >=1; """ value = self._check_value(value, 'value') value = self.cast(value, mstype.float32) value = self.floor(value) probs1 = self._check_param(probs1) prob_type = self.dtypeop(probs1) value = value * self.fill(prob_type, self.shape(probs1), 1.0) probs0 = 1.0 - probs1 * self.fill(prob_type, self.shape(value), 1.0) comp_zero = self.less(value, 0.0) comp_one = self.less(value, 1.0) zeros = self.fill(prob_type, self.shape(value), 0.0) ones = self.fill(prob_type, self.shape(value), 1.0) less_than_zero = self.select(comp_zero, zeros, probs0) return self.select(comp_one, less_than_zero, ones) def _kl_loss(self, dist, probs1_b, probs1=None): r""" Evaluate bernoulli-bernoulli kl divergence, i.e. KL(a||b). Args: dist (str): type of the distributions. Should be "Bernoulli" in this case. probs1_b (Tensor, Number): probs1 of distribution b. probs1_a (Tensor, Number): probs1 of distribution a. Default: self.probs. .. math:: KL(a||b) = probs1_a * \log(\frac{probs1_a}{probs1_b}) + probs0_a * \log(\frac{probs0_a}{probs0_b}) """ check_distribution_name(dist, 'Bernoulli') probs1_b = self._check_value(probs1_b, 'probs1_b') probs1_b = self.cast(probs1_b, self.parameter_type) probs1_a = self._check_param(probs1) probs0_a = 1.0 - probs1_a probs0_b = 1.0 - probs1_b return probs1_a * self.log(probs1_a / probs1_b) + probs0_a * self.log(probs0_a / probs0_b) def _sample(self, shape=(), probs1=None): """ Sampling. Args: shape (tuple): shape of the sample. Default: (). probs (Tensor, Number): probs1 of the samples. Default: self.probs. Returns: Tensor, shape is shape + batch_shape. """ shape = self.checktuple(shape, 'shape') probs1 = self._check_param(probs1) origin_shape = shape + self.shape(probs1) if origin_shape == (): sample_shape = (1,) else: sample_shape = origin_shape l_zero = self.const(0.0) h_one = self.const(1.0) sample_uniform = self.uniform(sample_shape, l_zero, h_one, self.seed) sample = self.less(sample_uniform, probs1) value = self.cast(sample, self.dtype) if origin_shape == (): value = self.squeeze(value) return value