# Source code of mindspore_xai.explainer.neighbor.pseudo_linear_coef

# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Pseudo Linear Coefficients (PLC)."""
from tqdm import tqdm

import mindspore as ms
from mindspore import nn
from mindspore import ops
from mindspore import ms_function
from mindspore.train._utils import check_value_type
import numpy as np
import matplotlib.pyplot as plt

from mindspore_xai.common.utils import is_notebook
from mindspore_xai.tool.tab.neighbor import SimpleNN


# Operator objects instantiated once at import time and reused by the helper
# cells and by PseudoLinearCoef defined later in this module.
_squeeze = ops.Squeeze()
_square = ops.Square()
_sqrt = ops.Sqrt()
_zeros = ops.Zeros()
_log = ops.Log()
_not = ops.LogicalNot()
_is_finite = ops.IsFinite()


class _StepwiseComputer(nn.Cell):
    """
    Cell that computes the PLC contribution for stepwise (0/1 output) classifiers.

    Args:
        eps (float): Distance below which a query and its nearest sample are treated as the same point.
    """
    def __init__(self, eps):
        super().__init__()
        self._eps = eps

    @ms_function
    def construct(self, queries, nearests):
        """
        Average the unit vectors pointing from each query to its nearest sample.

        Args:
            queries (Tensor): Query samples, one per row.
            nearests (Tensor): Nearest samples, row-aligned with `queries`.

        Returns:
            Tensor, the sum of the unit displacement vectors divided by the number of queries. Pairs closer than
            `eps` contribute a zero vector but are still counted in the divisor.
        """
        offsets = nearests - queries
        norms = _sqrt(_square(offsets).sum(1))
        coincident = norms < self._eps
        # Coincident pairs get 1 added to their (near-)zero norm so the division below is well defined.
        norms += coincident.astype(ms.float32)
        column_norms = norms.reshape((-1, 1))
        directions = offsets / column_norms
        # Zero out the rows that belong to coincident pairs.
        directions *= _not(coincident).reshape((-1, 1))
        return directions.sum(0) / queries.shape[0]


class _Computer(nn.Cell):
    """
    Helper of computing PLC for a single (query, nearest) pair with a non-stepwise classifier.

    Args:
        classifier (Cell, Callable): The classifier, called with the interpolated points.
        riemann (int): Number of Riemann sum partitions.
        eps (float): Tolerance added inside the logarithms and used as the squared-distance cut-off.
    """
    def __init__(self, classifier, riemann, eps):
        super().__init__()
        self._classifier = classifier
        self._riemann = riemann
        # Interpolation parameters t = 0, 1/riemann, ..., (riemann - 1)/riemann, stored as a column.
        # NOTE(review): t never reaches 1, so the last interpolated point is not exactly `nearest`.
        self._t = ms.Tensor([p / riemann for p in range(riemann)], dtype=ms.float32)
        self._t = self._t.reshape((-1, 1))
        self._eps = eps

    @ms_function
    def _pre_compute(self, query, nearest):
        """
        Prepare for the actual computation.

        Returns:
            tuple, (displace, sq_dist, u): the squeezed displacement `nearest - query`, the sum of its squared
            elements, and the points interpolated between `query` and `nearest` for every value in `self._t`.
        """
        displace = _squeeze(nearest - query)
        sq_dist = _square(displace).sum()
        # presumably broadcasts to one interpolated point per row of self._t -- TODO confirm input shapes
        u = query * (1 - self._t) + nearest * self._t
        return displace, sq_dist, u

    @ms_function
    def _compute(self, displace, sq_dist, fu, plc_sum):
        """
        Do compute the PLC.

        Args:
            displace (Tensor): Displacement from the query to its nearest sample.
            sq_dist (Tensor): Squared length of `displace`.
            fu (Tensor): Classifier outputs of the target class at the interpolated points.
            plc_sum (Tensor): Running sum of the per-sample PLC.

        Returns:
            Tensor, `plc_sum` plus the PLC contribution of this sample.
        """
        # Riemann sum
        minus_fu = 1 - fu
        # eps keeps the logarithms finite when fu is exactly 0 or 1.
        log_fu = _log(fu + self._eps)
        log_minus_fu = _log(minus_fu + self._eps)
        # Binary entropy (natural log) of the classifier output at each interpolated point.
        h = -(fu * log_fu) - (minus_fu * log_minus_fu)
        ig_h = h.sum() / self._riemann
        ig_h /= 0.69314718056  # ln(2)=0.69314718056, change to base 2

        # NOTE(review): fu[-1] is evaluated at the last value of self._t, i.e. t = 1 - 1/riemann, not at t = 1.
        sample_plc = (displace * (fu[-1] - fu[0])) / (sq_dist * ig_h)
        # Non-finite entries (e.g. from a zero denominator) are replaced with 0.
        sample_plc = sample_plc.masked_fill(_not(_is_finite(sample_plc)), 0.0)

        return plc_sum + sample_plc

    def construct(self, target, query, nearest, plc_sum):
        """
        Computation.

        Args:
            target: Index of the target class, used to select a column of the classifier output.
            query (Tensor): The query sample.
            nearest (Tensor): The nearest sample found for `query`.
            plc_sum (Tensor): Running sum of the per-sample PLC.

        Returns:
            Tensor, the updated running sum; `plc_sum` unchanged if the squared distance is below `eps`.
        """
        displace, sq_dist, u = self._pre_compute(query, nearest)
        # Query and nearest are (almost) the same point: nothing to add.
        if sq_dist < self._eps:
            return plc_sum
        fu = self._classifier(u)[:, target]
        return self._compute(displace, sq_dist, fu, plc_sum)


