# Source code for mindinsight.debugger.api.dump_analyzer

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Debugger python API."""
import os.path
from typing import Iterable

from mindinsight.debugger.api.conditions import WatchpointHit, HitDetail, WatchpointHandle, WatchpointHitImpl
from mindinsight.debugger.api.debugger_engine import DebuggerEngine
from mindinsight.debugger.api.debugger_tensor import DebuggerTensor, DebuggerTensorImpl
from mindinsight.debugger.api.node import Node, NodeImpl, NodeUniqueId
from mindinsight.debugger.common.exceptions.exceptions import DebuggerParamValueError
from mindinsight.debugger.common.log import LOGGER as log
from mindinsight.debugger.common.utils import (
    validate_type, validate_slots, parse_param_to_iterable_obj
)
from mindinsight.debugger.dump.parser import DebuggerParser
from mindinsight.debugger.stream_cache.data_loader import DataLoader
from mindinsight.domain.graph.base import NodeType
from mindinsight.domain.graph.query import construct_filter_func


class DumpAnalyzer:
    """
    Analyzer to inspect the dump data.

    .. warning::
        All APIs in this class are experimental prototypes that are subject to
        change or deletion.

    Args:
        dump_dir (str): The path of the dump folder.
        mem_limit (int, optional): The memory limit for checking watchpoints in MB.
            Default: None, which means no limit. Optional values: from 2048 MB to 2147483647 MB.

    Examples:
        >>> from mindinsight.debugger import DumpAnalyzer
        >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
    """

    def __init__(self, dump_dir, mem_limit=None):
        self._dump_dir = os.path.realpath(dump_dir)
        # 0 is used internally to represent "no limit" (see _validate_mem_limit).
        self._mem_limit = 0 if mem_limit is None else mem_limit
        self._data_loader = None
        self._debugger_engine = None
        self._parser = None
        # the key is rank_id, the value is <node_unique_id, NodeImpl> map
        self._nodes = {}
        self._initialize()

    def _initialize(self):
        """Validate the parameters, create the loader and engine, then parse the graphs."""
        self._validate_mem_limit(self._mem_limit)
        self._data_loader = DataLoader(self._dump_dir)
        self._debugger_engine = DebuggerEngine(self._data_loader, self._mem_limit)
        self._parse()

    @staticmethod
    def _validate_mem_limit(mem_limit):
        """
        Validate memory limit.

        Args:
            mem_limit (int): The memory limit in MB. A falsy value (0) means no limit.

        Raises:
            DebuggerParamValueError: If a non-zero `mem_limit` is outside [2048, 2147483647].
        """
        validate_type(mem_limit, 'mem_limit', int, 'int or None')
        # The unit is MB, from 2G to max value of int32 MB
        min_limit_value = 2 * 1024
        max_limit_value = 2147483647
        # A falsy value is the "no limit" sentinel and is always accepted.
        if mem_limit and not min_limit_value <= mem_limit <= max_limit_value:
            msg = f"If mem_limit is not None, it should be set in [{min_limit_value}, {max_limit_value}]."
            raise DebuggerParamValueError(msg)

    def _parse(self):
        """Parse graph into nodes and tensors."""
        self._parser = DebuggerParser(self._data_loader)
        ranks = self.get_ranks()
        # the key is rank_id, the value is <node_unique_id, NodeImpl> map
        self._nodes = {rank_id: {} for rank_id in ranks}
        # the key is rank_id, the value is <(graph_name, base node name), NodeImpl name> map
        id_to_name_map = {rank_id: {} for rank_id in ranks}

        def _add_node_impl(base_nodes, node_type, nodes):
            """Wrap each base node in a NodeImpl and register it in `nodes` and `id_to_name_map`."""
            for b_node in base_nodes:
                new_node = NodeImpl(b_node, node_type)
                new_node.debugger_engine = self._debugger_engine
                nodes[new_node.rank][new_node.unique_id] = new_node
                id_to_name_map[new_node.rank][(b_node.graph_name, b_node.name)] = new_node.name

        def _update_node_list(node_list, dst_id, cur_node, cur_node_map):
            """Resolve `dst_id` to a NodeImpl in `cur_node_map` and append it to `node_list`."""
            dst_node_name = id_to_name_map[cur_node.rank].get(dst_id)
            if not dst_node_name:
                log.info("Failed to find %s in id_to_name_map", dst_id)
                return
            unique_id = NodeUniqueId(name=dst_node_name, rank=cur_node.rank, graph_name=cur_node.graph_name)
            target_node = cur_node_map.get(unique_id)
            if not target_node:
                log.error("Failed to find %s in node_map", unique_id)
                return
            node_list.append(target_node)

        _add_node_impl(self._parser.constants, NodeType.CONSTANT, self._nodes)
        _add_node_impl(self._parser.parameters, NodeType.PARAMETER, self._nodes)
        _add_node_impl(self._parser.operators, NodeType.OPERATOR, self._nodes)

        # update input and output nodes
        for node_map in self._nodes.values():
            for node in node_map.values():
                base_node = node.base_node
                if hasattr(base_node, 'inputs'):
                    # parameter or const node has no inputs
                    for node_input in base_node.inputs:
                        _update_node_list(node.input_nodes, (base_node.graph_name, node_input.name),
                                          node, node_map)

        # every node is a downstream node of each of its inputs
        for node_map in self._nodes.values():
            for node in node_map.values():
                for node_input in node.input_nodes:
                    node_input.downstream.append(node)

    def export_graphs(self, output_dir=None):
        """
        Export the computational graph(s) in xlsx file(s) to the output_dir.

        The file(s) will contain the stack info of graph nodes.

        Args:
            output_dir (str, optional): Output directory to save the file.
                Default: None, which means to use the current working directory.

        Returns:
            str, The path of the generated file.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> res = my_run.export_graphs()
        """
        return self._parser.export_xlsx(output_dir)

    def select_nodes(
            self,
            query_string,
            use_regex=False,
            select_by="node_name",
            ranks=None,
            case_sensitive=True) -> Iterable[Node]:
        """
        Select nodes.

        Select the matched nodes in the computational graph according to the
        query_string. The nodes can be matched by "node_name" or "code_stack",
        see the args document for detail.

        Args:
            query_string (str): Query string. For a node to be selected, the
                match target field must contain or match the query string.
            use_regex (bool): Indicates whether query is a regex. Default: False.
            select_by (str, optional): The field to search when selecting
                nodes. Available values are "node_name", "code_stack".
                "node_name" means to search the name of the nodes in the
                graph. "code_stack" means the stack info of
                the node. Default: "node_name".
            ranks (Union[int, list[int], None], optional): The ranks to select.
                The selected nodes must exist on the specified ranks.
                Default: None, which means all ranks will be considered.
            case_sensitive (bool, optional): Whether case-sensitive when
                selecting tensors. Default: True.

        Returns:
            Iterable[Node], the matched nodes.

        Raises:
            DebuggerParamValueError: If `select_by` is neither "node_name" nor "code_stack".

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> nodes = my_run.select_nodes("Conv2D-op13")
        """
        validate_type(query_string, 'query_string', str, 'str')
        validate_type(use_regex, 'use_regex', bool, 'bool')
        validate_type(case_sensitive, 'case_sensitive', bool, 'bool')
        node_filter_func = self._get_filter_func(select_by)
        ranks = self._get_iterable_ranks(ranks)
        query_filter_func = construct_filter_func(query_string, case_sensitive, use_regex)
        # Copies are returned so that callers do not hold the internal NodeImpl objects.
        return [
            node.copy()
            for rank_id in ranks
            for node in self._nodes.get(rank_id, {}).values()
            if node_filter_func(node, query_filter_func)
        ]

    @staticmethod
    def _match_name(node, filter_func):
        """Check if name matched."""
        return filter_func(node.name)

    @staticmethod
    def _match_stack(node, filter_func):
        """Check if any entry of the node's stack matched."""
        return any(map(filter_func, node.stack))

    @staticmethod
    def _get_filter_func(select_by):
        """
        Get filter function.

        Args:
            select_by (str): Either "node_name" or "code_stack".

        Returns:
            Callable, a function taking (node, filter_func) that tells whether the node matches.

        Raises:
            DebuggerParamValueError: If `select_by` is not a supported value.
        """
        if select_by == 'node_name':
            return DumpAnalyzer._match_name
        if select_by == 'code_stack':
            return DumpAnalyzer._match_stack
        raise DebuggerParamValueError(
            "The param `select_by` only support `node_name` or `code_stack`.")

    def select_tensors(
            self,
            query_string,
            use_regex=False,
            select_by="node_name",
            iterations=None,
            ranks=None,
            slots=None,
            case_sensitive=True) -> Iterable[DebuggerTensor]:
        """
        Select tensors.

        Select the matched tensors in the directory according to the
        query_string. The tensors can be matched by "node_name" or
        "code_stack", see the args document for detail.

        Args:
            query_string (str): Query string. For a tensor to be selected, the
                match target field must contain or match the query string.
            use_regex (bool): Indicates whether query is a regex. Default: False.
            select_by (str, optional): The field to search when selecting
                tensors. Available values are "node_name", "code_stack".
                "node_name" means to search the node name of the tensors in the
                graph. "code_stack" means the stack info of
                the node that outputs this tensor. Default: "node_name".
            iterations (Union[int, list[int], None], optional): The iterations to select.
                Default: None, which means all dumped iterations will be selected.
            ranks (Union[int, list[int], None], optional): The ranks to select.
                Default: None, which means all ranks will be selected.
            slots (list[int], optional): The slot of the selected tensor.
                Default: None, which means all slots will be selected.
            case_sensitive (bool, optional): Whether case-sensitive when
                selecting tensors. Default: True.

        Returns:
            Iterable[DebuggerTensor], the matched tensors.

        Raises:
            DebuggerParamValueError: If `select_by` is neither "node_name" nor "code_stack".

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> tensors = my_run.select_tensors("Conv2D-op13")
        """
        validate_type(query_string, 'query_string', str, 'str')
        validate_type(use_regex, 'use_regex', bool, 'bool')
        validate_type(case_sensitive, 'case_sensitive', bool, 'bool')
        validate_slots(slots)
        node_filter_func = self._get_filter_func(select_by)
        ranks = self._get_iterable_ranks(ranks)
        dumped_iterations = self.get_iterations(ranks)
        iterations = parse_param_to_iterable_obj(iterations, 'iterations', dumped_iterations)

        tensors = []
        query_filter_func = construct_filter_func(query_string, case_sensitive, use_regex)
        for rank_id in ranks:
            for node in self._nodes.get(rank_id, {}).values():
                if node_filter_func(node, query_filter_func):
                    tensors.extend(node.get_output_tensors(slots=slots, iterations=iterations))
        return tensors

    def get_iterations(self, ranks=None) -> Iterable[int]:
        """
        Get available iterations which have data dumped in this run.

        Args:
            ranks (Union[int, list[int], None], optional): The ranks to select.
                Get available iterations which are under the specified ranks.
                If None, return iterations of all ranks. Default: None.

        Returns:
            Iterable[int], sorted dumped iteration list.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> iterations = my_run.get_iterations()
            >>> print(list(iterations))
            [0]
        """
        total_dumped_steps = self._data_loader.load_dumped_step()
        ranks = self._get_iterable_ranks(ranks)
        # A set removes the iterations that are shared by several graphs or ranks.
        iterations = set()
        for rank_id in ranks:
            for iters_per_graph in total_dumped_steps.get(rank_id, {}).values():
                iterations.update(iters_per_graph)
        return sorted(iterations)

    def get_ranks(self) -> Iterable[int]:
        """
        Get the available ranks in this run.

        Returns:
            Iterable[int], the list of rank id in current dump directory.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> ranks = my_run.get_ranks()
            >>> print(list(ranks))
            [0]
        """
        return [rank_dir.rank_id for rank_dir in self._data_loader.rank_dirs]

    def check_watchpoints(
            self,
            watchpoints,
            error_on_no_value=False) -> Iterable[WatchpointHit]:
        """
        Check the given watch points on specified nodes(if available) on the
        given iterations(if available) in a batch.

        Note:
            1. For speed, all watchpoints for the iteration should be given at
            the same time to avoid reading tensors len(watchpoints) times.

            2. The check_watchpoints function start a new process needs to be
            called through the main entry

        Args:
            watchpoints (Iterable[Watchpoint]): The list of watchpoints.
            error_on_no_value (bool): Whether report error code in watchpoint
                hit when the specified tensor have no value stored in
                summary_dir. Default: False.

        Returns:
            Iterable[WatchpointHit], the watchpoint hit list is carefully
            sorted so that the user can see the most important hit on the
            top of the list. When there are many many watchpoint hits,
            we will display the list in a designed clear way.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> from mindinsight.debugger import (TensorTooLargeCondition,
            ...                                    Watchpoint)
            >>>
            >>> def test_watchpoints():
            ...     my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            ...     tensors = my_run.select_tensors(
            ...                                         query_string="Conv2D-op13",
            ...                                         use_regex=True,
            ...                                         iterations=[0],
            ...                                         ranks=[0],
            ...                                         slots=[0]
            ...                                         )
            ...     watchpoint = Watchpoint(tensors=tensors,
            ...                             condition=TensorTooLargeCondition(abs_mean_gt=0.0))
            ...     # the check_watchpoints function start a new process needs to be called through the main entry
            ...     hit = list(my_run.check_watchpoints(watchpoints=[watchpoint]))[0]
            ...     # print(str(hit))
            ...     # the print result is as follows
            ...     # Watchpoint TensorTooLarge triggered on tensor:
            ...     # rank: 0
            ...     # graph_name: kernel_graph_0
            ...     # node_name: Default/network-WithLossCell/_backbone-AlexNet/conv2-Conv2d/Conv2D-op13
            ...     # slot: 0
            ...     # iteration: 0
            ...     # Threshold: {'abs_mean_gt': 0.0}
            ...     # Hit detail: the setting for watchpoint is abs_mean_gt = 0.0.
            ...     # The actual value of the tensor is abs_mean_gt = 0.06592023578438996.
            ...
            >>> if __name__ == "__main__":
            ...     test_watchpoints()
            ...
        """
        wp_hit_list = []
        # the union of the iterations requested by all the watchpoints
        iterations = set()
        # key is watchpoint_id (the position in `watchpoints`), value is its WatchpointHandle
        wp_handles = {wp_id: WatchpointHandle(wp_id, wp) for wp_id, wp in enumerate(watchpoints)}
        for wp_handle in wp_handles.values():
            iterations.update(wp_handle.get_iterations())
        debugger_backend = self._debugger_engine.dbg_service
        # check all the watchpoint for the iterations
        for iteration in iterations:
            log.info("Check watchpoints for iteration %s", iteration)
            if not self._data_loader.has_data(iteration):
                log.info("No data dumped with iteration id: %s. Ignore checking watchpoint.", iteration)
                continue
            # adding the watchpoint for current iteration
            for wp_handle in wp_handles.values():
                wp_handle.add_watchpoint(iteration, self._debugger_engine)
            # check the watchpoint for current iteration
            # add the hit watchpoints to hit list
            hit_list = debugger_backend.check_watchpoints(
                iteration=iteration, error_on_no_value=error_on_no_value)
            for hit in hit_list:
                # the list of slots for the hit node to report
                # (for the current watchpoint and iteration)
                wp_handle = wp_handles.get(hit.watchpoint_id)
                # hit.name is split at the first '/' into graph name and node name
                graph_name, node_name = hit.name.split('/', 1)
                node = self._get_node(node_name, hit.rank_id, graph_name)
                tensor = DebuggerTensorImpl(node=node, slot=hit.slot, iteration=iteration)
                if wp_handle.need_check(tensor):
                    hit_params = hit.parameters
                    hit_detail = HitDetail(hit_params, wp_handle.condition)
                    wp_hit = WatchpointHitImpl(tensor=tensor,
                                               condition=wp_handle.condition,
                                               hit_detail=hit_detail,
                                               error_code=hit.error_code)
                    wp_hit_list.append(wp_hit)
            if error_on_no_value:
                no_value_hit_list = []
                for wp_handle in wp_handles.values():
                    no_value_hit_list += wp_handle.watchpoint_hit_on_no_value(iteration)
                wp_hit_list += no_value_hit_list
            # remove all the watchpoints for the previous iterations
            for watchpoint_id in wp_handles:
                debugger_backend.remove_watchpoint(watchpoint_id=watchpoint_id)

        return wp_hit_list

    def _get_node(self, node_name, rank_id, graph_name):
        """
        Get NodeImpl object.

        Returns:
            Union[NodeImpl, None], the registered node, or None if it is not found.
        """
        unique_id = NodeUniqueId(name=node_name, rank=rank_id, graph_name=graph_name)
        return self._nodes.get(rank_id, {}).get(unique_id)

    def list_affected_nodes(self, tensor):
        """
        List the nodes that use given tensor as input.

        Affected nodes is defined as the nodes use the given tensor as input. If
        a node is affected by the given tensor, the node's output value is
        likely to change when the given tensor changes.

        Args:
            tensor (DebuggerTensor): The tensor of which affected nodes will be
                returned.

        Returns:
            Iterable[Node], the affected nodes of the given tensor.

        Raises:
            DebuggerParamValueError: If the node of the tensor is not found in the dump directory.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> tensor_list = list(my_run.select_tensors(query_string="Conv2D-op13"))
            >>> affected_nodes = my_run.list_affected_nodes(tensor_list[0])
        """
        self._validate_node(tensor.node)
        return [affected_node.copy() for affected_node in tensor.node.downstream]

    def get_input_nodes(self, node):
        """
        Get the input nodes of the given node.

        Args:
            node (Node): The node of which input nodes will be returned.

        Returns:
            Iterable[Node], the input nodes of the given node.

        Raises:
            DebuggerParamValueError: If the node is not found in the dump directory.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> node_list = list(my_run.select_nodes(query_string="Conv2D-op13"))
            >>> input_nodes = my_run.get_input_nodes(node_list[0])
        """
        self._validate_node(node)
        return node.input_nodes.copy()

    def get_output_nodes(self, node):
        """
        Get the nodes that use the output tensors of the given node.

        Args:
            node (Node): The node of which output nodes will be returned.

        Returns:
            Iterable[Node], the output nodes of this node.

        Raises:
            DebuggerParamValueError: If the node is not found in the dump directory.

        Examples:
            >>> from mindinsight.debugger import DumpAnalyzer
            >>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
            >>> node_list = list(my_run.select_nodes(query_string="Conv2D-op13"))
            >>> out_nodes = my_run.get_output_nodes(node_list[0])
        """
        self._validate_node(node)
        return node.downstream.copy()

    def _validate_node(self, node):
        """
        Check if node is in current directory.

        Raises:
            DebuggerParamValueError: If the node is not registered in this analyzer.
        """
        validate_type(node, 'node', NodeImpl, 'Node')
        # Keep `node` bound to the argument: the error message below needs its attributes,
        # so the lookup result must not overwrite it.
        if self._get_node(node.name, node.rank, node.graph_name) is None:
            raise DebuggerParamValueError(f"Failed to find node {node.name} with rank {node.rank} "
                                          "in dump directory.")

    def _get_iterable_ranks(self, ranks):
        """
        Validate input ranks and return iterable ranks.

        Args:
            ranks (Union[int, list[int], None], optional): The range of ranks.

        Returns:
            list[int], list of rank id.
        """
        total_ranks = self.get_ranks()
        return parse_param_to_iterable_obj(ranks, 'ranks', total_ranks)