# Source code for mindinsight.lineagemgr.collection.model.model_lineage

# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""This module is used to collect lineage information of model training."""
import json
import os

import numpy as np

from mindinsight.lineagemgr.summary.summary_record import LineageSummary
from mindinsight.utils.exceptions import \
    MindInsightException
from mindinsight.lineagemgr.common.validator.validate import validate_train_run_context, \
    validate_eval_run_context, validate_file_path, validate_network, \
    validate_int_params, validate_summary_record, validate_raise_exception,\
    validate_user_defined_info
from mindinsight.lineagemgr.common.exceptions.error_code import LineageErrors, LineageErrorMsg
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamRunContextError, \
    LineageGetModelFileError, LineageLogError
from mindinsight.lineagemgr.common.log import logger as log
from mindinsight.lineagemgr.common.utils import try_except, make_directory
from mindinsight.lineagemgr.common.validator.model_parameter import RunContextArgs, \
    EvalParameter
from mindinsight.lineagemgr.collection.model.base import Metadata

try:
    from mindspore.common.tensor import Tensor
    from mindspore.train.callback import Callback, RunContext, ModelCheckpoint, SummaryStep
    from mindspore.nn import Cell, Optimizer
    from mindspore.nn.loss.loss import _Loss
    from mindspore.dataset.engine import Dataset, ImageFolderDatasetV2, MnistDataset, Cifar10Dataset, Cifar100Dataset, \
        VOCDataset, CelebADataset, MindDataset, ManifestDataset, TFRecordDataset, TextFileDataset
    import mindspore.dataset as ds
except (ImportError, ModuleNotFoundError):
    log.warning('MindSpore Not Found!')


