Source code for mindarmour.diff_privacy.optimizer.optimizer

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Differential privacy optimizer.
"""
from mindspore import nn
from mindspore import Tensor
from mindspore.ops import composite as C
from mindspore.ops import operations as P
from mindspore.ops import functional as F
from mindspore.common import dtype as mstype

from mindarmour.utils.logger import LogUtil
from mindarmour.diff_privacy import NoiseMechanismsFactory
from mindarmour.diff_privacy.mechanisms.mechanisms import _MechanismsParamsUpdater
from mindarmour.utils._check_param import check_int_positive

LOGGER = LogUtil.get_instance()
TAG = 'DP optimizer'

_grad_scale = C.MultitypeFuncGraph("grad_scale")
_reciprocal = P.Reciprocal()


@_grad_scale.register("Tensor", "Tensor")
def tensor_grad_scale(scale, grad):
    """ grad scaling """
    return grad * _reciprocal(scale)


class _TupleAdd(nn.Cell):
    def __init__(self):
        super(_TupleAdd, self).__init__()
        self.add = P.TensorAdd()
        self.hyper_map = C.HyperMap()

    def construct(self, input1, input2):
        """Add two tuple of data."""
        out = self.hyper_map(self.add, input1, input2)
        return out


[docs]class DPOptimizerClassFactory: """ Factory class of Optimizer. Args: micro_batches (int): The number of small batches split from an original batch. Default: 2. Returns: Optimizer, Optimizer class Examples: >>> GaussianSGD = DPOptimizerClassFactory(micro_batches=2) >>> GaussianSGD.set_mechanisms('Gaussian', norm_bound=1.0, initial_noise_multiplier=1.5) >>> net_opt = GaussianSGD.create('Momentum')(params=network.trainable_params(), >>> learning_rate=cfg.lr, >>> momentum=cfg.momentum) """ def __init__(self, micro_batches=2): self._mech_factory = NoiseMechanismsFactory() self.mech = None self._micro_batches = check_int_positive('micro_batches', micro_batches)
[docs] def set_mechanisms(self, policy, *args, **kwargs): """ Get noise mechanism object. Args: policy (str): Choose mechanism type. """ self.mech = self._mech_factory.create(policy, *args, **kwargs)
[docs] def create(self, policy, *args, **kwargs): """ Create DP optimizer. Args: policy (str): Choose original optimizer type. Returns: Optimizer, A optimizer with DP. """ if policy == 'SGD': cls = self._get_dp_optimizer_class(nn.SGD, self.mech, self._micro_batches, *args, **kwargs) return cls if policy == 'Momentum': cls = self._get_dp_optimizer_class(nn.Momentum, self.mech, self._micro_batches, *args, **kwargs) return cls if policy == 'Adam': cls = self._get_dp_optimizer_class(nn.Adam, self.mech, self._micro_batches, *args, **kwargs) return cls msg = "The {} is not implement, please choose ['SGD', 'Momentum', 'Adam']".format(policy) LOGGER.error(TAG, msg) raise NameError(msg)
def _get_dp_optimizer_class(self, cls, mech, micro_batches): """ Wrap original mindspore optimizer with `self._mech`. """ class DPOptimizer(cls): """ Initialize the DPOptimizerClass. Returns: Optimizer, Optimizer class. """ def __init__(self, *args, **kwargs): super(DPOptimizer, self).__init__(*args, **kwargs) self._mech = mech self._tuple_add = _TupleAdd() self._hyper_map = C.HyperMap() self._micro_float = Tensor(micro_batches, mstype.float32) self._mech_param_updater = None if self._mech is not None and self._mech._decay_policy is not None: self._mech_param_updater = _MechanismsParamsUpdater(decay_policy=self._mech._decay_policy, decay_rate=self._mech._noise_decay_rate, cur_noise_multiplier= self._mech._noise_multiplier, init_noise_multiplier= self._mech._initial_noise_multiplier) def construct(self, gradients): """ construct a compute flow. """ grad_noise = self._hyper_map(self._mech, gradients) grads = self._tuple_add(gradients, grad_noise) grads = self._hyper_map(F.partial(_grad_scale, self._micro_float), grads) # update mech parameters if self._mech_param_updater is not None: multiplier = self._mech_param_updater() grads = F.depend(grads, multiplier) gradients = super(DPOptimizer, self).construct(grads) return gradients return DPOptimizer