Source code for mindarmour.reliability.model_fault_injection.fault_injection

# Copyright 2021 Huawei Technologies Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Fault injection module
"""

import random
import numpy as np

import mindspore
from mindspore import ops, Tensor

from mindarmour.reliability.model_fault_injection.fault_type import FaultType
from mindarmour.utils.logger import LogUtil
from mindarmour.utils._check_param import check_int_positive, check_param_type, _check_array_not_empty

LOGGER = LogUtil.get_instance()
TAG = 'FaultInjector'


class FaultInjector:
    """
    The fault injection module simulates various fault scenarios for deep neural networks
    and evaluates the performance and reliability of the model.

    For details, please check
    `Tutorial <https://mindspore.cn/mindarmour/docs/zh-CN/master/fault_injection.html>`_.

    Args:
        model (Model): The model to be evaluated.
        fi_type (list): The types of fault injection, which include bitflips_random (flip randomly),
            bitflips_designated (flip the key bit), random, zeros, nan, inf, anti_activation and
            precision_loss.
        fi_mode (list): The mode of fault injection: inject faults into just a single layer
            or into all layers.
        fi_size (list): The number of fault injections, i.e. how many values are injected.

    Examples:
        >>> import numpy as np
        >>> from mindspore import Model, nn
        >>> import mindspore.ops.operations as P
        >>> from mindarmour.reliability.model_fault_injection.fault_injection import FaultInjector
        >>> class Net(nn.Cell):
        ...     def __init__(self):
        ...         super(Net, self).__init__()
        ...         self._softmax = P.Softmax()
        ...         self._Dense = nn.Dense(10, 10)
        ...         self._squeeze = P.Squeeze(1)
        ...     def construct(self, inputs):
        ...         out = self._softmax(inputs)
        ...         out = self._Dense(out)
        ...         return self._squeeze(out)
        >>> net = Net()
        >>> model = Model(net)
        >>> ds_data = np.random.randn(16, 1, 10).astype(np.float32)
        >>> ds_label = np.random.randint(0, 10, 16).astype(np.int32)
        >>> fi_type = ['bitflips_random', 'bitflips_designated', 'random', 'zeros',
        ...            'nan', 'inf', 'anti_activation', 'precision_loss']
        >>> fi_mode = ['single_layer', 'all_layer']
        >>> fi_size = [1]
        >>> fi = FaultInjector(model, fi_type, fi_mode, fi_size)
        >>> results = fi.kick_off(ds_data, ds_label, iter_times=100)
        >>> result_summary = fi.metrics()
    """

    def __init__(self, model, fi_type=None, fi_mode=None, fi_size=None):
        """FaultInjector initiated."""
        self.running_list = []
        self.fi_type_map = {}
        self._init_running_list(fi_type, fi_mode, fi_size)
        self.model = model
        self._fault_type = FaultType()
        self._check_param()
        self.result_list = []
        self.original_acc = 0
        self.original_parameter = {}
        self.argmax = ops.Argmax()
        self._reducesum = ops.ReduceSum(keep_dims=False)
        self._frozen()

    def _check_param(self):
        """Check input parameters."""
        ori_attr = self._fault_type.__dir__()
        attr = []
        for attr_ in ori_attr:
            if not attr_.startswith('__') and attr_ not in ['_bitflip', '_fault_inject']:
                attr.append(attr_)
        if not isinstance(self.model, mindspore.Model):
            msg = "'Input model should be Mindspore Model', got {}.".format(type(self.model))
            LOGGER.error(TAG, msg)
            raise TypeError(msg)
        for param in self.running_list:
            if param['fi_type'] not in attr:
                msg = "'Undefined fault type', got {}.".format(self.fi_type_map[param['fi_type']])
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            if param['fi_mode'] not in ['single_layer', 'all_layer']:
                msg = "'fault mode should be single_layer or all_layer', but got {}.".format(param['fi_mode'])
                LOGGER.error(TAG, msg)
                raise ValueError(msg)
            _ = check_int_positive('fi_size', param['fi_size'])

    def _init_running_list(self, type_, mode_, size_):
        """Initiate fault injection parameters of this evaluation."""
        if type_ is None:
            type_ = ['bitflips_random', 'bitflips_designated', 'random', 'zeros',
                     'nan', 'inf', 'anti_activation', 'precision_loss']
        if mode_ is None:
            mode_ = ['single_layer', 'all_layer']
        if size_ is None:
            size_ = list(range(1, 4))
        if not isinstance(type_, list):
            msg = "'fi_type should be list', got {}.".format(type(type_))
            LOGGER.error(TAG, msg)
            raise TypeError(msg)
        if not isinstance(mode_, list):
            msg = "'fi_mode should be list', got {}.".format(type(mode_))
            LOGGER.error(TAG, msg)
            raise TypeError(msg)
        if not isinstance(size_, list):
            msg = "'fi_size should be list', got {}.".format(type(size_))
            LOGGER.error(TAG, msg)
            raise TypeError(msg)
        for i in type_:
            if not isinstance(i, str):
                msg = "'fi_type element should be str', got {} type {}.".format(i, type(i))
                LOGGER.error(TAG, msg)
                raise TypeError(msg)
            new_i = i if i.startswith('_') else '_' + i
            self.fi_type_map[new_i] = i
            for j in mode_:
                for k in size_:
                    dict_ = {'fi_type': new_i, 'fi_mode': j, 'fi_size': k}
                    self.running_list.append(dict_)

    def _frozen(self):
        """Store original parameters of model."""
        trainable_param = self.model.predict_network.trainable_params()
        for param in trainable_param:
            np_param = param.asnumpy().copy()
            bytes_ = np_param.tobytes()
            self.original_parameter[param.name] = {}
            self.original_parameter[param.name]['datatype'] = np_param.dtype
            self.original_parameter[param.name]['shape'] = np_param.shape
            self.original_parameter[param.name]['data'] = bytes_.hex()

    def _reset_model(self):
        """Reset model with original parameters."""
        for weight in self.model.predict_network.trainable_params():
            name = weight.name
            if name in self.original_parameter.keys():
                bytes_w = bytes.fromhex(self.original_parameter[name]['data'])
                datatype_w = self.original_parameter[name]['datatype']
                shape_w = self.original_parameter[name]['shape']
                np_w = np.frombuffer(bytes_w, dtype=datatype_w).reshape(shape_w)
                weight.assign_value(Tensor.from_numpy(np_w))
            else:
                msg = "Layer name not matched, got {}.".format(name)
                LOGGER.error(TAG, msg)
                raise KeyError(msg)

    @staticmethod
    def _calculate_batch_size(num, iter_times):
        """Calculate batch size based on iter_times."""
        if num <= iter_times:
            batch_list = [1] * num
            idx_list = [0] * (num + 1)
        else:
            base_batch_size = num // iter_times
            gt_num = num - iter_times * base_batch_size
            le_num = iter_times - gt_num
            batch_list = [base_batch_size + 1] * gt_num + [base_batch_size] * le_num
            idx_list = [0] * (iter_times + 1)
        for i, _ in enumerate(batch_list):
            idx_list[i + 1] = idx_list[i] + batch_list[i]
        return idx_list

    @staticmethod
    def _check_kick_off_param(ds_data, ds_label, iter_times):
        """Check input data and label."""
        _ = check_int_positive('iter_times', iter_times)
        _ = check_param_type('ds_data', ds_data, np.ndarray)
        _ = _check_array_not_empty('ds_data', ds_data)
        _ = check_param_type('ds_label', ds_label, np.ndarray)
        _ = _check_array_not_empty('ds_label', ds_label)
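    # A worked example of the batching above, with illustrative values:
    # _calculate_batch_size(num=10, iter_times=3) gives base_batch_size = 10 // 3 = 3,
    # gt_num = 1 and le_num = 2, so batch_list = [4, 3, 3] and the returned boundaries
    # are idx_list = [0, 4, 7, 10]; kick_off then evaluates the slices ds_data[0:4],
    # ds_data[4:7] and ds_data[7:10].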
    def kick_off(self, ds_data, ds_label, iter_times=100):
        """
        Start the evaluation and return final results after fault injection.

        Args:
            ds_data (np.ndarray): Input data for testing. The evaluation is based on this data.
            ds_label (np.ndarray): The label of the data, corresponding to the data.
            iter_times (int): The number of evaluation rounds, which determines the batch size.

        Returns:
            - list, the result of fault injection.
        """
        self._check_kick_off_param(ds_data, ds_label, iter_times)
        num = ds_data.shape[0]
        idx_list = self._calculate_batch_size(num, iter_times)
        result_list = []
        # i == -1 evaluates the intact model first to obtain the baseline accuracy.
        for i in range(-1, len(self.running_list)):
            arg = self.running_list[i]
            total = 0
            correct = 0
            for idx in range(len(idx_list) - 1):
                a = ds_data[idx_list[idx]:idx_list[idx + 1], ...]
                batch = Tensor.from_numpy(a)
                label = Tensor.from_numpy(ds_label[idx_list[idx]:idx_list[idx + 1], ...])
                if label.ndim == 2:
                    label = self.argmax(label)
                if i != -1:
                    self._reset_model()
                    self._layer_states(arg['fi_type'], arg['fi_mode'], arg['fi_size'])
                output = self.model.predict(batch)
                predict = self.argmax(output)
                mask = predict == label
                total += predict.size
                correct += self._reducesum(mask.astype(mindspore.float32)).asnumpy()
            acc = correct / total if total else 0
            if i == -1:
                self.original_acc = acc
                result_list.append({'original_acc': self.original_acc})
            else:
                result_list.append({'type': arg['fi_type'][1:], 'mode': arg['fi_mode'],
                                    'size': arg['fi_size'], 'acc': acc,
                                    'SDC': self.original_acc - acc})
        self._reset_model()
        self.result_list = result_list
        return result_list
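    # The list returned by kick_off starts with the baseline accuracy, followed by one
    # entry per (fi_type, fi_mode, fi_size) combination, for example (values are
    # illustrative only):
    #   [{'original_acc': 0.93},
    #    {'type': 'bitflips_random', 'mode': 'single_layer', 'size': 1, 'acc': 0.81, 'SDC': 0.12},
    #    ...]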
    def metrics(self):
        """
        Metrics of final result.

        Returns:
            - list, the summary of result.
        """
        result_summary = []
        single_layer_acc = []
        single_layer_sdc = []
        all_layer_acc = []
        all_layer_sdc = []
        for result in self.result_list:
            if 'mode' in result.keys():
                if result['mode'] == 'single_layer':
                    single_layer_acc.append(float(result['acc']))
                    single_layer_sdc.append(float(result['SDC']))
                else:
                    all_layer_acc.append(float(result['acc']))
                    all_layer_sdc.append(float(result['SDC']))
        s_acc = np.array(single_layer_acc)
        s_sdc = np.array(single_layer_sdc)
        a_acc = np.array(all_layer_acc)
        a_sdc = np.array(all_layer_sdc)
        if single_layer_acc:
            result_summary.append('single_layer_acc_mean:%f single_layer_acc_max:%f single_layer_acc_min:%f'
                                  % (np.mean(s_acc), np.max(s_acc), np.min(s_acc)))
            result_summary.append('single_layer_SDC_mean:%f single_layer_SDC_max:%f single_layer_SDC_min:%f'
                                  % (np.mean(s_sdc), np.max(s_sdc), np.min(s_sdc)))
        if all_layer_acc:
            result_summary.append('all_layer_acc_mean:%f all_layer_acc_max:%f all_layer_acc_min:%f'
                                  % (np.mean(a_acc), np.max(a_acc), np.min(a_acc)))
            result_summary.append('all_layer_SDC_mean:%f all_layer_SDC_max:%f all_layer_SDC_min:%f'
                                  % (np.mean(a_sdc), np.max(a_sdc), np.min(a_sdc)))
        return result_summary
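    # The summary returned by metrics() is a list of formatted strings, for example
    # (numbers are illustrative only):
    #   ['single_layer_acc_mean:0.850000 single_layer_acc_max:0.930000 single_layer_acc_min:0.720000',
    #    'single_layer_SDC_mean:0.080000 single_layer_SDC_max:0.210000 single_layer_SDC_min:0.000000',
    #    ...]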
    def _layer_states(self, fi_type, fi_mode, fi_size):
        """FI in layer states."""
        # Choose a random layer for injection
        if fi_mode == "single_layer":
            # Single layer fault injection mode
            random_num = [random.randint(0, len(self.model.predict_network.trainable_params()) - 1)]
        elif fi_mode == "all_layer":
            # Multiple layer fault injection mode
            random_num = list(range(len(self.model.predict_network.trainable_params()) - 1))
        else:
            msg = 'undefined fi_mode {}'.format(fi_mode)
            LOGGER.error(TAG, msg)
            raise ValueError(msg)
        for n in random_num:
            # Get layer states info
            w = self.model.predict_network.trainable_params()[n]
            w_np = w.asnumpy().copy()
            elem_shape = w_np.shape
            w_np = w_np.reshape(-1)
            # Fault inject
            new_w_np = self._fault_type._fault_inject(w_np, fi_type, fi_size)
            # Reshape into original dimensions and store the faulty tensor
            new_w_np = np.reshape(new_w_np, elem_shape)
            w.set_data(Tensor.from_numpy(new_w_np))
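
# A minimal, standalone sketch of the idea behind the 'bitflips_designated' fault type
# (illustrative only; the real injection logic lives in
# mindarmour.reliability.model_fault_injection.fault_type.FaultType): flipping a single
# high-order exponent bit of a float32 weight changes it by many orders of magnitude,
# which is why a handful of bit-flip faults can collapse model accuracy.
if __name__ == '__main__':
    demo_w = np.array([0.5], dtype=np.float32)
    bits = demo_w.view(np.uint32)      # reinterpret the float32 bytes as uint32
    bits ^= np.uint32(1 << 30)         # flip bit 30, an exponent bit
    print(demo_w)                      # ~[1.7014118e+38]: one flipped bit dwarfs the weight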