class TrainLineage(Callback):
    """
    Collect lineage of a training job.

    Args:
        summary_record (Union[SummaryRecord, str]): The `SummaryRecord` object which
            is used to record the summary value(see mindspore.train.summary.SummaryRecord),
            or a log dir(as a `str`) to be passed to `LineageSummary` to create
            a lineage summary recorder. It should be noted that instead of making
            use of summary_record to record lineage info directly, we obtain
            log dir from it then create a new summary file to write lineage info.
        raise_exception (bool): Whether to raise exception when error occurs in
            TrainLineage. If True, raise exception. If False, catch exception
            and continue. Default: False.
        user_defined_info (dict): User defined information. Only flatten dict with
            str key and int/float/str value is supported. Default: None.

    Raises:
        MindInsightException: If validating parameter fails.
        LineageLogError: If recording lineage information fails.

    Examples:
        >>> from mindinsight.lineagemgr import TrainLineage
        >>> from mindspore.train.callback import ModelCheckpoint, SummaryStep
        >>> from mindspore.train.summary import SummaryRecord
        >>> model = Model(train_network)
        >>> model_ckpt = ModelCheckpoint(directory='/dir/to/save/model/')
        >>> summary_writer = SummaryRecord(log_dir='./')
        >>> summary_callback = SummaryStep(summary_writer, flush_step=2)
        >>> lineagemgr = TrainLineage(summary_record=summary_writer)
        >>> model.train(epoch_num, dataset, callbacks=[model_ckpt, summary_callback, lineagemgr])
    """
    def __init__(self, summary_record, raise_exception=False, user_defined_info=None):
        super(TrainLineage, self).__init__()
        try:
            validate_raise_exception(raise_exception)
            self.raise_exception = raise_exception

            if isinstance(summary_record, str):
                # A log dir was given directly; make directory if not exist.
                self.lineage_log_dir = make_directory(summary_record)
            else:
                # A SummaryRecord was given; only its log dir is reused — lineage
                # info is written to a separate summary file in the same dir.
                validate_summary_record(summary_record)
                summary_log_path = summary_record.full_file_name
                validate_file_path(summary_log_path)
                self.lineage_log_dir = os.path.dirname(summary_log_path)
            self.lineage_summary = LineageSummary(self.lineage_log_dir)

            # Learning rate is captured lazily in begin(); None marks "not yet collected".
            self.initial_learning_rate = None

            self.user_defined_info = user_defined_info
            if user_defined_info:
                validate_user_defined_info(user_defined_info)
        except MindInsightException as err:
            # Swallow validation errors unless the caller opted into raising,
            # so a lineage failure cannot break the training job itself.
            log.error(err)
            if raise_exception:
                raise

    @try_except(log)
    def begin(self, run_context):
        """
        Initialize the training progress when the training job begins.

        Args:
            run_context (RunContext): It contains all lineage information,
                see mindspore.train.callback.RunContext.

        Raises:
            MindInsightException: If validating parameter fails.
        """
        log.info('Initialize training lineage collection...')

        if self.user_defined_info:
            self.lineage_summary.record_user_defined_info(self.user_defined_info)

        if not isinstance(run_context, RunContext):
            error_msg = f'Invalid TrainLineage run_context.'
            log.error(error_msg)
            raise LineageParamRunContextError(error_msg)

        run_context_args = run_context.original_args()
        # Capture the learning rate only once, at the first begin() call.
        if not self.initial_learning_rate:
            optimizer = run_context_args.get('optimizer')
            if optimizer and not isinstance(optimizer, Optimizer):
                log.error("The parameter optimizer is invalid. It should be an instance of "
                          "mindspore.nn.optim.optimizer.Optimizer.")
                raise MindInsightException(error=LineageErrors.PARAM_OPTIMIZER_ERROR,
                                           message=LineageErrorMsg.PARAM_OPTIMIZER_ERROR.value)
            if optimizer:
                log.info('Obtaining initial learning rate...')
                self.initial_learning_rate = AnalyzeObject.analyze_optimizer(optimizer)
                log.debug('initial_learning_rate: %s', self.initial_learning_rate)
            else:
                # No optimizer in run_context; search for one inside the train network.
                network = run_context_args.get('train_network')
                validate_network(network)
                optimizer = AnalyzeObject.get_optimizer_by_network(network)
                self.initial_learning_rate = AnalyzeObject.analyze_optimizer(optimizer)
                log.debug('initial_learning_rate: %s', self.initial_learning_rate)

        # Get train dataset graph. The serialize/dumps/loads round trip
        # normalizes the graph into plain JSON-compatible dict values.
        train_dataset = run_context_args.get('train_dataset')
        dataset_graph_dict = ds.serialize(train_dataset)
        dataset_graph_json_str = json.dumps(dataset_graph_dict, indent=2)
        dataset_graph_dict = json.loads(dataset_graph_json_str)
        log.info('Logging dataset graph...')
        try:
            self.lineage_summary.record_dataset_graph(dataset_graph=dataset_graph_dict)
        except Exception as error:
            error_msg = f'Dataset graph log error in TrainLineage begin: {error}'
            log.error(error_msg)
            raise LineageLogError(error_msg)
        log.info('Dataset graph logged successfully.')

    @try_except(log)
    def end(self, run_context):
        """
        Collect lineage information when the training job ends.

        Args:
            run_context (RunContext): It contains all lineage information,
                see mindspore.train.callback.RunContext.

        Raises:
            LineageLogError: If recording lineage information fails.
        """
        log.info('Start to collect training lineage...')
        if not isinstance(run_context, RunContext):
            error_msg = f'Invalid TrainLineage run_context.'
            log.error(error_msg)
            raise LineageParamRunContextError(error_msg)

        run_context_args = run_context.original_args()
        validate_train_run_context(RunContextArgs, run_context_args)

        train_lineage = dict()
        # Fill optimizer/loss/network class names and the final loss value.
        train_lineage = AnalyzeObject.get_network_args(
            run_context_args, train_lineage
        )

        train_dataset = run_context_args.get('train_dataset')
        callbacks = run_context_args.get('list_callback')
        list_callback = getattr(callbacks, '_callbacks', [])

        log.info('Obtaining model files...')
        ckpt_file_path, _ = AnalyzeObject.get_file_path(list_callback)

        train_lineage[Metadata.learning_rate] = self.initial_learning_rate
        train_lineage[Metadata.epoch] = run_context_args.get('epoch_num')
        train_lineage[Metadata.step_num] = run_context_args.get('cur_step_num')
        train_lineage[Metadata.parallel_mode] = run_context_args.get('parallel_mode')
        train_lineage[Metadata.device_num] = run_context_args.get('device_number')
        # NOTE(review): batch_size is filled from the 'batch_num' run_context key —
        # confirm this key naming against the RunContext producer.
        train_lineage[Metadata.batch_size] = run_context_args.get('batch_num')

        model_path_dict = {
            'ckpt': ckpt_file_path
        }
        train_lineage[Metadata.model_path] = json.dumps(model_path_dict)

        log.info('Calculating model size...')
        train_lineage[Metadata.model_size] = AnalyzeObject.get_model_size(
            ckpt_file_path
        )
        log.debug('model_size: %s', train_lineage[Metadata.model_size])

        log.info('Analyzing dataset object...')
        train_lineage = AnalyzeObject.analyze_dataset(train_dataset, train_lineage, 'train')

        log.info('Logging lineage information...')
        try:
            self.lineage_summary.record_train_lineage(train_lineage)
        except IOError as error:
            error_msg = f'End error in TrainLineage: {error}'
            log.error(error_msg)
            raise LineageLogError(error_msg)
        except Exception as error:
            error_msg = f'End error in TrainLineage: {error}'
            log.error(error_msg)
            log.error('Fail to log the lineage of the training job.')
            raise LineageLogError(error_msg)
        log.info('The lineage of the training job has logged successfully.')
