Source code for mindarmour.detectors.region_based_detector

# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Region-Based detector
"""
import time

import numpy as np

from mindspore import Model
from mindspore import Tensor

from mindarmour.detectors.detector import Detector
from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_numpy_param, check_param_type, \
    check_pair_numpy_param, check_model, check_int_positive, \
    check_value_positive, check_value_non_negative, check_param_in_range, \
    check_equal_shape

LOGGER = LogUtil.get_instance()
TAG = 'RegionBasedDetector'


[docs]class RegionBasedDetector(Detector): """ This class implement a region-based detector. Reference: `Mitigating evasion attacks to deep neural networks via region-based classification <https://arxiv.org/abs/1709.05583>`_ Args: model (Model): Target model. number_points (int): The number of samples generate from the hyper cube of original sample. Default: 10. initial_radius (float): Initial radius of hyper cube. Default: 0.0. max_radius (float): Maximum radius of hyper cube. Default: 1.0. search_step (float): Incremental during search of radius. Default: 0.01. degrade_limit (float): Acceptable decrease of classification accuracy. Default: 0.0. sparse (bool): If True, input labels are sparse-encoded. If False, input labels are one-hot-encoded. Default: False. Examples: >>> detector = RegionBasedDetector(model) >>> detector.fit(Tensor(ori), Tensor(labels)) >>> adv_ids = detector.detect(Tensor(adv)) """ def __init__(self, model, number_points=10, initial_radius=0.0, max_radius=1.0, search_step=0.01, degrade_limit=0.0, sparse=False): super(RegionBasedDetector, self).__init__() self._model = check_model('targeted model', model, Model) self._number_points = check_int_positive('number_points', number_points) self._initial_radius = check_value_non_negative('initial_radius', initial_radius) self._max_radius = check_value_positive('max_radius', max_radius) self._search_step = check_value_positive('search_step', search_step) self._degrade_limit = check_value_non_negative('degrade_limit', degrade_limit) self._sparse = check_param_type('sparse', sparse, bool) self._radius = None
[docs] def set_radius(self, radius): """Set radius.""" self._radius = check_param_in_range('radius', radius, self._initial_radius, self._max_radius)
[docs] def fit(self, inputs, labels=None): """ Train detector to decide the best radius. Args: inputs (numpy.ndarray): Benign samples. labels (numpy.ndarray): Ground truth labels of the input samples. Default:None. Returns: float, the best radius. """ inputs, labels = check_pair_numpy_param('inputs', inputs, 'labels', labels) LOGGER.debug(TAG, 'enter fit() function.') time_start = time.time() search_iters = (self._max_radius - self._initial_radius) / self._search_step search_iters = np.round(search_iters).astype(int) radius = self._initial_radius pred = self._model.predict(Tensor(inputs)) raw_preds = np.argmax(pred.asnumpy(), axis=1) if not self._sparse: labels = np.argmax(labels, axis=1) raw_preds, labels = check_equal_shape('raw_preds', raw_preds, 'labels', labels) raw_acc = np.sum(raw_preds == labels) / inputs.shape[0] for _ in range(search_iters): rc_preds = self._rc_forward(inputs, radius) rc_preds, labels = check_equal_shape('rc_preds', rc_preds, 'labels', labels) def_acc = np.sum(rc_preds == labels) / inputs.shape[0] if def_acc >= raw_acc - self._degrade_limit: radius += self._search_step continue break self._radius = radius - self._search_step LOGGER.debug(TAG, 'best radius is: %s', self._radius) LOGGER.debug(TAG, 'time used to train detector of %d samples is: %s seconds', inputs.shape[0], time.time() - time_start) return self._radius
def _generate_hyper_cube(self, inputs, radius): """ Generate random samples in the hyper cubes around input samples. Args: inputs (numpy.ndarray): Input samples. radius (float): The scope to generate hyper cubes around input samples. Returns: numpy.ndarray, randomly chosen samples in the hyper cubes. """ LOGGER.debug(TAG, 'enter _generate_hyper_cube().') res = [] for _ in range(self._number_points): res.append(np.clip((inputs + np.random.uniform( -radius, radius, len(inputs))), 0.0, 1.0).astype(inputs.dtype)) return np.asarray(res) def _rc_forward(self, inputs, radius): """ Generate region-based predictions for input samples. Args: inputs (numpy.ndarray): Input samples. radius (float): The scope to generate hyper cubes around input samples. Returns: numpy.ndarray, classification result for input samples. """ LOGGER.debug(TAG, 'enter _rc_forward().') res = [] for _, elem in enumerate(inputs): hyper_cube_x = self._generate_hyper_cube(elem, radius) hyper_cube_preds = [] for ite_hyper_cube_x in hyper_cube_x: model_inputs = Tensor(np.expand_dims(ite_hyper_cube_x, axis=0)) ite_preds = self._model.predict(model_inputs).asnumpy()[0] hyper_cube_preds.append(ite_preds) pred_labels = np.argmax(hyper_cube_preds, axis=1) bin_count = np.bincount(pred_labels) # count the number of different class and choose the max one # as final class hyper_cube_tag = np.argmax(bin_count, axis=0) res.append(hyper_cube_tag) return np.asarray(res)
[docs] def detect(self, inputs): """ Tell whether input samples are adversarial or not. Args: inputs (numpy.ndarray): Suspicious samples to be judged. Returns: list[int], whether a sample is adversarial. if res[i]=1, then the input sample with index i is adversarial. """ LOGGER.debug(TAG, 'enter detect().') self._radius = check_param_type('radius', self._radius, float) inputs = check_numpy_param('inputs', inputs) time_start = time.time() res = [1]*inputs.shape[0] raw_preds = np.argmax(self._model.predict(Tensor(inputs)).asnumpy(), axis=1) rc_preds = self._rc_forward(inputs, self._radius) for i in range(inputs.shape[0]): if raw_preds[i] == rc_preds[i]: res[i] = 0 LOGGER.debug(TAG, 'time used to detect %d samples is : %s seconds', inputs.shape[0], time.time() - time_start) return res
[docs] def detect_diff(self, inputs): """ Return raw prediction results and region-based prediction results. Args: inputs (numpy.ndarray): Input samples. Returns: numpy.ndarray, raw prediction results and region-based prediction results of input samples. """ LOGGER.debug(TAG, 'enter detect_diff().') inputs = check_numpy_param('inputs', inputs) raw_preds = self._model.predict(Tensor(inputs)) rc_preds = self._rc_forward(inputs, self._radius) return raw_preds.asnumpy(), rc_preds
[docs] def transform(self, inputs): """ Generate hyper cube for input samples. Args: inputs (numpy.ndarray): Input samples. Returns: numpy.ndarray, hyper cube corresponds to every sample. """ LOGGER.debug(TAG, 'enter transform().') inputs = check_numpy_param('inputs', inputs) res = [] for _, elem in enumerate(inputs): res.append(self._generate_hyper_cube(elem, self._radius)) return np.asarray(res)