class PseudoLinearCoef:
    r"""
    Pseudo Linear Coefficients (PLC) for classifiers.

    PLC is a global attribution method, it is a measure of feature sensitivities around the classifier's decision
    boundaries from the data distribution's point of view.

    PLC of class A:

    .. math::

        \vec{R}(A)=\int \vec{S}(A,nearest_{A}(x),x)p_{\neg A}(x)dx

    PLC of class A (target class) relative to class B (view point class), it is called Relative PLC:

    .. math::

        \vec{R}(A,B)=\int \vec{S}(A,nearest_{A}(x),x)p_{B}(x)dx

    Where:

    .. math::

        nearest_A(x):=\underset{g\in G}{argmin}(\left \| g-x \right \|)\text{ }s.t.\text{ } g\neq x,f_A(g) \geq \xi

        \vec{S}(A,a,x)=\left\{\begin{matrix}
        \vec{0} & \text{if }f_A(x)\geq \xi \\
        \frac{a-x}{\left \| a-x \right \|} & \text{if }f_A(\cdot )\text{ is a step function}\\
        \frac{(a-x)(f_{A}(a)-f_A(x))}{\left \| a-x \right \|^{2}\int_{0}^{1}h(f_A(u(t)))dt} & \text{else}
        \end{matrix}\right.

    .. math::

        u(t)=ta+(1-t)x

    .. math::

        h(f_{A})=-f_{A}log_2(f_{A})-(1-f_A)log_2(1-f_A)

    :math:`G` is the universal sample set, :math:`f_A(\cdot )` is the predicted probability of class A,
    :math:`\xi` is the decision threshold (usually 0.5). :math:`p_{\neg A}` and :math:`p_{B}` are the PDF of
    sample's distribution of non A class(es) and class B respectively. Beware that the ground truth labels take
    no part in PLC, a sample's classes are determined by the classifier.

    Note:
        If `predictor` is a function, `stepwise` is `False` and it is running in graph mode then `predictor` must
        comply with the `static graph syntax
        <https://mindspore.cn/docs/en/master/note/static_graph_syntax_support.html>`_.
        PLC may not be accurate if there are many samples classified to more than one class.

    Args:
        predictor (Cell, Callable): The classifier :math:`f(\cdot )` to be explained, it must take an input tensor
            with shape :math:`(N, K)` and output a probability tensor with shape :math:`(N, L)`. :math:`K` is the
            number of features. Both input and output tensors should have dtype `ms.float32` or `ms.float64` .
        num_classes (int): The number of classes :math:`L`.
        class_names (list[str], tuple[str], optional): List/tuple of class names, ordered according to whatever the
            classifier is using. If not present, class names will be 'Class 0', 'Class 1', ... Default: ``None``.
        feature_names (list[str], tuple[str], optional): List/tuple of feature names corresponding to the columns
            in the training data. If not present, feature names will be 'feature 0', 'feature 1', ...
            Default: ``None``.
        stepwise (bool, optional): Set to ``True`` if `predictor` outputs 0s and 1s only. Default: ``False``.
        threshold (float, optional): Decision threshold :math:`\xi` of classification. Default: ``0.5``.
        monte_carlo (int, optional): The number of Monte Carlo samples for computing the integrals
            :math:`\vec{R}`. Default: ``1000``. Higher the number more lengthy and accurate the computation.
        riemann (int, optional): The number of Riemann sum partitions for computing the integrals
            :math:`\int_{0}^{1}h(f_A(u(t)))dt`. Default: ``1000``. Higher the number more lengthy and accurate
            the computation.
        batch_size(int, optional): Batch size for `predictor` when finding nearest neighbors. Default: ``2000``.
        eps (float, optional): Degree of tolerance. This value must be greater than 0. Default: ``1e-9``.

    Inputs:
        - **features** (Tensor) - The universal sample set :math:`G`. Practically, it is often the training set or
          its random subset. The shape must be :math:`(|G|, K)`, :math:`|G|` is the total number of samples. The
          input tensor should have dtype `ms.float32` or `ms.float64` .
        - **max_classes** (int, optional)- Maximum number of classes to be shown. Default: ``5``.
        - **max_features** (int, optional) - Maximum number of features to be shown. Default: ``5``.
        - **show** (bool, optional) - Show the explanation figures, ``None`` means automatically show the
          explanation figures if it is running on JupyterLab. Default: ``None``.

    Outputs:
        - **plc** (Tensor) - Pseudo Linear Coefficients in shape of :math:`(L, K)`.
        - **relative plc** (Tensor) - Relative Pseudo Linear Coefficients in shape of :math:`(L, L, K)`. The first
          :math:`L` axis is for the target classes and the second one is for the view point classes.

    Raises:
        TypeError: Be raised for any argument or input type problem.
        ValueError: Be raised for any input value problem.
        AttributeError: Be raised for underlying is missing any required attribute.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        >>> import numpy as np
        >>> import mindspore as ms
        >>> from mindspore import nn
        >>> from mindspore import ops
        >>> from mindspore_xai.explainer import PseudoLinearCoef
        >>>
        >>> class Classifier(nn.Cell):
        ...     def construct(self, x):
        ...         y = ops.Zeros()((x.shape[0], 3), ms.float32)
        ...         y[:, 0] = -x[:, 0] + x[:, 1] + x[: ,2] - 0.5
        ...         y[:, 1] = x[:, 0] - x[:, 1] + x[: ,2] - 0.5
        ...         y[:, 2] = x[:, 0] + x[:, 1] - x[: ,2] - 0.5
        ...         return ops.Sigmoid()(y * 10)
        >>>
        >>> classifier = Classifier()
        >>> explainer = PseudoLinearCoef(classifier, num_classes=3)
        >>> features = ms.Tensor(np.random.uniform(size=(10000, 5)), dtype=ms.float32)  # 5 features
        >>> plc, relative_plc = explainer(features)
        >>> print(str(plc.shape))
        (3, 5)
        >>> print(str(relative_plc.shape))
        (3, 3, 5)
    """

    def __init__(self,
                 predictor,
                 num_classes,
                 class_names=None,
                 feature_names=None,
                 stepwise=False,
                 threshold=0.5,
                 monte_carlo=1000,
                 riemann=1000,
                 batch_size=2000,
                 eps=1e-9):
        if not (callable(predictor) or isinstance(predictor, nn.Cell)):
            raise ValueError("predictor must be Cell object or function.")
        check_value_type("num_classes", num_classes, int)
        check_value_type("class_names", class_names, [list, tuple, type(None)])
        if not ((class_names is None) or all(isinstance(n, str) for n in class_names)):
            raise ValueError("The elements in class_names should be str.")
        self._check_names("class_names", class_names, num_classes)
        check_value_type("feature_names", feature_names, [list, tuple, type(None)])
        if not ((feature_names is None) or all(isinstance(n, str) for n in feature_names)):
            raise ValueError("The elements in feature_names should be str.")
        check_value_type("stepwise", stepwise, bool)
        check_value_type("threshold", threshold, float)
        self._check_values("threshold", threshold)
        check_value_type("monte_carlo", monte_carlo, int)
        self._check_values("monte_carlo", monte_carlo)
        check_value_type("riemann", riemann, int)
        self._check_values("riemann", riemann)
        check_value_type("batch_size", batch_size, int)
        self._check_values("batch_size", batch_size)
        check_value_type("eps", eps, float)
        self._check_values("eps", eps)

        self._classifier = predictor
        self._num_classes = num_classes
        self._class_names = class_names
        self._feature_names = feature_names
        self._stepwise = stepwise
        self._threshold = threshold
        self._monte_carlo = monte_carlo
        self._batch_size = batch_size
        self._eps = eps

        if self._stepwise:
            self._computer = _StepwiseComputer(eps)
        else:
            self._computer = _Computer(predictor, riemann, eps)
        self._computer.set_train(False)

    def __call__(self, features, max_classes=5, max_features=5, show=None):
        """
        Compute PLC and Relative PLC.

        Args:
            features (Tensor): The universal sample set in shape of (|G|, K).
            max_classes (int, optional): Maximum number of classes to be shown. Default: ``5``.
            max_features (int, optional): Maximum number of features to be shown. Default: ``5``.
            show (bool, optional): Show the figures, ``None`` means decided by `is_notebook()`. Default: ``None``.

        Returns:
            tuple[Tensor, Tensor], the PLC in shape of (L, K) and the Relative PLC in shape of (L, L, K).

        Raises:
            ValueError: If `features` has an unsupported dtype or is not 2-dimensional, if its number of columns
                does not match `feature_names`, or if `max_classes` or `max_features` is not positive.
        """
        check_value_type("features", features, ms.Tensor)
        if not ((features.dtype == ms.float32) or (features.dtype == ms.float64)):
            raise ValueError("The features tensor should have dtype ms.float32 or ms.float64.")
        # Fail with a clear message instead of an IndexError on features.shape[1] below.
        if len(features.shape) != 2:
            raise ValueError("The features tensor should be 2-dimensional.")
        self._check_names("feature_names", self._feature_names, features.shape[1])
        check_value_type("max_classes", max_classes, int)
        self._check_values("max_classes", max_classes)
        check_value_type("max_features", max_features, int)
        self._check_values("max_features", max_features)
        check_value_type("show", show, [bool, type(None)])

        if show is None:
            show = is_notebook()

        nn_finder = SimpleNN(features, self._classifier, self._num_classes, batch_size=self._batch_size,
                             threshold=self._threshold)
        # plc is accumulated as a NumPy array and converted to a Tensor on return.
        plc = np.zeros((self._num_classes, features.shape[1]), dtype=np.float32)
        relative_plc = _zeros((self._num_classes, self._num_classes, features.shape[1]), ms.float32)

        # may be different from features.shape[0]
        all_finder_samples_count = sum([nn_finder.sample_count(c) for c in range(self._num_classes)])

        pairs = [(t, vp) for t in range(self._num_classes) for vp in range(self._num_classes) if t != vp]
        for target, view_point in tqdm(pairs, desc='Compute Pseudo Linear Coef.'):
            vp_samples = nn_finder.sample_count(view_point)
            if vp_samples == 0:
                continue
            total_vp_samples = all_finder_samples_count - nn_finder.sample_count(target)
            plc_ele = self._relative(target, view_point, features, nn_finder)
            relative_plc[target, view_point] = plc_ele
            # Weight each view point class by its share of the non-target samples.
            vp_weight = vp_samples / total_vp_samples
            plc[target] += (plc_ele * vp_weight).asnumpy()

        if show:
            self._show_figures(plc, features.shape[1], max_classes, max_features)

        return ms.Tensor(plc), relative_plc

    def _show_figures(self, plc, num_features, max_classes, max_features):
        """Draw one bar chart per shown class, all sharing the feature order of the first class."""
        plc_list = self._plc_to_list(plc, max_classes)
        if not plc_list:
            return

        # Use a local default so that the instance is not mutated between calls.
        feature_names = self._feature_names
        if feature_names is None:
            feature_names = ['feature {}'.format(x) for x in range(num_features)]

        # Ascending by the absolute PLC of the first class, the largest is drawn on top of the chart.
        sorted_id = np.argsort(np.abs(plc_list[0]))
        if max_features < len(feature_names):
            sorted_id = sorted_id[-max_features:]
        sorted_feat = np.take(feature_names, sorted_id)

        last_target = len(plc_list) - 1
        for target, target_plc in enumerate(plc_list):
            target_plc = np.take(target_plc, sorted_id)
            title, yaxis_label = self._display_format(self._class_names, target, sorted_feat, target_plc)
            if target == last_target:
                # Only the last chart carries the "... more" hint.
                self._display(target_plc, yaxis_label, title, self._num_classes - max_classes,
                              num_features - max_features)
            else:
                self._display(target_plc, yaxis_label, title, 0, 0)

    @staticmethod
    def _plc_to_list(plc, max_classes):
        """
        Convert the plc (Tensor or numpy.ndarray) to a list with at most `max_classes` rows.

        `__call__` passes a NumPy array, which has no `asnumpy()` method, so the conversion is only applied when
        the object provides it.
        """
        if plc.shape[0] > max_classes:
            plc = plc[:max_classes]
        if hasattr(plc, 'asnumpy'):
            plc = plc.asnumpy()
        return list(plc)

    @staticmethod
    def _check_names(var_name, input_names, num_data):
        """Check the length of the feature names and class names."""
        if not ((input_names is None) or (num_data == len(input_names))):
            raise ValueError('The length of {} should be equal to {}'.format(var_name, num_data))

    @staticmethod
    def _check_values(var_name, value):
        """Check that the value is greater than 0."""
        if value <= 0:
            raise ValueError('The value of {} should be greater than 0.'.format(var_name))

    @staticmethod
    def _display_format(class_names, target, sorted_feat, target_plc):
        """Create title and labels for the graph."""
        if class_names is not None:
            title = class_names[target]
        else:
            title = 'Class {}'.format(target)
        yaxis_label = ['{0} : {1:.5g}'.format(name, float(value)) for name, value in zip(sorted_feat, target_plc)]
        return title, yaxis_label

    @classmethod
    def plot(cls, plc, title=None, feature_names=None, max_features=5):
        r"""
        Plot the specific bidirectional chart for a PLC or Relative PLC pair.

        Args:
            plc (Tensor): Pseudo Linear Coefficients or Relative Pseudo Linear Coefficients in shape of
                :math:`(K,)`.
            title (str, optional): Chart title. If not present, chart title will not be displayed.
                Default: ``None``.
            feature_names (list[str], tuple[str], optional): Feature names. If not present, feature names will be
                'feature 0', 'feature 1', ... Default: ``None``.
            max_features (int, optional): Maximum number of features to be shown. Default: ``5``.

        Raises:
            ValueError: Be raised for any input value problem.

        Examples:
            >>> from mindspore import Tensor
            >>> from mindspore_xai.explainer import PseudoLinearCoef
            >>>
            >>> plc = Tensor([[0.1, 0.6, 0.8], [-2, 0.2, 0.4], [0.4, 0.1, -0.1]])
            >>> PseudoLinearCoef.plot(plc[0], title='Chart Title', feature_names=['f1','f2','f3'])
            >>>
            >>> relative_plc = Tensor([[[0., 0., 0.], [-2, 0.2, 0.4]], [[0.4, 0.1, -0.1], [0., 0., 0.]]])
            >>> PseudoLinearCoef.plot(relative_plc[0, 1], title='Chart Title', feature_names=['f1','f2','f3'])
        """
        check_value_type("plc", plc, ms.Tensor)
        if not ((plc.dtype == ms.float32) or (plc.dtype == ms.float64)):
            raise ValueError("The plc tensor should have dtype ms.float32 or ms.float64.")
        check_value_type("title", title, [str, type(None)])
        check_value_type("feature_names", feature_names, [list, tuple, type(None)])
        if not ((feature_names is None) or all(isinstance(n, str) for n in feature_names)):
            raise ValueError("The elements in feature_names should be str.")
        cls._check_names("feature_names", feature_names, len(plc))
        check_value_type("max_features", max_features, int)
        cls._check_values("max_features", max_features)

        sorted_plc, sorted_feat = cls._sort_order(list(plc.asnumpy()), feature_names)
        # Count the hidden features before trimming, otherwise the hint is never shown.
        features_left = len(sorted_plc) - max_features
        if features_left > 0:
            sorted_plc, sorted_feat = cls._limit_feat(sorted_plc, sorted_feat, max_features)
        yaxis_label = ['{0} : {1:.5g}'.format(name, float(value)) for name, value in zip(sorted_feat, sorted_plc)]
        cls._display(sorted_plc, yaxis_label, title, 0, features_left)

    @staticmethod
    def _limit_feat(sorted_plc, sorted_feat, max_features):
        """Keep the last `max_features` entries of both sequences."""
        sorted_plc = sorted_plc[-max_features:]
        sorted_feat = sorted_feat[-max_features:]
        return sorted_plc, sorted_feat

    @staticmethod
    def _sort_order(plc, feature_names):
        """Arrange the values and their feature names in ascending order of absolute value."""
        if feature_names is None:
            feature_names = ['feature {}'.format(x) for x in range(len(plc))]
        sort_id = np.argsort(list(map(abs, plc)))
        feature_names, plc = [np.take(x, sort_id) for x in [feature_names, plc]]
        return plc, feature_names

    @staticmethod
    def _display(plc, yaxis_label, title, classes_left, features_left):
        """Display the graph for the PLC and relative PLC."""
        plt.figure(figsize=(10, ((len(plc) / 2.0) + 0.5)))
        colors = ['green' if x > 0 else 'red' for x in plc]
        pos = np.arange(len(plc)) + .5
        plt.barh(pos, plc, align='center', color=colors)
        plt.yticks(pos, yaxis_label)
        if title is not None:
            plt.title(title)

        if classes_left > 0 and features_left > 0:
            plt.xlabel('{} more class(es) and {} more feature(s)... '.format(classes_left, features_left),
                       loc='right')
        elif classes_left > 0:
            plt.xlabel('{} more class(es)... '.format(classes_left), loc='right')
        elif features_left > 0:
            plt.xlabel('{} more feature(s)... '.format(features_left), loc='right')

    @classmethod
    def normalize(cls, plc, per_vector=False, eps=1e-9):
        r"""
        Normalize Pseudo Linear Coefficients to range [-1, 1].

        Warning:
            Normalizing PLC from unnormalized features may lead to misleading results.

        Args:
            plc (Tensor): The PLC or Relative PLC to be normalized.
            per_vector (bool, optional): Normalize within each :math:`\vec{R}` vector. Default: ``False``.
            eps (float, optional): Degree of tolerance. This value must be greater than 0. Default: ``1e-9``.

        Returns:
            Tensor, the normalized values.

        Examples:
            >>> from mindspore import Tensor
            >>> from mindspore_xai.explainer import PseudoLinearCoef
            >>>
            >>> plc = Tensor([[0.1, 0.6, 0.8], [-2, 0.2, 0.4], [0.4, 0.1, -0.1]])
            >>> print(PseudoLinearCoef.normalize(plc))
            [[ 0.05  0.3   0.4 ]
             [-1.    0.1   0.2 ]
             [ 0.2   0.05 -0.05]]
            >>> print(PseudoLinearCoef.normalize(plc, per_vector=True))
            [[ 0.125  0.75   1.   ]
             [-1.     0.1    0.2  ]
             [ 1.     0.25  -0.25 ]]
        """
        check_value_type("plc", plc, ms.Tensor)
        if not ((plc.dtype == ms.float32) or (plc.dtype == ms.float64)):
            raise ValueError("The plc tensor should have dtype ms.float32 or ms.float64.")
        check_value_type("per_vector", per_vector, bool)
        check_value_type("eps", eps, float)
        cls._check_values("eps", eps)

        # A tensor with an empty shape is returned untouched.
        if not plc.shape:
            return plc

        if per_vector and plc.shape[-1] > 0:
            scale = plc.abs().max(axis=-1, keepdims=True)
            # Vectors whose largest magnitude is below eps are left unscaled.
            scale = scale.masked_fill(scale < eps, 1)
            return plc / scale

        scale = plc.abs().max()
        if scale > eps:
            return plc / scale
        return plc

    def _relative(self, target, view_point, features, nn_finder):
        """
        Compute Relative PLC of the `target` class relative to the `view_point` class.

        At most `monte_carlo` samples of the view point class are used as queries; they are drawn without
        replacement when the class has more samples than that.
        """
        vp_samples_count = nn_finder.sample_count(view_point)
        if self._monte_carlo < vp_samples_count:
            picked = np.random.choice(np.arange(vp_samples_count), size=self._monte_carlo, replace=False)
            queries = features[nn_finder.sample_idxs(view_point)[ms.Tensor(picked, dtype=ms.int32)]]
        else:
            queries = features[nn_finder.sample_idxs(view_point)]
        nearests = nn_finder(queries, target)

        if self._stepwise:
            # The stepwise computer processes the whole batch in one go.
            return self._computer(queries, nearests)

        plc_sum = _zeros(features.shape[1], ms.float32)
        target = ms.Tensor(target, dtype=ms.int32)
        for query, nearest in tqdm(zip(queries, nearests), leave=False, total=queries.shape[0],
                                   desc=f'Class {target} Relative to Class {view_point}'):
            plc_sum = self._computer(target, query, nearest, plc_sum)
        return plc_sum / queries.shape[0]