class EvalLineage(Callback):
    """
    Collect lineage of an evaluation job.

    Args:
        summary_record (Union[SummaryRecord, str]): The `SummaryRecord` object which
            is used to record the summary value(see mindspore.train.summary.SummaryRecord),
            or a log dir(as a `str`) to be passed to `LineageSummary` to create
            a lineage summary recorder. It should be noted that instead of making
            use of summary_record to record lineage info directly, we obtain
            log dir from it then create a new summary file to write lineage info.
        raise_exception (bool): Whether to raise exception when error occurs in
            EvalLineage. If True, raise exception. If False, catch exception
            and continue. Default: False.
        user_defined_info (dict): User defined information. Only flatten dict with
            str key and int/float/str value is supported. Default: None.

    Raises:
        MindInsightException: If validating parameter fails.
        LineageLogError: If recording lineage information fails.

    Examples:
        >>> from mindinsight.lineagemgr import EvalLineage
        >>> from mindspore.train.callback import ModelCheckpoint, SummaryStep
        >>> from mindspore.train.summary import SummaryRecord
        >>> model = Model(train_network)
        >>> model_ckpt = ModelCheckpoint(directory='/dir/to/save/model/')
        >>> summary_writer = SummaryRecord(log_dir='./')
        >>> summary_callback = SummaryStep(summary_writer, flush_step=2)
        >>> lineagemgr = EvalLineage(summary_record=summary_writer)
        >>> model.eval(epoch_num, dataset, callbacks=[model_ckpt, summary_callback, lineagemgr])
    """
    def __init__(self, summary_record, raise_exception=False, user_defined_info=None):
        super(EvalLineage, self).__init__()
        try:
            validate_raise_exception(raise_exception)
            self.raise_exception = raise_exception

            if isinstance(summary_record, str):
                # A log dir was given directly; make directory if not exist.
                self.lineage_log_dir = make_directory(summary_record)
            else:
                # A SummaryRecord was given; only its log dir is reused — lineage
                # info is written to a separate summary file in the same dir.
                validate_summary_record(summary_record)
                summary_log_path = summary_record.full_file_name
                validate_file_path(summary_log_path)
                self.lineage_log_dir = os.path.dirname(summary_log_path)
            self.lineage_summary = LineageSummary(self.lineage_log_dir)

            self.user_defined_info = user_defined_info
            if self.user_defined_info:
                validate_user_defined_info(self.user_defined_info)
        except MindInsightException as err:
            # Swallow validation errors unless the caller opted into raising,
            # so a lineage failure cannot break the evaluation job itself.
            log.error(err)
            if raise_exception:
                raise

    @try_except(log)
    def end(self, run_context):
        """
        Collect lineage information when the evaluation job ends.

        Args:
            run_context (RunContext): It contains all lineage information,
                see mindspore.train.callback.RunContext.

        Raises:
            MindInsightException: If validating parameter fails.
            LineageLogError: If recording lineage information fails.
        """
        if self.user_defined_info:
            self.lineage_summary.record_user_defined_info(self.user_defined_info)

        if not isinstance(run_context, RunContext):
            error_msg = f'Invalid EvalLineage run_context.'
            log.error(error_msg)
            raise LineageParamRunContextError(error_msg)

        run_context_args = run_context.original_args()
        validate_eval_run_context(EvalParameter, run_context_args)

        valid_dataset = run_context_args.get('valid_dataset')

        eval_lineage = dict()
        # Metrics dict is serialized to a JSON string for the summary record.
        metrics = run_context_args.get('metrics')
        eval_lineage[Metadata.metrics] = json.dumps(metrics)
        eval_lineage[Metadata.step_num] = run_context_args.get('cur_step_num')

        log.info('Analyzing dataset object...')
        eval_lineage = AnalyzeObject.analyze_dataset(valid_dataset, eval_lineage, 'valid')

        log.info('Logging evaluation job lineage...')
        try:
            self.lineage_summary.record_evaluation_lineage(eval_lineage)
        except IOError as error:
            error_msg = f'End error in EvalLineage: {error}'
            log.error(error_msg)
            log.error('Fail to log the lineage of the evaluation job.')
            raise LineageLogError(error_msg)
        except Exception as error:
            error_msg = f'End error in EvalLineage: {error}'
            log.error(error_msg)
            log.error('Fail to log the lineage of the evaluation job.')
            raise LineageLogError(error_msg)
        log.info('The lineage of the evaluation job has logged successfully.')
