# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Natural-evolutionary-strategy Attack.
"""
import time
import numpy as np
from scipy.special import softmax
from mindarmour.attacks.attack import Attack
from mindarmour.utils.logger import LogUtil
from mindarmour.attacks.black.black_model import BlackModel
from mindarmour.utils._check_param import check_pair_numpy_param, check_model, \
    check_numpy_param, check_int_positive, check_value_positive, check_param_type
LOGGER = LogUtil.get_instance()
TAG = 'NES'
def _one_hot(index, total):
    arr = np.zeros((total))
    arr[index] = 1.0
    return arr
def _bound(image, epislon):
    lower = np.clip(image - epislon, 0, 1)
    upper = np.clip(image + epislon, 0, 1)
    return lower, upper
[docs]class NES(Attack):
    """
    The class is an implementation of the Natural Evolutionary Strategies Attack,
    including three settings: Query-Limited setting, Partial-Information setting
    and Label-Only setting.
    References: `Andrew Ilyas, Logan Engstrom, Anish Athalye, and Jessy Lin.
    Black-box adversarial attacks with limited queries and information. In
    ICML, July 2018 <https://arxiv.org/abs/1804.08598>`_
    Args:
        model (BlackModel): Target model.
        scene (str): Scene in 'Label_Only', 'Partial_Info' or
            'Query_Limit'.
        max_queries (int): Maximum query numbers to generate an adversarial
            example. Default: 500000.
        top_k (int): For Partial-Info or Label-Only setting, indicating how
            much (Top-k) information is available for the attacker. For
            Query-Limited setting, this input should be set as -1. Default: -1.
        num_class (int): Number of classes in dataset. Default: 10.
        batch_size (int): Batch size. Default: 96.
        epsilon (float): Maximum perturbation allowed in attack. Default: 0.3.
        samples_per_draw (int): Number of samples draw in antithetic sampling.
            Default: 96.
        momentum (float): Momentum. Default: 0.9.
        learning_rate (float): Learning rate. Default: 1e-2.
        max_lr (float): Max Learning rate. Default: 1e-2.
        min_lr (float): Min Learning rate. Default: 5e-5.
        sigma (float): Step size of random noise. Default: 1e-3.
        plateau_length (int): Length of plateau used in Annealing algorithm.
            Default: 20.
        plateau_drop (float): Drop of plateau used in Annealing algorithm.
            Default: 2.0.
        adv_thresh (float): Threshold of adversarial. Default: 0.15.
        zero_iters (int): Number of points to use for the proxy score.
            Default: 10.
        starting_eps (float): Starting epsilon used in Label-Only setting.
            Default: 1.0.
        starting_delta_eps (float): Delta epsilon used in Label-Only setting.
            Default: 0.5.
        label_only_sigma (float): Sigma used in Label-Only setting.
            Default: 1e-3.
        conservative (int): Conservation used in epsilon decay, it will
            increase if no convergence. Default: 2.
        sparse (bool): If True, input labels are sparse-encoded. If False,
            input labels are one-hot-encoded. Default: True.
    Examples:
        >>> SCENE = 'Label_Only'
        >>> TOP_K = 5
        >>> num_class = 5
        >>> nes_instance = NES(user_model, SCENE, top_k=TOP_K)
        >>> initial_img = np.asarray(np.random.random((32, 32)), np.float32)
        >>> target_image  = np.asarray(np.random.random((32, 32)), np.float32)
        >>> orig_class = 0
        >>> target_class = 2
        >>> nes_instance.set_target_images(target_image)
        >>> tag, adv, queries = nes_instance.generate([initial_img], [target_class])
    """
    def __init__(self, model, scene, max_queries=10000, top_k=-1, num_class=10,
                 batch_size=128, epsilon=0.3, samples_per_draw=128,
                 momentum=0.9, learning_rate=1e-3, max_lr=5e-2, min_lr=5e-4,
                 sigma=1e-3, plateau_length=20, plateau_drop=2.0,
                 adv_thresh=0.25, zero_iters=10, starting_eps=1.0,
                 starting_delta_eps=0.5, label_only_sigma=1e-3, conservative=2,
                 sparse=True):
        super(NES, self).__init__()
        self._model = check_model('model', model, BlackModel)
        self._scene = scene
        self._max_queries = check_int_positive('max_queries', max_queries)
        self._num_class = check_int_positive('num_class', num_class)
        self._batch_size = check_int_positive('batch_size', batch_size)
        self._samples_per_draw = check_int_positive('samples_per_draw',
                                                    samples_per_draw)
        self._goal_epsilon = check_value_positive('epsilon', epsilon)
        self._momentum = check_value_positive('momentum', momentum)
        self._learning_rate = check_value_positive('learning_rate',
                                                   learning_rate)
        self._max_lr = check_value_positive('max_lr', max_lr)
        self._min_lr = check_value_positive('min_lr', min_lr)
        self._sigma = check_value_positive('sigma', sigma)
        self._plateau_length = check_int_positive('plateau_length',
                                                  plateau_length)
        self._plateau_drop = check_value_positive('plateau_drop', plateau_drop)
        # partial information arguments
        self._k = top_k
        self._adv_thresh = check_value_positive('adv_thresh', adv_thresh)
        # label only arguments
        self._zero_iters = check_int_positive('zero_iters', zero_iters)
        self._starting_eps = check_value_positive('starting_eps', starting_eps)
        self._starting_delta_eps = check_value_positive('starting_delta_eps',
                                                        starting_delta_eps)
        self._label_only_sigma = check_value_positive('label_only_sigma',
                                                      label_only_sigma)
        self._conservative = check_int_positive('conservative', conservative)
        self._sparse = check_param_type('sparse', sparse, bool)
        self.target_imgs = None
        self.target_img = None
        self.target_class = None
[docs]    def generate(self, inputs, labels):
        """
        Main algorithm for NES.
        Args:
            inputs (numpy.ndarray): Benign input samples.
            labels (numpy.ndarray): Target labels.
        Returns:
            - numpy.ndarray, bool values for each attack result.
            - numpy.ndarray, generated adversarial examples.
            - numpy.ndarray, query times for each sample.
        Raises:
            ValueError: If the top_k less than 0 in Label-Only or Partial-Info
                setting.
            ValueError: If the target_imgs is None in Label-Only or
                Partial-Info setting.
            ValueError: If scene is not in ['Label_Only', 'Partial_Info',
                'Query_Limit']
        Examples:
            >>> advs = attack.generate([[0.2, 0.3, 0.4], [0.3, 0.3, 0.2]],
            >>> [1, 2])
        """
        inputs, labels = check_pair_numpy_param('inputs', inputs,
                                                'labels', labels)
        if not self._sparse:
            labels = np.argmax(labels, axis=1)
        if self._scene == 'Label_Only' or self._scene == 'Partial_Info':
            if self._k < 0:
                msg = "In 'Label_Only' or 'Partial_Info' mode, " \
                      
"'top_k' must more than 0."
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            if self.target_imgs is None:
                msg = "In 'Label_Only' or 'Partial_Info' mode, " \
                      
"'target_imgs' must be set."
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
        elif self._scene == 'Query_Limit':
            self._k = self._num_class
        else:
            msg = "scene must be string in 'Label_Only', " \
                  
"'Partial_Info' or 'Query_Limit' "
            LOGGER.error(TAG, msg)
            raise ValueError(msg)
        is_advs = []
        advs = []
        queries = []
        for sample, label, target_img in zip(inputs, labels, self.target_imgs):
            is_adv, adv, query = self._generate_one(sample, label, target_img)
            is_advs.append(is_adv)
            advs.append(adv)
            queries.append(query)
        return is_advs, advs, queries 
[docs]    def set_target_images(self, target_images):
        """
        Set target samples for target attack.
        Args:
            target_images (numpy.ndarray): Target samples for target attack.
        """
        self.target_imgs = check_numpy_param('target_images', target_images) 
    def _generate_one(self, origin_image, target_label, target_image):
        """
        Main algorithm for NES.
        Args:
            origin_image (numpy.ndarray): Benign input sample.
            target_label (int): Target label.
        Returns:
            - bool.
                - If True: successfully make an adversarial example.
                - If False: unsuccessfully make an adversarial example.
            - numpy.ndarray, an adversarial example.
            - int, number of queries.
        """
        self.target_class = target_label
        origin_image = check_numpy_param('origin_image', origin_image)
        self._epsilon = self._starting_eps
        lower, upper = _bound(origin_image, self._epsilon)
        goal_epsilon = self._goal_epsilon
        delta_epsilon = self._starting_delta_eps
        if self._scene == 'Label_Only' or self._scene == 'Partial_Info':
            adv = target_image
        else:
            adv = origin_image.copy()
        # for backtracking and momentum
        num_queries = 0
        gradient = 0
        last_ls = []
        max_iters = int(np.ceil(self._max_queries // self._samples_per_draw))
        for i in range(max_iters):
            start = time.time()
            # early stop
            eval_preds = self._model.predict(adv)
            eval_preds = np.argmax(eval_preds, axis=1)
            padv = np.equal(eval_preds, self.target_class)
            if padv and self._epsilon <= goal_epsilon:
                LOGGER.debug(TAG, 'early stopping at iteration %d', i)
                return True, adv, num_queries
            #  antithetic sampling noise
            noise_pos = np.random.normal(
                size=(self._batch_size // 2,) + origin_image.shape)
            noise = np.concatenate((noise_pos, -noise_pos), axis=0)
            eval_points = adv + self._sigma*noise
            prev_g = gradient
            loss, gradient = self._get_grad(origin_image, eval_points, noise)
            gradient = self._momentum*prev_g + (1.0 - self._momentum)*gradient
            # plateau learning rate annealing
            last_ls.append(loss)
            last_ls = self._plateau_annealing(last_ls)
            # search for learning rate and epsilon decay
            current_lr = self._max_lr
            prop_delta_eps = 0.0
            if loss < self._adv_thresh and self._epsilon > goal_epsilon:
                prop_delta_eps = delta_epsilon
            while current_lr >= self._min_lr:
                # in partial information only or label only setting
                if self._scene == 'Label_Only' or self._scene == 'Partial_Info':
                    proposed_epsilon = max(self._epsilon - prop_delta_eps,
                                           goal_epsilon)
                    lower, upper = _bound(origin_image, proposed_epsilon)
                proposed_adv = adv - current_lr*np.sign(gradient)
                proposed_adv = np.clip(proposed_adv, lower, upper)
                num_queries += 1
                if self._preds_in_top_k(self.target_class, proposed_adv):
                    # The predicted label of proposed adversarial examples is in
                    # the top k observations.
                    if prop_delta_eps > 0:
                        delta_epsilon = max(prop_delta_eps, 0.1)
                        last_ls = []
                    adv = proposed_adv
                    self._epsilon = max(
                        self._epsilon - prop_delta_eps / self._conservative,
                        goal_epsilon)
                    break
                elif current_lr >= self._min_lr*2:
                    current_lr = current_lr / 2
                    LOGGER.debug(TAG, "backtracking learning rate to %.3f",
                                 current_lr)
                else:
                    prop_delta_eps = prop_delta_eps / 2
                    if prop_delta_eps < 2e-3:
                        LOGGER.debug(TAG, "Did not converge.")
                        return False, adv, num_queries
                    current_lr = self._max_lr
                    LOGGER.debug(TAG,
                                 "backtracking epsilon to %.3f",
                                 self._epsilon - prop_delta_eps)
            # update the number of queries
            if self._scene == 'Label_Only':
                num_queries += self._samples_per_draw*self._zero_iters
            else:
                num_queries += self._samples_per_draw
            LOGGER.debug(TAG,
                         'Step %d: loss %.4f, lr %.2E, eps %.3f, time %.4f.',
                         i,
                         loss,
                         current_lr,
                         self._epsilon,
                         time.time() - start)
        return False, adv, num_queries
    def _plateau_annealing(self, last_loss):
        last_loss = last_loss[-self._plateau_length:]
        if last_loss[-1] > last_loss[0] and len(
                last_loss) == self._plateau_length:
            if self._max_lr > self._min_lr:
                LOGGER.debug(TAG, "Annealing max learning rate.")
                self._max_lr = max(self._max_lr / self._plateau_drop,
                                   self._min_lr)
            last_loss = []
        return last_loss
    def _softmax_cross_entropy_with_logit(self, logit):
        logit = softmax(logit, axis=1)
        onehot_label = np.zeros(self._num_class)
        onehot_label[self.target_class] = 1
        onehot_labels = np.tile(onehot_label, (len(logit), 1))
        entropy = -onehot_labels*np.log(logit)
        loss = np.mean(entropy, axis=1)
        return loss
    def _query_limit_loss(self, eval_points, noise):
        """
        Loss in Query-Limit setting.
        """
        LOGGER.debug(TAG, 'enter the function _query_limit_loss().')
        loss = self._softmax_cross_entropy_with_logit(
            self._model.predict(eval_points))
        return loss, noise
    def _partial_info_loss(self, eval_points, noise):
        """
        Loss in Partial-Info setting.
        """
        LOGGER.debug(TAG, 'enter the function _partial_info_loss.')
        logit = self._model.predict(eval_points)
        loss = np.sort(softmax(logit, axis=1))[:, -self._k:]
        inds = np.argsort(logit)[:, -self._k:]
        good_loss = np.where(np.equal(inds, self.target_class), loss,
                             np.zeros(np.shape(inds)))
        good_loss = np.max(good_loss, axis=1)
        losses = -np.log(good_loss)
        return losses, noise
    def _label_only_loss(self, origin_image, eval_points, noise):
        """
        Loss in Label-Only setting.
        """
        LOGGER.debug(TAG, 'enter the function _label_only_loss().')
        tiled_points = np.tile(np.expand_dims(eval_points, 0),
                               [self._zero_iters,
                                *[1]*len(eval_points.shape)])
        noised_eval_im = tiled_points \
                         
+ np.random.randn(self._zero_iters,
                                           self._batch_size,
                                           *origin_image.shape) \
                         
*self._label_only_sigma
        noised_eval_im = np.reshape(noised_eval_im, (
            self._zero_iters*self._batch_size, *origin_image.shape))
        logits = self._model.predict(noised_eval_im)
        inds = np.argsort(logits)[:, -self._k:]
        real_inds = np.reshape(inds, (self._zero_iters, self._batch_size, -1))
        rank_range = np.arange(1, self._k + 1, 1, dtype=np.float32)
        tiled_rank_range = np.tile(np.reshape(rank_range, (1, 1, self._k)),
                                   [self._zero_iters, self._batch_size, 1])
        batches_in = np.where(np.equal(real_inds, self.target_class),
                              tiled_rank_range,
                              np.zeros(np.shape(tiled_rank_range)))
        loss = 1 - np.mean(batches_in)
        return loss, noise
    def _preds_in_top_k(self, target_class, prop_adv_):
        # query limit setting
        if self._k == self._num_class:
            return True
        # label only and partial information setting
        eval_preds = self._model.predict(prop_adv_)
        if not target_class in eval_preds.argsort()[:, -self._k:]:
            return False
        return True
    def _get_grad(self, origin_image, eval_points, noise):
        """Calculate gradient."""
        losses = []
        grads = []
        for _ in range(self._samples_per_draw // self._batch_size):
            if self._scene == 'Label_Only':
                loss, np_noise = self._label_only_loss(origin_image,
                                                       eval_points,
                                                       noise)
            elif self._scene == 'Partial_Info':
                loss, np_noise = self._partial_info_loss(eval_points, noise)
            else:
                loss, np_noise = self._query_limit_loss(eval_points, noise)
            # only support three channel images
            losses_tiled = np.tile(np.reshape(loss, (-1, 1, 1, 1)),
                                   (1,) + origin_image.shape)
            grad = np.mean(losses_tiled*np_noise, axis=0) / self._sigma
            grads.append(grad)
            losses.append(np.mean(loss))
        return np.array(losses).mean(), np.mean(np.array(grads), axis=0)