# Source code for mindspore.common.api

# This is the Python adaptation and derivative work of Myia (https://github.com/mila-iqia/myia/).
#
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Providing interface methods."""
import types
from collections import OrderedDict
from functools import wraps
from mindspore import context
from mindspore import log as logger
from mindspore.parallel._utils import _get_parallel_mode
from .._c_expression import generate_key, Executor_, Tensor, MetaTensor
from .._c_expression import verify_inputs_signature, init_exec_dataset, export_graph, _set_dataset_mode_config, init_ge
from .tensor import Tensor as MsTensor

# Cache of pipelines compiled through ms_function: maps the key returned by
# generate_key() to the phase name of the compiled graph. Entries are only
# added for functions tied to an object (method owner or identify object).
ms_compile_cache = {}


def _convert_function_arguments(fn, *args):
    """
    Pair each positional argument with a generated name (`arg0`, `arg1`, ...).

    Args:
        fn (Function): The function or method the arguments belong to.
        args (tuple): The positional arguments passed to `fn`.

    Returns:
        tuple, `(converted, arguments_dict, parse_method)`. `converted` is False
        when `fn` is neither a function nor a method; in that case the dict is
        empty and `parse_method` is None. Otherwise `arguments_dict` is an
        OrderedDict of the named arguments and `parse_method` is `fn.__name__`.
    """
    # Guard clause: anything that is not a plain function/method is rejected.
    if not isinstance(fn, (types.FunctionType, types.MethodType)):
        logger.warning("Find error: fn isn't function or method")
        return False, OrderedDict(), None

    named_arguments = OrderedDict(
        (f'arg{position}', value) for position, value in enumerate(args)
    )
    logger.debug("fn(%r) full parameters dict is: %r", fn, named_arguments)
    return True, named_arguments, fn.__name__


def _wrap_func(fn):
    """
    Wrapper function, convert return data to tensor or tuple of tensor.

    Args:
        fn (Function): The function need be wrapped.

    Returns:
        Function, a new function with return suitable format data.
    """
    @wraps(fn)
    def wrapper(*arg, **kwargs):
        results = fn(*arg, **kwargs)

        def _convert_data(data):
            if isinstance(data, Tensor) and not isinstance(data, MsTensor):
                return MsTensor(data)
            return data

        if isinstance(results, tuple):
            return tuple(_convert_data(x) for x in results)
        if isinstance(results, list):
            return list(_convert_data(x) for x in results)
        return _convert_data(results)

    return wrapper


def _exec_init_graph(obj, init_phase):
    """
    Run the parameter initializer graph when some parameter is not yet initialized.

    Every parameter of `obj` whose `is_init` flag is falsy gets the flag set
    to True; if at least one such parameter was found, the init graph is run
    once for `init_phase` with the parameters dictionary of `obj`.

    Args:
        obj (Object): The object providing `get_parameters()` and `parameters_dict()`.
        init_phase (str): The phase name of the init graph to run.
    """
    executor = Executor_.get_instance()
    needs_init = False
    for parameter in obj.get_parameters():
        if parameter.is_init:
            continue
        parameter.is_init = True
        needs_init = True

    if not needs_init:
        return
    executor.run_init_graph(obj.parameters_dict(), init_phase)


class _MindSporeFunction:
    """
    Callable that compiles a Python function into a graph and executes it.

    The compiled pipeline is identified by the key that `generate_key` builds
    from a generated name and the call arguments. Pipelines are stored in
    `ms_compile_cache` only when an owner object or an identify object is
    present; a plain function is compiled again on each call.

    Args:
        fn (Function): The root function to compile.
        input_signature (Iterable): User-defined signature used to verify the
            inputs; every entry must be a MetaTensor. Default: None.
        obj (Object): If `obj` has an attribute named like `fn`, it is treated
            as the owner of the method; any other non-None value is only used
            to identify the compiled function. Default: None.
    """
    def __init__(self, fn, input_signature=None, obj=None):
        self.fn = fn
        self.save_graphs = context.get_context("save_graphs")
        self.save_graphs_path = context.get_context("save_graphs_path")
        self.input_signature = input_signature
        # An object exposing an attribute with the function's name is the
        # method owner; otherwise a non-None object only contributes identity.
        owns_fn = hasattr(obj, fn.__name__)
        self.obj = obj if owns_fn else None
        self.identify_obj = obj if (obj is not None and not owns_fn) else None
        self._executor = Executor_.get_instance()

    def build_data_init_graph(self, graph_name):
        """
        Build GE data graph and init graph for the given graph name.

        Args:
            graph_name (str): The phase name of the compiled graph.
        """
        owner = self.obj
        if owner is None:
            # No owner means there is no parameter source: use an empty dict.
            logger.warning("Make sure parameter should not be used in function")
            self._executor.build_data_graph(OrderedDict(), graph_name)
            return

        self._executor.build_data_graph(owner.parameters_dict(), graph_name,
                                        owner.parameters_broadcast_dict())
        # Reuse everything from the first "." of the graph name as the suffix.
        suffix = graph_name[graph_name.find("."):]
        _exec_init_graph(owner, "init_subgraph" + suffix)

    def compile(self, arguments_dict, method_name):
        """
        Return the phase name of the pipeline for the given arguments.

        Args:
            arguments_dict (OrderedDict): The named call arguments.
            method_name (str): The name recorded as `__parse_method__`.

        Returns:
            str, the phase name of the (possibly cached) compiled pipeline.

        Raises:
            TypeError: If an entry of the input signature is not a MetaTensor.
            ValueError: If the inputs do not pass the signature verification.
            RuntimeError: If the executor reports a failed compilation.
        """
        names = tuple(arguments_dict.keys())
        values = tuple(arguments_dict.values())
        owner = self.obj

        # remove first self parameter when fn is a method
        if owner is not None:
            names, values = names[1:], values[1:]

        # verify the signature for both function and method
        if self.input_signature is not None:
            signatures = []
            for spec in self.input_signature:
                if not isinstance(spec, MetaTensor):
                    raise TypeError("Input_signature is not MetaTensor")
                signatures.append(spec)
            if not verify_inputs_signature(signatures, values):
                raise ValueError("Inputs is incompatible with input signature!")

        named_args = dict(zip(names, values))
        generate_name = self.fn.__module__ + "." + self.fn.__name__
        self.fn.__parse_method__ = method_name

        # replace key with obj info and object ext info when fn is a method
        if owner is not None:
            owner.__parse_method__ = method_name
            generate_name = owner.__module__ + "." + str(owner.create_time)
        if self.identify_obj is not None:
            generate_name += str(id(self.identify_obj))

        key = generate_key(generate_name, named_args)
        phase = str(key[1]) + generate_name
        if key in ms_compile_cache:
            return ms_compile_cache[key]

        target = self.fn if owner is None else owner
        if not self._executor.compile(target, values, phase, True):
            raise RuntimeError("Executor compile failed.")
        if context.get_context("enable_ge"):
            self.build_data_init_graph(phase)
        # since function can be redefined, we only cache class method pipeline
        if owner is not None or self.identify_obj is not None:
            ms_compile_cache[key] = phase
        return phase

    @_wrap_func
    def __call__(self, *args):
        init_ge()
        converted, arguments_dict, parse_method = _convert_function_arguments(self.fn, *args)
        if not converted:
            raise RuntimeError('Process function parameter is failure')

        # Drop the leading owner argument when fn is a method.
        skip = 0 if self.obj is None else 1
        run_args = tuple(arguments_dict.values())[skip:]

        phase = self.compile(arguments_dict, parse_method)

        if context.get_context("precompile_only"):
            return None
        return self._executor(run_args, phase)


def ms_function(fn=None, obj=None, input_signature=None):
    """
    Creates a callable MindSpore graph from a python function.

    This allows the MindSpore runtime to apply optimizations based on graph.

    Args:
        fn (Function): The Python function that will be run as a graph. Default: None.
        obj (Object): The Python Object that provide information for identify compiled function. Default: None.
        input_signature (MetaTensor): The MetaTensor to describe the input arguments. The MetaTensor specifies
            the shape and dtype of the Tensor and they will be supplied to this function. If input_signature
            is specified, every input to `fn` must be a `Tensor`. And the input parameters of `fn` cannot accept
            `**kwargs`. The shape and dtype of actual inputs should keep same with input_signature; when the
            compiled function is called, an entry that is not a MetaTensor raises TypeError and inputs that
            fail the signature verification raise ValueError. Default: None.

    Returns:
        Function, if `fn` is not None, returns a callable that will execute the compiled function; If `fn` is None,
        returns a decorator and when this decorator invokes with a single `fn` argument, the callable is equal to the
        case when `fn` is not None.

    Examples:
        >>> def tensor_add(x, y):
        >>>     z = F.tensor_add(x, y)
        >>>     return z
        >>>
        >>> @ms_function
        >>> def tensor_add_with_dec(x, y):
        >>>     z = F.tensor_add(x, y)
        >>>     return z
        >>>
        >>> @ms_function(input_signature=(MetaTensor(mstype.float32, (1, 1, 3, 3)),
        >>>                               MetaTensor(mstype.float32, (1, 1, 3, 3))))
        >>> def tensor_add_with_sig(x, y):
        >>>     z = F.tensor_add(x, y)
        >>>     return z
        >>>
        >>> x = Tensor(np.ones([1, 1, 3, 3]).astype(np.float32))
        >>> y = Tensor(np.ones([1, 1, 3, 3]).astype(np.float32))
        >>>
        >>> tensor_add_graph = ms_function(fn=tensor_add)
        >>> out = tensor_add_graph(x, y)
        >>> out = tensor_add_with_dec(x, y)
        >>> out = tensor_add_with_sig(x, y)
    """
    def wrap_mindspore(func):
        @wraps(func)
        def staging_specialize(*args):
            process_obj = obj
            # When the first argument is not a tensor and exposes an attribute
            # named like `func`, treat it as the owner of the method.
            if args and not isinstance(args[0], MsTensor) and hasattr(args[0], func.__name__):
                process_obj = args[0]
            # Pass the `default_input` of arguments that carry one.
            call_args = tuple(x.default_input if hasattr(x, 'default_input') else x for x in args)
            return _MindSporeFunction(func, input_signature, process_obj)(*call_args)

        return staging_specialize

    # Support both `@ms_function` and `@ms_function(...)` usages.
    if fn is not None:
        return wrap_mindspore(fn)
    return wrap_mindspore
def _generate_pip_args(obj, *args, method="construct"):
    """
    Generate arguments for pipeline.

    Args:
        obj (Object): The object that owns the method to be processed.
        args (tuple): The positional arguments of the method.
        method (str): The name of the method looked up on `obj`. Default: "construct".

    Returns:
        tuple, the generated argument names, and tuple, the argument values.

    Raises:
        AttributeError: If `obj` has no attribute named `method`.
        RuntimeError: If the looked-up attribute is not a function or method.
    """
    if not hasattr(obj, method):
        raise AttributeError('The process method is not exist')
    fn = getattr(obj, method)
    converted, arguments_dict, parse_method = _convert_function_arguments(fn, *args)
    if not converted:
        raise RuntimeError('Process method parameter is failure')
    args_list = tuple(arguments_dict.values())
    args_names = tuple(arguments_dict.keys())
    obj.__parse_method__ = parse_method
    return args_names, args_list


class _Executor:
    """
    An executor used to compile/manage/run graph.

    Including data_graph, train_graph, eval_graph and predict graph.

    Returns:
        Graph, return the result of pipeline running.
    """

    def __init__(self):
        # create needed graph by lazy mode
        self.is_init = False
        self._executor = Executor_.get_instance()
        # Phase names that were compiled successfully (phase -> phase).
        self.compile_cache = {}
        # Prefix taken from the key of the most recent compile() call.
        self.phase_prefix = ""

    def init_dataset(self, queue_name, dataset_size, batch_size, dataset_types, dataset_shapes,
                     input_indexs, phase='dataset'):
        """
        Initialization interface for calling data subgraph.

        Args:
            queue_name (str): The name of tdt queue on the device.
            dataset_size (int): The size of dataset.
            batch_size (int): The size of batch.
            dataset_types (list): The output types of element in dataset.
            dataset_shapes (list): The output shapes of element in dataset.
            input_indexs (list): The index of data with net.
            phase (str): The name of phase, e.g., train_dataset/eval_dataset. Default: 'dataset'.

        Returns:
            bool, specifies whether the data subgraph was initialized successfully.

        Raises:
            RuntimeError: If the data subgraph could not be initialized.
        """
        if not init_exec_dataset(queue_name=queue_name,
                                 size=dataset_size,
                                 batch_size=batch_size,
                                 types=dataset_types,
                                 shapes=dataset_shapes,
                                 input_indexs=input_indexs,
                                 phase=phase):
            raise RuntimeError("Failure to init and dataset subgraph!")
        return True

    def _full_phase(self, obj, phase):
        """Return the phase name qualified with the current prefix and the creation time of `obj`."""
        return self.phase_prefix + phase + '.' + str(obj.create_time)

    def _build_data_graph(self, obj, params, phase):
        """
        Build the data graph for `phase`.

        Uses the parameters of `obj` when `params` is None, otherwise the given
        OrderedDict.

        Raises:
            TypeError: If `params` is neither None nor an OrderedDict.
        """
        if params is None:
            self._executor.build_data_graph(obj.parameters_dict(), phase, obj.parameters_broadcast_dict())
        elif isinstance(params, OrderedDict):
            self._executor.build_data_graph(params, phase)
        else:
            raise TypeError('Parameters need OrderedDict type, but got {}'.format(type(params)))

    def compile(self, obj, *args, phase='predict', params=None):
        """
        Compiles graph.

        Args:
            obj (Function/Cell): The function or cell instance need compile.
            args (tuple): Function or cell input arguments.
            phase (str): The name of compile phase. Default: 'predict'.
            params (OrderedDict): The parameters dictionary used for init data graph. Default: None.

        Return:
            Str, the full phase of the cell.
            Bool, if the graph has been compiled before, return False, else return True.

        Raises:
            RuntimeError: If the executor reports that compilation failed.
        """
        # NOTE(review): the block nesting below was reconstructed from a
        # whitespace-collapsed source - confirm against the upstream file.
        obj.check_names()
        args_names, args_list = _generate_pip_args(obj, *args)
        dic = dict(zip(args_names, args_list))
        key = generate_key(phase, dic)
        self.phase_prefix = str(key[1])
        if phase == 'export':
            phase = phase + '.' + str(obj.create_time)
        else:
            phase = self._full_phase(obj, phase)
        enable_debug_runtime = context.get_context("enable_debug_runtime")
        enable_ge = context.get_context("enable_ge")

        use_vm = not enable_ge or (enable_debug_runtime and context.get_context("mode") == context.PYNATIVE_MODE)

        if phase in self.compile_cache:
            logger.debug("%r graph has existed.", phase)
            return phase, False

        result = self._executor.compile(obj, args_list, phase, use_vm)
        if not result:
            raise RuntimeError("Executor compile failed.")
        # Record the phase only after a successful compile, so that a failed
        # attempt is not reported as "already compiled" on the next call.
        self.compile_cache[phase] = phase
        graph = self._executor.get_func_graph(phase)

        if graph is None:
            # Only logged: the remaining setup still runs, as before.
            logger.error("%r graph compile failed.", phase)
        if not enable_debug_runtime or enable_ge:
            if _get_parallel_mode() in ["auto_parallel", "semi_auto_parallel"]:
                obj.parameter_layout_dict = self._executor.get_parameter_layout(phase)
                obj.load_parameter_slice(params)

            if _get_parallel_mode() in ["hybrid_parallel"]:
                obj.parameter_layout_dict = self._build_parameter_layout(obj)

        # the following GE init process is not needed when use vm or ms backend
        if enable_ge:
            # decide whether to sink based on whether the inputs is virtual or not
            if args_list and isinstance(args_list[0], Tensor) and args_list[0].virtual_flag:
                _set_dataset_mode_config('graph')
            else:
                _set_dataset_mode_config('feed')

            self._build_data_graph(obj, params, phase)

            if "export" not in phase:
                init_phase = "init_subgraph" + "." + str(obj.create_time)
                _exec_init_graph(obj, init_phase)
        elif "export" in phase:
            # Reached only when GE is disabled: export still needs the data graph.
            self._build_data_graph(obj, params, phase)

        return phase, True

    def _get_strategy(self, obj):
        """Return the strategy the executor holds for the current phase of `obj`."""
        return self._executor.get_strategy(self._full_phase(obj, obj.phase))

    def _get_allreduce_fusion(self, obj):
        """Return the allreduce fusion info the executor holds for the current phase of `obj`."""
        return self._executor.get_allreduce_fusion(self._full_phase(obj, obj.phase))

    def has_compiled(self, phase='predict'):
        """
        Specify whether have been compiled.

        Args:
            phase (str): The phase name. Default: 'predict'.

        Returns:
            bool, specifies whether the specific graph has been compiled.
        """
        return self._executor.has_compiled(phase)

    def __call__(self, obj, *args, phase='predict'):
        if context.get_context("precompile_only"):
            return None
        return self.run(obj, *args, phase=phase)

    @_wrap_func
    def _exec_pip(self, obj, *args, phase=''):
        """Execute the generated pipeline."""
        fn = obj.construct
        converted, arguments_dict, parse_method = _convert_function_arguments(fn, *args)
        if not converted:
            raise RuntimeError('Process method parameter is failure')
        args_list = tuple(arguments_dict.values())
        obj.__parse_method__ = parse_method
        return self._executor(args_list, phase)

    def run(self, obj, *args, phase='predict'):
        """
        Run the specific graph.

        Args:
            obj (Function/Cell): The function or cell instance whose graph is run.
            args (tuple): Function or cell input arguments.
            phase (str): The phase name. Default: 'predict'.

        Returns:
            Tensor/Tuple, return execute result.

        Raises:
            KeyError: If the graph of the requested phase has not been compiled.
        """
        if phase == 'save':
            # The save graph takes no inputs and is not prefixed.
            return self._executor((), phase + '.' + str(obj.create_time))

        phase_real = self._full_phase(obj, phase)
        if self.has_compiled(phase_real):
            return self._exec_pip(obj, *args, phase=phase_real)
        raise KeyError('{} graph is not exist.'.format(phase_real))

    def _build_parameter_layout(self, obj):
        """
        Build parameter layout, for layerwise_parallel parameter.

        Args:
            obj (Function or Cell): The function or cell instance need to be compiled.

        Returns:
            Dictionary, parameter layout info.
        """
        # Fetch the parameters once instead of once per lookup.
        parameters = obj.parameters_dict()
        layerwise_parallel_parameters = [
            (name, param) for name, param in parameters.items()
            if param.layerwise_parallel is True
        ]
        parameter_layout_dict = {}
        if not layerwise_parallel_parameters:
            return parameter_layout_dict

        from ..communication.management import get_group_size
        group_size = [get_group_size()]
        for name, param in layerwise_parallel_parameters:
            # dim 0 set 0, others set -1
            tensor_map = [0] + [-1] * (len(param.data.shape()) - 1)
            parameter_layout_dict[name] = [group_size, tensor_map]
        return parameter_layout_dict

    def del_net_res(self, net_id):
        """Forward the release request for `net_id` to the underlying executor."""
        self._executor.del_net_res(net_id)

    def _get_func_graph_proto(self, exec_id, ir_type="onnx_ir", use_prefix=False):
        """Get graph proto from pipeline."""
        if use_prefix:
            exec_id = self.phase_prefix + exec_id
        if self._executor.has_compiled(exec_id) is False:
            return None
        return self._executor.get_func_graph_proto(exec_id, ir_type)

    def export(self, net, file_name, file_format='GEIR'):
        """
        Export graph.

        Args:
            net (Cell): MindSpore network
            file_name (str): File name of model to export
            file_format (str): MindSpore currently support 'GEIR' and 'ONNX' format for exported model
        """
        phase = 'export' + '.' + str(net.create_time)
        export_graph(file_name, file_format, phase)


_executor = _Executor()

__all__ = ['ms_function']