class AnalyzeObject:
    """Analyze class object in MindSpore."""

    @staticmethod
    def get_optimizer_by_network(network):
        """
        Get optimizer by analyzing network.

        Args:
            network (Cell): See mindspore.nn.Cell.

        Returns:
            Optimizer, an Optimizer object.
        """
        optimizer = None
        # Only the network's direct child cells (the '_cells' dict) are scanned.
        net_args = vars(network) if network else {}
        net_cell = net_args.get('_cells') if net_args else {}
        for _, value in net_cell.items():
            if isinstance(value, Optimizer):
                optimizer = value
                break
        return optimizer

    @staticmethod
    def get_loss_fn_by_network(network):
        """
        Get loss function by analyzing network.

        Args:
            network (Cell): See mindspore.nn.Cell.

        Returns:
            Loss_fn, a Cell object.
        """
        loss_fn = None
        inner_cell_list = []
        net_args = vars(network) if network else {}
        net_cell = net_args.get('_cells') if net_args else {}
        # Seed a breadth-first search with the non-optimizer child cells.
        for _, value in net_cell.items():
            if isinstance(value, Cell) and \
                    not isinstance(value, Optimizer):
                inner_cell_list.append(value)

        while inner_cell_list:
            inner_net_args = vars(inner_cell_list[0])
            inner_net_cell = inner_net_args.get('_cells')

            for value in inner_net_cell.values():
                if isinstance(value, _Loss):
                    loss_fn = value
                    break
                if isinstance(value, Cell):
                    inner_cell_list.append(value)

            if loss_fn:
                break
            inner_cell_list.pop(0)

        return loss_fn

    @staticmethod
    def get_backbone_network(network):
        """
        Get the name of backbone network.

        Args:
            network (Cell): The train network.

        Returns:
            str, the name of the backbone network.
        """
        backbone_name = None
        has_network = False
        network_key = 'network'
        backbone_key = '_backbone'
        net_args = vars(network) if network else {}
        net_cell = net_args.get('_cells') if net_args else {}

        # Wrapper cells (e.g. TrainOneStep) expose the wrapped net under 'network'.
        for key, value in net_cell.items():
            if key == network_key:
                network = value
                has_network = True
                break
        if has_network:
            # Unwrap nested 'network' attributes down to the innermost cell.
            while hasattr(network, network_key):
                network = getattr(network, network_key)
            if hasattr(network, backbone_key):
                backbone = getattr(network, backbone_key)
                backbone_name = type(backbone).__name__
        if backbone_name is None and network is not None:
            # Fall back to the class name of the (possibly unwrapped) network.
            backbone_name = type(network).__name__
        return backbone_name

    @staticmethod
    def analyze_optimizer(optimizer):
        """
        Analyze Optimizer, a Cell object of MindSpore.

        In this way, we can obtain the following attributes:
        learning_rate (float), weight_decay (float),
        momentum (float), weights (float).

        Args:
            optimizer (Optimizer): See mindspore.nn.optim.Optimizer.

        Returns:
            float, the learning rate that the optimizer adopted.
        """
        learning_rate = None
        if isinstance(optimizer, Optimizer):
            learning_rate = getattr(optimizer, 'learning_rate', None)
            if learning_rate:
                # learning_rate is a Parameter; unwrap to its value.
                learning_rate = learning_rate.default_input

        # Get the real learning rate value.
        if isinstance(learning_rate, Tensor):
            learning_rate = learning_rate.asnumpy()
            if learning_rate.ndim == 0:
                # A scalar tensor must become 1-d before list() conversion.
                learning_rate = np.atleast_1d(learning_rate)
            learning_rate = list(learning_rate)
        elif isinstance(learning_rate, float):
            learning_rate = [learning_rate]
        # For a dynamic lr schedule only the first value is reported.
        return learning_rate[0] if learning_rate else None

    @staticmethod
    def analyze_dataset(dataset, lineage_dict, dataset_type):
        """
        Analyze Dataset, a Dataset object of MindSpore.

        In this way, we can obtain the following attributes:
        dataset_path (str), train_dataset_size (int),
        valid_dataset_size (int), batch_size (int).

        Args:
            dataset (Dataset): See mindspore.dataengine.datasets.Dataset.
            lineage_dict (dict): A dict contains lineage metadata.
            dataset_type (str): Dataset type, train or valid.

        Returns:
            dict, the lineage metadata.
        """
        batch_num = dataset.get_dataset_size()
        batch_size = dataset.get_batch_size()
        if batch_num is not None:
            validate_int_params(batch_num, 'dataset_batch_num')
            # BUGFIX: previously validated batch_num a second time under the
            # 'dataset_batch_size' label, leaving batch_size unvalidated.
            validate_int_params(batch_size, 'dataset_batch_size')
        log.debug('dataset_batch_num: %d', batch_num)
        log.debug('dataset_batch_size: %d', batch_size)

        dataset_path = AnalyzeObject.get_dataset_path_wrapped(dataset)
        if dataset_path and os.path.isfile(dataset_path):
            # Record the containing directory, not the individual file.
            dataset_path, _ = os.path.split(dataset_path)

        # Total sample count = number of batches * batch size.
        dataset_size = int(batch_num * batch_size)
        if dataset_type == 'train':
            lineage_dict[Metadata.train_dataset_path] = dataset_path
            lineage_dict[Metadata.train_dataset_size] = dataset_size
        elif dataset_type == 'valid':
            lineage_dict[Metadata.valid_dataset_path] = dataset_path
            lineage_dict[Metadata.valid_dataset_size] = dataset_size
        return lineage_dict

    def get_dataset_path(self, output_dataset):
        """
        Get dataset path of MindDataset object.

        Args:
            output_dataset (Union[Dataset, ImageFolderDatasetV2, MnistDataset,
                Cifar10Dataset, Cifar100Dataset, VOCDataset, CelebADataset,
                MindDataset, ManifestDataset, TFRecordDataset, TextFileDataset]):
                See mindspore.dataengine.datasets.Dataset.

        Returns:
            str, dataset path.
        """
        # Leaf dataset types expose their source under different attributes.
        dataset_dir_set = (ImageFolderDatasetV2, MnistDataset, Cifar10Dataset,
                           Cifar100Dataset, VOCDataset, CelebADataset)
        dataset_file_set = (MindDataset, ManifestDataset)
        dataset_files_set = (TFRecordDataset, TextFileDataset)

        if isinstance(output_dataset, dataset_file_set):
            return output_dataset.dataset_file
        if isinstance(output_dataset, dataset_dir_set):
            return output_dataset.dataset_dir
        if isinstance(output_dataset, dataset_files_set):
            return output_dataset.dataset_files[0]
        # Not a leaf node: recurse into the first input dataset. May raise
        # IndexError when there is no input; the wrapper handles that.
        return self.get_dataset_path(output_dataset.input[0])

    @staticmethod
    def get_dataset_path_wrapped(dataset):
        """
        A wrapper for obtaining dataset path.

        Args:
            dataset (Union[MindDataset, Dataset]): See
                mindspore.dataengine.datasets.Dataset.

        Returns:
            str, dataset path.
        """
        dataset_path = None
        if isinstance(dataset, Dataset):
            try:
                dataset_path = AnalyzeObject().get_dataset_path(dataset)
            except IndexError:
                # The dataset graph had no resolvable source; treat as unknown.
                dataset_path = None
        dataset_path = validate_file_path(dataset_path, allow_empty=True)
        return dataset_path

    @staticmethod
    def get_file_path(list_callback):
        """
        Get ckpt_file_name and summary_log_path from MindSpore callback list.

        Args:
            list_callback (list[Callback]): The MindSpore training Callback list.

        Returns:
            tuple, contains ckpt_file_name and summary_log_path.
        """
        ckpt_file_path = None
        summary_log_path = None
        for callback in list_callback:
            if isinstance(callback, ModelCheckpoint):
                ckpt_file_path = callback.latest_ckpt_file_name
            if isinstance(callback, SummaryStep):
                summary_log_path = callback.summary_file_name

        if ckpt_file_path:
            validate_file_path(ckpt_file_path)
            ckpt_file_path = os.path.realpath(ckpt_file_path)

        if summary_log_path:
            validate_file_path(summary_log_path)
            summary_log_path = os.path.realpath(summary_log_path)

        return ckpt_file_path, summary_log_path

    @staticmethod
    def get_file_size(file_path):
        """
        Get the file size.

        Args:
            file_path (str): The file path.

        Returns:
            int, the file size.

        Raises:
            LineageGetModelFileError: If the size cannot be read.
        """
        try:
            return os.path.getsize(file_path)
        except (OSError, IOError) as error:
            error_msg = f"Error when get model file size: {error}"
            log.error(error_msg)
            raise LineageGetModelFileError(error_msg)

    @staticmethod
    def get_model_size(ckpt_file_path):
        """
        Get model the total size of the model file and the checkpoint file.

        Args:
            ckpt_file_path (str): The checkpoint file path.

        Returns:
            int, the total file size.
        """
        if ckpt_file_path:
            ckpt_file_path = os.path.realpath(ckpt_file_path)
            ckpt_file_size = AnalyzeObject.get_file_size(ckpt_file_path)
        else:
            # No checkpoint was produced; report zero size.
            ckpt_file_size = 0
        return ckpt_file_size

    @staticmethod
    def get_network_args(run_context_args, train_lineage):
        """
        Get the parameters related to the network,
        such as optimizer, loss function.

        Args:
            run_context_args (dict): It contains all information of the training job.
            train_lineage (dict): A dict contains lineage metadata.

        Returns:
            dict, the lineage metadata.
        """
        network = run_context_args.get('train_network')
        validate_network(network)
        optimizer = run_context_args.get('optimizer')
        if not optimizer:
            optimizer = AnalyzeObject.get_optimizer_by_network(network)
        loss_fn = run_context_args.get('loss_fn')
        if not loss_fn:
            # Without an explicit loss_fn, net_outputs is not the loss value,
            # so the loss is left unset and the function is searched for instead.
            loss_fn = AnalyzeObject.get_loss_fn_by_network(network)
            loss = None
        else:
            loss = run_context_args.get('net_outputs')

        if loss:
            log.info('Calculating loss...')
            loss_numpy = loss.asnumpy()
            # atleast_1d guards against a 0-d loss tensor.
            loss = float(np.atleast_1d(loss_numpy)[0])
            log.debug('loss: %s', loss)
            train_lineage[Metadata.loss] = loss
        else:
            train_lineage[Metadata.loss] = None

        # Analyze classname of optimizer, loss function and training network.
        train_lineage[Metadata.optimizer] = type(optimizer).__name__ \
            if optimizer else None
        train_lineage[Metadata.train_network] = AnalyzeObject.get_backbone_network(network)
        train_lineage[Metadata.loss_function] = type(loss_fn).__name__ \
            if loss_fn else None
        return train_lineage