mindarmour.reliability.concept_drift.concept_drift_check_time_series 源代码

# -*- coding: utf-8 -*-
# Copyright 2021 Huawei Technologies Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Concpt drift module
"""

from itertools import groupby
import numpy as np
import matplotlib.pyplot as plt
from mindarmour.utils._check_param import _check_array_not_empty
from mindarmour.utils._check_param import check_param_type, check_param_in_range


[文档]class ConceptDriftCheckTimeSeries: r""" ConceptDriftCheckTimeSeries is used for example series distribution change detection. For details, please check `Implementing the Concept Drift Detection Application of Time Series Data <https://mindspore.cn/mindarmour/docs/en/master/concept_drift_time_series.html>`_. Args: window_size(int): Size of a concept window, no less than 10. If given the input data, window_size belongs to [10, 1/3*len(input data)]. If the data is periodic, usually window_size equals 2-5 periods, such as, for monthly/weekly data, the data volume of 30/7 days is a period. Default: ``100``. rolling_window(int): Smoothing window size, belongs to [1, window_size]. Default: ``10``. step(int): The jump length of the sliding window, belongs to [1, window_size]. Default: ``10``. threshold_index(float): The threshold index, :math:`(-\infty, +\infty)`. Default: ``1.5``. need_label(bool): If ``True``, concept drift labels are needed. Default: ``False``. Examples: >>> from mindarmour import ConceptDriftCheckTimeSeries >>> concept = ConceptDriftCheckTimeSeries(window_size=100, rolling_window=10, ... step=10, threshold_index=1.5, need_label=False) >>> data_example = 5*np.random.rand(1000) >>> data_example[200: 800] = 20*np.random.rand(600) >>> score, threshold, concept_drift_location = concept.concept_check(data_example) """ def __init__(self, window_size=100, rolling_window=10, step=10, threshold_index=1.5, need_label=False): self.window_size = check_param_type('window_size', window_size, int) self.rolling_window = check_param_type('rolling_window', rolling_window, int) self.step = check_param_type('step', step, int) self.threshold_index = check_param_type('threshold_index', threshold_index, float) self.need_label = check_param_type('need_label', need_label, bool) self._in_size = window_size self._out_size = window_size self._res_size = int(0.1*window_size) def _reservoir_model_feature(self, window_data): """ Extract example features in reservoir model. Args: window_data(numpy.ndarray): The input data (in one window). Returns: - numpy.ndarray, the output weight of reservoir model. - numpy.ndarray, the state of the reservoir model in the latent space. Examples: >>> input_data = np.random.rand(100) >>> w, x = ConceptDriftCheckTimeSeries._reservoir_model_feature(window_data) """ # Initialize weights res_size = self._res_size x_state = _w_generate(res_size, len(window_data), window_data) x_state_t = x_state.T # Data reshape data_channel = None if window_data.ndim == 2: data_channel = window_data.shape[1] if window_data.ndim == 1: data_channel = 1 y_t = window_data.reshape(len(window_data), data_channel) reg = 1e-8 # Calculate w_out w_out = np.dot(np.dot(y_t, x_state_t), np.linalg.inv(np.dot(x_state, x_state_t) + reg*np.eye(res_size))) return w_out, x_state def _concept_distance(self, data_x, data_y): """ Calculate the distance of two example blocks. Args: data_x(numpy.ndarray): Data x. data_y(numpy.ndarray): Data y. Returns: - float, distance between data_x and data_y. Examples: >>> x = np.random.rand(100) >>> y = np.random.rand(100) >>> score = ConceptDriftCheckTimeSeries._concept_distance(x, y) """ # Feature extraction feature_x = self._reservoir_model_feature(data_x) feature_y = self._reservoir_model_feature(data_y) # Calculate distance distance_wx = sum(abs(np.dot(feature_x[0], feature_x[1]) - np.dot(feature_y[0], feature_y[1])))/len(data_x) statistic_feature = abs(data_x.mean() - data_y.mean()).mean() distance_score = (distance_wx + statistic_feature) / (1 + distance_wx + statistic_feature) distance_score_mean = distance_score.mean() return distance_score_mean def _data_process(self, data): """ Data processing. Args: data(numpy.ndarray): Input data. Returns: - numpy.ndarray, data after smoothing. Examples: >>> data_example = np.random.rand(100) >>> data_example = ConceptDriftCheckTimeSeries._data_process(data_example) """ temp = [] data_channel = None if data.ndim == 2: data_channel = data.shape[1] if data.ndim == 1: data_channel = 1 data = data.reshape(len(data), data_channel) # Moving average for i in range(data_channel): data_av = np.convolve(data[:, i], np.ones((self.rolling_window,)) / self.rolling_window, 'valid') data_av = np.append(data_av, np.ones(self.rolling_window - 1)*data_av[-1]) data_av = (data_av - data_av.min()) / (data_av.max() - data_av.min()) temp.append(data_av) smooth_data = np.array(temp).T return smooth_data
[文档] def concept_check(self, data): """ Find concept drift locations in a data series. Args: data(numpy.ndarray): Input data. The shape of data could be :math:`(n,1)` or :math:`(n,m)`. Note that each column (m columns) is one data series. Returns: - numpy.ndarray, the concept drift score of the example series. - float, the threshold to judge concept drift. - list, the location of the concept drift. """ # data check data = _check_array_not_empty('data', data) data = check_param_type('data', data, np.ndarray) check_param_in_range('window_size', self.window_size, 10, int((1 / 3)*len(data))) check_param_in_range('rolling_window', self.rolling_window, 1, self.window_size) check_param_in_range('step', self.step, 1, self.window_size) original_data = data data = self._data_process(data) # calculate drift score drift_score = np.zeros(len(data)) step_size = self.step for i in range(0, len(data) - 2*self.window_size, step_size): data_x = data[i: i + self.window_size] data_y = data[i + self.window_size:i + 2*self.window_size] drift_score[i + self.window_size] = self._concept_distance(data_x, data_y) threshold = _cal_threshold(drift_score, self.threshold_index) # original label label, label_location = _original_label(data, threshold, drift_score, self.window_size, step_size) # label continue label_continue = _label_continue_process(label) # find drift blocks concept_drift_location, drift_point = _drift_blocks(drift_score, label_continue, label_location) # save result _result_save(original_data, threshold, concept_drift_location, drift_point, drift_score) return drift_score, threshold, concept_drift_location
def _result_save(original_data, threshold, concept_location, drift_point, drift_score): """ To save the result. Args: original_data(numpy.ndarray): The input data. threshold(float): The concept drift threshold. concept_location(list): The concept drift locations(x-axis). drift_point(list): The precise drift point of a drift. drift_score(numpy.ndarray): The drift score of input data. """ plt.figure(figsize=(20, 8)) plt.subplot(2, 1, 1) # Plot input data and drift points plt.plot(original_data, label="data") plt.title('concept drift check, threshold=' + str(threshold), fontsize=25) plt.scatter(concept_location, np.ones(len(concept_location)), marker='*', s=200, color="b", label="concept drift occurred") for _, i in enumerate(drift_point): plt.axvline(x=i, color='r', linestyle='--') plt.legend(fontsize=15) plt.subplot(2, 1, 2) # Plot drift score plt.plot(drift_score, label="drift_score") plt.axhline(y=threshold, color='r', linestyle='--', label="threshold") plt.legend(fontsize=15) plt.savefig('concept_drift_check.pdf') def _original_label(original_data, threshold, drift_score, window_size, step_size): """ To obtain a original drift label of time series. Args: original_data(numpy.ndarray): The input data. threshold(float): The drift threshold. drift_score(numpy.ndarray): The drift score of the input data. window_size(int): Size of a concept window. Usually 3 periods of the input data if it is periodic. step_size(int): The jump length of the sliding window. Returns: - list, the drift label of input data. 0 means no drift, and 1 means drift happens. - list, the locations of drifts(x-axis). """ label = [] label_location = [] # Label: label=0, no drifts; label=1, drift happens. for i in range(0, len(original_data) - 2*window_size, step_size): label_location.append(i + window_size) if drift_score[i + window_size] >= threshold: label.append(1) else: label.append(0) return label, label_location def _label_continue_process(label): """ To obtain a continual drift label of time series. Args: label(list): The original drift label. Returns: - numpy.ndarray, The continual drift label. The drift may happen occasionally, we hope to avoid frequent alarms, so label continue process is necessary. """ if label[-1] == 1 and label[-2] == 0 and label[-3] == 0 and label[-4] == 0: label[-1] = 0 if label[0] == 1 and label[1] == 0 and label[2] == 0 and label[3] == 0: label[0] = 0 label_continue = np.array(label) # Label continue process for i in range(1, len(label) - 1): if label[i - 1] == 0 and label[i + 1] == 0: label_continue[i - 1:i + 1] = 0 return label_continue def _find_loc(label_location): return label_location[1] - label_location[0] def _continue_block(location): """ Find continue blocks of concept drift. Args: location(numpy.ndarray): The locations of concept drift. Returns: - list, continue blocks of concept drift. """ area = [] for _, loc in groupby(enumerate(location), _find_loc): l_1 = [j for i, j in loc] area.append(l_1) return area def _drift_blocks(drift_score, label_continue, label_location): """ Find the concept drift areas. Args: drift_score(numpy.ndarray): The drift score of the data series. label_continue(numpy.ndarray): The concept drift continual label. label_location(numpy.ndarray): The locations of concept drift(x-axis). Returns: - list, the concept drift locations(x-axis) after continual blocks finding. - list, return a precise beginning location of a drift. """ # Find drift blocks area = _continue_block(np.where(label_continue == 1)[0]) label_continue = np.array(label_continue) label_location = np.array(label_location) label_continue = label_continue[label_continue == 1] concept_location = [] drift_point = [] # Find drift points for _, lo_ in enumerate(area): location = label_location[lo_] concept_location.extend(location) if label_continue.size > 0: drift_point.append(location[np.where(drift_score[location] == np.max(drift_score[location]))[0]]) else: drift_point.append(None) return concept_location, drift_point def _w_generate(res_size, in_size, input_data): """ Randomly generate weights of the reservoir model. Args: res_size(int): The number of reservoir nodes. in_size(int): The input size of reservoir model. input_data(numpy.ndarray): Input data. Returns: - numpy.ndarray, the state of reservoir. """ # Weight generates randomly np.random.seed(42) w_in = np.random.rand(res_size, in_size) - 0.5 w_0 = np.random.rand(res_size, res_size) - 0.5 w_0 *= 0.8 a_speed = 0.3 # Data reshape data_channel = None if input_data.ndim == 2: data_channel = input_data.shape[1] if input_data.ndim == 1: data_channel = 1 # Reservoir state x_state = np.zeros((res_size, data_channel)) u_input = input_data.reshape(len(input_data), data_channel) for _ in range(50): x_state = (1 - a_speed)*x_state + \ a_speed*np.tanh(np.dot(w_in, u_input) + np.dot(w_0, x_state)) return x_state def _cal_distance(matrix1, matrix2): """ Calculate distance between two matrix. Args: matrix1(numpy.ndarray): Input array. matrix2(numpy.ndarray): Input array. Returns: - numpy.ndarray, distance between two arrays. """ w_mean_x = np.mean(matrix1, axis=0) w_mean_y = np.mean(matrix2, axis=0) distance = sum(abs(w_mean_x - w_mean_y)) return distance def _cal_threshold(distance, threshold_index): """ Calculate the threshold of concept drift. Args: distance(numpy.ndarray): The distance between two data series. threshold_index(float): Threshold adjusted index, [-∞, +∞]. Returns: - float, [0, 1]. """ distance = distance[distance > 0] # Threshold calculation if distance.size > 0: q_1 = np.percentile(distance, 25) q_3 = np.percentile(distance, 75) q_diff = q_3 - q_1 threshold = np.clip(0.1 + threshold_index*q_diff, 0, 1) else: threshold = 1 return threshold