# Source code for azure.ai.ml.entities._inputs_outputs

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
"""This file includes the type classes which could be used in dsl.pipeline, command function, or any other place that requires job inputs/outputs

.. remarks::

    The following pseudo-code shows how to create a pipeline with such classes.

    .. code-block:: python

        @pipeline()
        def some_pipeline(
            input_param: Input(type="uri_folder", path="xxx", mode="ro_mount"),
            int_param0: Input(type="integer", default=0, min=-3, max=10),
            int_param1 = 2,
            str_param = 'abc',
            output_param: Output(type="uri_folder", path="xxx", mode="rw_mount"),
        ):
            pass


    The following pseudo-code shows how to create a command with such classes.

    .. code-block:: python

        my_command = command(
            name="my_command",
            display_name="my_command",
            description="This is a command",
            tags=dict(),
            command="python train.py --input-data ${{inputs.input_data}} --lr ${{inputs.learning_rate}}",
            code="./src",
            compute="cpu-cluster",
            environment="my-env:1",
            distribution=MpiDistribution(process_count_per_instance=4),
            environment_variables=dict(foo="bar"),
            # Customers can still do this:
            # resources=Resources(instance_count=2, instance_type="STANDARD_D2"),
            # limits=Limits(timeout=300),
            inputs={
                "float": Input(type="number", default=1.1, min=0, max=5),
                "integer": Input(type="integer", default=2, min=-1, max=4),
                "integer1": 2,
                "string0": Input(type="string", default="default_str0"),
                "string1": "default_str1",
                "boolean": Input(type="boolean", default=False),
                "uri_folder": Input(type="uri_folder", path="https://my-blob/path/to/data", mode="ro_mount"),
                "uri_file": Input(type="uri_file", path="https://my-blob/path/to/data", mode="download"),
            },
            outputs={"my_model": Output(type="mlflow_model")},
        )
        node = my_command()

"""
import math
from typing import overload
from collections import OrderedDict

from typing import Union, Sequence, Iterable
from enum import EnumMeta, Enum as PyEnum
from inspect import Parameter, signature

from azure.ai.ml.entities._job.pipeline._exceptions import UserErrorException, MldesignerComponentDefiningError
from azure.ai.ml.entities._component.input_output import ComponentInput, ComponentOutput
from azure.ai.ml.constants import InputOutputModes, AssetTypes
from azure.ai.ml._ml_exceptions import ValidationException, ErrorTarget, ErrorCategory, ComponentException
from azure.ai.ml.entities._mixins import DictMixin


class Input(DictMixin):
    """Define an input of a Component or Job. Default to be a uri_folder Input.

    :param type: The type of the data input. Possible values include:
        'uri_folder', 'uri_file', 'mltable', 'mlflow_model', 'custom_model',
        'integer', 'number', 'string', 'boolean'
    :type type: str
    :param path: The path to which the input is pointing.
        Could be pointing to local data, cloud data, a registered name, etc.
    :type path: str
    :param mode: The mode of the data input. Possible values are:
        'ro_mount': Read-only mount the data,
        'download': Download the data to the compute target,
        'direct': Pass in the URI as a string
    :type mode: str
    :param default: The default value of this input. When a `default` is set, the input will be optional
    :type default: Union[str, integer, float, bool]
    :param min: The min value -- if a smaller value is passed to a job, the job execution will fail
    :type min: Union[integer, float]
    :param max: The max value -- if a larger value is passed to a job, the job execution will fail
    :type max: Union[integer, float]
    :param optional: Determine if this input is optional
    :type optional: bool
    :param description: Description of the input
    :type description: str
    """

    # For validation, indicates specific parameters combination for each type:
    # asset-like types take path/mode, primitive ("parameter") types take
    # default (plus min/max for numerics). Anything outside the listed keys
    # must be None for that type (enforced in _validate_parameter_combinations).
    _TYPE_COMBINATION_MAPPING = {
        "uri_folder": ["path", "mode"],
        "uri_file": ["path", "mode"],
        "mltable": ["path", "mode"],
        "mlflow_model": ["path", "mode"],
        "custom_model": ["path", "mode"],
        "integer": ["default", "min", "max"],
        "number": ["default", "min", "max"],
        "string": ["default"],
        "boolean": ["default"],
    }
    # Python type(s) accepted as a value for each primitive input type.
    # Membership in this dict is what makes an Input a "parameter type".
    _ALLOWED_TYPES = {
        "integer": (int),
        "string": (str),
        "number": (float),
        "boolean": (bool),
    }
    # Reverse mapping: Python type -> input type name, used by
    # _get_input_by_type to infer an Input from an annotation like `param: int`.
    _DATA_TYPE_MAPPING = {int: "integer", str: "string", float: "number", bool: "boolean"}
    # Sentinel meaning "no default was provided" (inspect.Parameter.empty);
    # distinct from an explicit default of None.
    _EMPTY = Parameter.empty

    @overload
    def __init__(
        self,
        *,
        type: str = "uri_folder",
        path: str = None,
        mode: str = "ro_mount",
        optional: bool = None,
        description: str = None,
        **kwargs,
    ):
        """Initialize an input.

        :param type: The type of the data input. Possible values include:
            'uri_folder', 'uri_file', 'mltable', 'mlflow_model', 'custom_model', and user-defined types.
        :type type: str
        :param path: The path to which the input is pointing.
            Could be pointing to local data, cloud data, a registered name, etc.
        :type path: str
        :param mode: The mode of the data input. Possible values are:
            'ro_mount': Read-only mount the data,
            'download': Download the data to the compute target,
            'direct': Pass in the URI as a string
        :type mode: str
        :param optional: Determine if this input is optional
        :type optional: bool
        :param description: Description of the input
        :type description: str
        """
        pass

    @overload
    def __init__(
        self,
        *,
        type: str = "number",
        default: float = None,
        min: float = None,
        max: float = None,
        optional: bool = None,
        description: str = None,
        **kwargs,
    ):
        """Initialize a number input.

        :param type: The type of the data input. Can only be set to "number".
        :type type: str
        :param default: The default value of this input. When a `default` is set, input will be optional
        :type default: float
        :param min: The min value -- if a smaller value is passed to a job, the job execution will fail
        :type min: float
        :param max: The max value -- if a larger value is passed to a job, the job execution will fail
        :type max: float
        :param optional: Determine if this input is optional
        :type optional: bool
        :param description: Description of the input
        :type description: str
        """
        pass

    @overload
    def __init__(
        self,
        *,
        type: str = "integer",
        default: int = None,
        min: int = None,
        max: int = None,
        optional: bool = None,
        description: str = None,
        **kwargs,
    ):
        """Initialize an integer input.

        :param type: The type of the data input. Can only be set to "integer".
        :type type: str
        :param default: The default value of this input. When a `default` is set, the input will be optional
        :type default: integer
        :param min: The min value -- if a smaller value is passed to a job, the job execution will fail
        :type min: integer
        :param max: The max value -- if a larger value is passed to a job, the job execution will fail
        :type max: integer
        :param optional: Determine if this input is optional
        :type optional: bool
        :param description: Description of the input
        :type description: str
        """
        pass

    @overload
    def __init__(
        self,
        *,
        type: str = "string",
        default: str = None,
        optional: bool = None,
        description: str = None,
        **kwargs,
    ):
        """Initialize a string input.

        :param type: The type of the data input. Can only be set to "string".
        :type type: str
        :param default: The default value of this input. When a `default` is set, the input will be optional
        :type default: str
        :param optional: Determine if this input is optional
        :type optional: bool
        :param description: Description of the input
        :type description: str
        """
        pass

    @overload
    def __init__(
        self,
        *,
        type: str = "boolean",
        default: bool = None,
        optional: bool = None,
        description: str = None,
        **kwargs,
    ):
        """Initialize a bool input.

        :param type: The type of the data input. Can only be set to "boolean".
        :type type: str
        :param default: The default value of this input. When a `default` is set, input will be optional
        :type default: bool
        :param optional: Determine if this input is optional
        :type optional: bool
        :param description: Description of the input
        :type description: str
        """
        pass

    def __init__(
        self,
        *,
        type: str = "uri_folder",
        path: str = None,
        mode: str = "ro_mount",
        default: Union[str, int, float, bool] = None,
        optional: bool = None,
        min: Union[int, float] = None,
        max: Union[int, float] = None,
        enum=None,
        description: str = None,
        **kwargs,
    ):
        # As an annotation, it is not allowed to initialize the name.
        # The name will be updated by the annotated variable name.
        self.name = None
        self.type = type
        self.description = description

        # True for the primitive types (integer/number/string/boolean);
        # these carry default/min/max instead of path/mode.
        self._is_parameter_type = self.type in self._ALLOWED_TYPES
        if path and not isinstance(path, str):
            # this logic will make dsl data binding expression working in the same way as yaml
            # it's written to handle InputOutputBase, but there will be loop import if we import InputOutputBase here
            self.path = str(path)
        else:
            self.path = path
        # mode is only meaningful for asset-like inputs; force None for parameters.
        self.mode = None if self._is_parameter_type else mode
        self.default = default
        # Only an explicit optional=True is kept; anything else collapses to None.
        self.optional = True if optional is True else None
        self.min = min
        self.max = max
        self.enum = enum
        # Python type(s) a value must match for this input (None for asset types).
        self._allowed_types = self._ALLOWED_TYPES.get(self.type)
        self._validate_parameter_combinations()

    def _to_dict(self, remove_name: bool = True) -> dict:
        """Convert the Input object to a dict, dropping None-valued entries."""
        keys = ["name", "path", "type", "mode", "description", "default", "min", "max", "enum", "optional"]
        if remove_name:
            keys.remove("name")
        result = {key: getattr(self, key) for key in keys}
        return _remove_empty_values(result)

    def _to_component_input(self):
        """Build a ComponentInput entity from this Input's dict form."""
        data = self._to_dict()
        return ComponentInput(data)

    def _parse(self, val):
        """Parse value passed from command line.

        :param val: The input value
        :return: The parsed value.
        :raises ValidationException: If a boolean input receives anything
            other than a (case-insensitive) "true"/"false" string.
        """
        if self.type == "integer":
            return int(val)
        elif self.type == "number":
            return float(val)
        elif self.type == "boolean":
            lower_val = str(val).lower()
            if lower_val not in {"true", "false"}:
                msg = "Boolean parameter '{}' only accept True/False, got {}."
                raise ValidationException(
                    message=msg.format(self.name, val),
                    no_personal_data_message=msg.format("[self.name]", "[val]"),
                    error_category=ErrorCategory.USER_ERROR,
                    target=ErrorTarget.PIPELINE,
                )
            return True if lower_val == "true" else False
        # "string" and asset types pass through unchanged.
        return val

    def _parse_and_validate(self, val):
        """Parse the val passed from the command line and validate the value.

        :param val: The input value from the command line.
        :return: The parsed value, an exception will be raised if the value is invalid.
        """
        if self._is_parameter_type:
            # Only parse strings; already-typed values go straight to validation.
            val = self._parse(val) if isinstance(val, str) else val
            self._validate_or_throw(val)
        return val

    def _update_name(self, name):
        # Called after construction to attach the annotated variable name.
        self.name = name

    def _update_default(self, default_value):
        """Update provided default values."""
        if self.type == "uri_folder" or self.type == "uri_file":
            # Asset inputs accept any default as-is (no parsing/validation here).
            self.default = default_value
            return
        else:
            if isinstance(default_value, float) and not math.isfinite(default_value):
                # Since nan/inf cannot be stored in the backend, just ignore them.
                # logger.warning("Float default value %r is not allowed, ignored." % default_value)
                return
            # NOTE(review): the triple-quoted string below is a stray statement
            # (not a docstring); it has no runtime effect and is kept verbatim.
            """Update provided default values.
            Here we need to make sure the type of default value is allowed or it could be parsed..
            """
            if default_value is not None and not isinstance(default_value, self._allowed_types):
                try:
                    default_value = self._parse(default_value)
                except Exception as e:
                    if self.name is None:
                        msg = "Default value of %s Input cannot be parsed, got '%s', type = %s." % (
                            self.type,
                            default_value,
                            type(default_value),
                        )
                    else:
                        msg = "Default value of %s Input '%s' cannot be parsed, got '%s', type = %s." % (
                            self.type,
                            self.name,
                            default_value,
                            type(default_value),
                        )
                    raise MldesignerComponentDefiningError(cause=msg) from e
            self.default = default_value

    def _validate_or_throw(self, value):
        """Validate input parameter value, throw exception if not as expected.

        It will throw exception if validate failed, otherwise do nothing.
        """
        # Non-optional inputs must receive a value.
        if not self.optional and value is None:
            msg = "Parameter {} cannot be None since it is not optional."
            raise ValidationException(
                message=msg.format(self.name),
                no_personal_data_message=msg.format("[self.name]"),
                error_category=ErrorCategory.USER_ERROR,
                target=ErrorTarget.PIPELINE,
            )
        if self._allowed_types and value is not None:
            if not isinstance(value, self._allowed_types):
                msg = "Unexpected data type for parameter '{}'. Expected {} but got {}."
                raise ValidationException(
                    message=msg.format(self.name, self._allowed_types, type(value)),
                    no_personal_data_message=msg.format("[name]", self._allowed_types, type(value)),
                    error_category=ErrorCategory.USER_ERROR,
                    target=ErrorTarget.PIPELINE,
                )
        # for numeric values, need extra check for min max value
        if self.type in ("integer", "number"):
            if self.min is not None and value < self.min:
                msg = "Parameter '{}' should not be less than {}."
                raise ValidationException(
                    message=msg.format(self.name, self.min),
                    no_personal_data_message=msg.format("[name]", self.min),
                    error_category=ErrorCategory.USER_ERROR,
                    target=ErrorTarget.PIPELINE,
                )
            if self.max is not None and value > self.max:
                msg = "Parameter '{}' should not be greater than {}."
                raise ValidationException(
                    message=msg.format(self.name, self.max),
                    no_personal_data_message=msg.format("[name]", self.max),
                    error_category=ErrorCategory.USER_ERROR,
                    target=ErrorTarget.PIPELINE,
                )

    def _validate_parameter_combinations(self):
        """Validate different parameter combinations according to type."""
        parameters = ["type", "path", "mode", "default", "min", "max"]
        parameters = {key: getattr(self, key, None) for key in parameters}
        type = parameters.pop("type")  # local shadows builtin `type`; kept as-is

        # validate parameter combination
        if type in self._TYPE_COMBINATION_MAPPING:
            valid_parameters = self._TYPE_COMBINATION_MAPPING[type]
            for key, value in parameters.items():
                if key not in valid_parameters and value is not None:
                    msg = "Invalid parameter for '{}' Input, parameter '{}' should be None but got '{}'"
                    raise ValidationException(
                        message=msg.format(type, key, value),
                        no_personal_data_message=msg.format("[type]", "[parameter]", "[parameter_value]"),
                        error_category=ErrorCategory.USER_ERROR,
                        target=ErrorTarget.PIPELINE,
                    )
        # NOTE(review): unknown/user-defined types skip combination validation
        # entirely — presumably intentional to allow custom types; confirm.

    @classmethod
    def _get_input_by_type(cls, t: type):
        """Return an Input inferred from a Python type, or None if unmapped."""
        if t in cls._DATA_TYPE_MAPPING:
            return cls(type=cls._DATA_TYPE_MAPPING[t])
        return None

    @classmethod
    def _get_default_string_input(cls):
        """Return the fallback Input used when no type can be inferred."""
        return cls(type="string")

    @classmethod
    def _get_param_with_standard_annotation(cls, func):
        """Standardize a function's parameters into Input/Output annotations."""
        return _get_param_with_standard_annotation(func, is_func=True)
class Output(DictMixin):
    """Define an output of a Component or Job.

    :param type: The type of the data output. Possible values include:
        'uri_folder', 'uri_file', 'mltable', 'mlflow_model', 'custom_model', and user-defined types.
    :type type: str
    :param path: The path to which the output is pointing. Needs to point to a cloud path.
    :type path: str
    :param mode: The mode of the data output. Possible values are:
        'rw_mount': Read-write mount the data,
        'upload': Upload the data from the compute target,
        'direct': Pass in the URI as a string
    :type mode: str
    :param description: Description of the output
    :type description: str
    """

    @overload
    def __init__(self, type="uri_folder", path=None, mode="rw_mount", description=None):
        """Define a uri_folder output.

        :param type: The type of the data output. Possible values include:
            'uri_folder', 'uri_file', 'mltable', 'mlflow_model', 'custom_model', and user-defined types.
        :type type: str
        :param path: The path to which the output is pointing. Needs to point to a cloud path.
        :type path: str
        :param mode: The mode of the data output. Possible values are:
            'rw_mount': Read-write mount the data,
            'upload': Upload the data from the compute target,
            'direct': Pass in the URI as a string
        :type mode: str
        :param description: Description of the output
        :type description: str
        """
        pass

    @overload
    def __init__(self, type="uri_file", path=None, mode="rw_mount", description=None):
        """Define a uri_file output.

        :param type: The type of the data output. Possible values include:
            'uri_folder', 'uri_file', 'mltable', 'mlflow_model', 'custom_model', and user-defined types.
        :type type: str
        :param path: The path to which the output is pointing. Needs to point to a cloud path.
        :type path: str
        :param mode: The mode of the data output. Possible values are:
            'rw_mount': Read-write mount the data,
            'upload': Upload the data from the compute target,
            'direct': Pass in the URI as a string
        :type mode: str
        :param description: Description of the output
        :type description: str
        """
        pass

    def __init__(
        self, *, type=AssetTypes.URI_FOLDER, path=None, mode=InputOutputModes.RW_MOUNT, description=None, **kwargs
    ):
        # The name cannot be set here: as an annotation, it is filled in later
        # from the annotated variable's name.
        self.name = None
        self.type = type
        self.description = description
        self.path = path
        self.mode = mode

    def _get_hint(self, new_line_style=False):
        """Return a short hint string for this output: the description (with
        double quotes escaped) when present, otherwise the type name.

        When ``new_line_style`` is True and the hint is non-empty, the hint is
        wrapped in triple quotes so it can be emitted as a docstring line.
        """
        if self.description:
            hint = self.description.replace('"', '\\"')
        else:
            hint = self.type
        if hint and new_line_style:
            return '"""%s"""' % hint
        return hint

    def _to_dict(self, remove_name=True):
        """Convert this Output to a plain dict, dropping None-valued entries.

        ``name`` is excluded by default since it is not part of the
        serialized representation.
        """
        field_names = ("path", "type", "mode", "description")
        if not remove_name:
            field_names = ("name",) + field_names
        return _remove_empty_values({field: getattr(self, field) for field in field_names})

    def _to_component_output(self):
        """Build a ComponentOutput entity from this Output's dict form."""
        data = self._to_dict()
        return ComponentOutput(data)
class EnumInput(Input):
    """Enum parameter parse the value according to its enum values."""

    def __init__(self, *, enum: Union[EnumMeta, Sequence[str]] = None, default=None, description=None, **kwargs):
        """Initialize an enum parameter, the options of an enum parameter are the enum values.

        :param enum: Enum values, either an Enum subclass or an iterable of strings.
        :type enum: Union[EnumMeta, Sequence[str]]
        :param default: Default value; may be an enum member or its string value.
        :param description: Description of the param.
        :type description: str
        :raises ValidationException: If ``enum`` is not an Enum/iterable,
            is empty, or contains non-string values.
        """
        enum_values = self._assert_enum_valid(enum)
        # This is used to parse enum class instead of enum str value if a enum class is provided.
        if isinstance(enum, EnumMeta):
            self._enum_class = enum
            # Map string value -> enum member (enum_values and enum iterate in the same order).
            self._str2enum = {v: e for v, e in zip(enum_values, enum)}
        else:
            self._enum_class = None
            # Plain string enum: parsing is the identity mapping.
            self._str2enum = {v: v for v in enum_values}
        super().__init__(type="string", default=default, enum=enum_values, description=description)
        # Accept enum members as well as raw strings when an enum class is used.
        self._allowed_types = (
            (str,)
            if not self._enum_class
            else (
                self._enum_class,
                str,
            )
        )

    @classmethod
    def _assert_enum_valid(cls, enum):
        """Check whether the enum is valid and return the values of the enum."""
        if isinstance(enum, EnumMeta):
            enum_values = [str(option.value) for option in enum]
        elif isinstance(enum, Iterable):
            enum_values = list(enum)
        else:
            msg = "enum must be a subclass of Enum or an iterable."
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
                target=ErrorTarget.PIPELINE,
            )
        if len(enum_values) <= 0:
            msg = "enum must have enum values."
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
                target=ErrorTarget.PIPELINE,
            )
        if any(not isinstance(v, str) for v in enum_values):
            msg = "enum values must be str type."
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
                target=ErrorTarget.PIPELINE,
            )
        return enum_values

    def _parse(self, str_val: str):
        """Parse the enum value from a string value or the enum value."""
        if str_val is None:
            return str_val
        if self._enum_class and isinstance(str_val, self._enum_class):
            return str_val  # Directly return the enum value if it is the enum.
        if str_val not in self._str2enum:
            msg = "Not a valid enum value: '{}', valid values: {}"
            raise ValidationException(
                message=msg.format(str_val, ", ".join(self.enum)),
                no_personal_data_message=msg.format("[val]", "[enum]"),
                error_category=ErrorCategory.USER_ERROR,
                target=ErrorTarget.PIPELINE,
            )
        return self._str2enum[str_val]

    def _update_default(self, default_value):
        """Enum parameter support updating values with a string value."""
        enum_val = self._parse(default_value)
        if self._enum_class and isinstance(enum_val, self._enum_class):
            # Store the underlying string value, never the enum member itself.
            enum_val = enum_val.value
        self.default = enum_val


def _get_annotation_by_value(val):
    """Infer an Input annotation from a parameter's default value.

    Falls back to a string Input when the value's type cannot be mapped.
    """

    def _is_dataset(data):
        # Local import to avoid a circular import at module load time.
        from azure.ai.ml.entities._job.job_io_mixin import JobIOMixin

        DATASET_TYPES = JobIOMixin
        return isinstance(data, DATASET_TYPES)

    if _is_dataset(val):
        # Dataset-like defaults map to the Input class itself (instantiated later).
        annotation = Input
    elif val is Parameter.empty or val is None:
        # If no default value or default is None, create val as the basic parameter type,
        # it could be replaced using component parameter definition.
        annotation = Input._get_default_string_input()
    elif isinstance(val, PyEnum):
        # Handle enum values
        annotation = EnumInput(enum=val.__class__)
    else:
        annotation = _get_annotation_cls_by_type(type(val), raise_error=False)
        if not annotation:
            # Fall back to default
            annotation = Input._get_default_string_input()
    return annotation


def _get_annotation_cls_by_type(t: type, raise_error=False):
    """Map a Python type to an Input instance; None (or raise) if unmapped."""
    cls = Input._get_input_by_type(t)
    if cls is None and raise_error:
        raise UserErrorException(f"Can't convert type {t} to azure.ai.ml.Input")
    return cls


def _get_param_with_standard_annotation(
    cls_or_func, is_func=False, non_pipeline_parameter_names=None, dynamic_param_name=None, dynamic_param_value=None
):
    """Standardize function parameters or class fields with dsl.types annotation.

    Returns an OrderedDict mapping each field/parameter name to an
    Input/Output annotation instance, with defaults merged in from the class
    dict (for classes) or the function signature (for functions).
    """
    non_pipeline_parameter_names = non_pipeline_parameter_names or []

    def _get_fields(annotations):
        """Return field names to annotations mapping in class."""
        annotation_fields = OrderedDict()
        for name, annotation in annotations.items():
            # Skip return type
            if name == "return":
                continue
            # Handle EnumMeta annotation
            if isinstance(annotation, EnumMeta):
                annotation = EnumInput(type="string", enum=annotation)
            # Try create annotation by type when got like 'param: int'
            if not _is_dsl_type_cls(annotation) and not _is_dsl_types(annotation):
                annotation = _get_annotation_cls_by_type(annotation, raise_error=False)
                if not annotation:
                    # Fall back to string parameter
                    annotation = Input._get_default_string_input()
            annotation_fields[name] = annotation
        return annotation_fields

    def _merge_field_keys(annotation_fields, defaults_dict):
        """Merge field keys from annotations and cls dict to get all fields in class."""
        anno_keys = list(annotation_fields.keys())
        dict_keys = defaults_dict.keys()
        if not dict_keys:
            return anno_keys
        # Fields with default values must follow those without defaults, so find the first key with
        # annotation that appear in the class dict, the previous keys must be in the front of the key list
        all_keys = []
        # Use this flag to guarantee all fields with defaults following fields without defaults.
        seen_default = False
        for key in anno_keys:
            if key in dict_keys:
                seen_default = True
            else:
                if seen_default:
                    raise UserErrorException(f"Non-default argument {key!r} follows default argument.")
                all_keys.append(key)
        # Append all keys in dict
        all_keys.extend(dict_keys)
        return all_keys

    def _update_annotation_with_default(anno, name, default):
        """Create annotation if is type class and update the default."""
        # Create instance if is type class
        complete_annotation = anno
        if _is_dsl_type_cls(anno):
            complete_annotation = anno()
        complete_annotation.name = name
        if default is Input._EMPTY:
            return complete_annotation
        if isinstance(complete_annotation, Input):
            # Non-parameter Input has no default attribute
            if complete_annotation._is_parameter_type and complete_annotation.default is not None:
                # Default value is set twice (annotation and assignment); the
                # assigned value wins. Warning intentionally left disabled:
                # logger.warning(
                #     f"Warning: Default value of f{complete_annotation.name!r} is set twice: "
                #     f"{complete_annotation.default!r} and {default!r}, will use {default!r}"
                # )
                pass
            complete_annotation._update_default(default)
        return complete_annotation

    def _update_fields_with_default(annotation_fields, defaults_dict):
        """Use public values in class dict to update annotations."""
        all_fields = OrderedDict()
        # (sic: "filed" — original local name kept verbatim)
        all_filed_keys = _merge_field_keys(annotation_fields, defaults_dict)
        for name in all_filed_keys:
            # Get or create annotation
            annotation = (
                annotation_fields[name]
                if name in annotation_fields
                else _get_annotation_by_value(defaults_dict.get(name, Input._EMPTY))
            )
            # Create annotation if is class type and update default
            annotation = _update_annotation_with_default(annotation, name, defaults_dict.get(name, Input._EMPTY))
            all_fields[name] = annotation
        return all_fields

    def _filter_pipeline_parameters(dct):
        """Filter out non pipeline parameters and dynamic parameter key."""
        return {k: v for k, v in dct.items() if k not in non_pipeline_parameter_names and k != dynamic_param_name}

    # From annotations get field with type
    annotations = _filter_pipeline_parameters(getattr(cls_or_func, "__annotations__", {}))
    annotations = _update_io_from_mldesigner(annotations)
    annotation_fields = _get_fields(annotations)
    # Update fields use class field with defaults from class dict or signature(func).paramters
    if not is_func:
        # Only consider public fields in class dict
        defaults_dict = {key: val for key, val in cls_or_func.__dict__.items() if not key.startswith("_")}
        defaults_dict = _filter_pipeline_parameters(defaults_dict)
        # Restrict each field must have annotation(in annotation dict)
        if any(key not in annotation_fields for key in defaults_dict):
            raise UserErrorException(f"Each field in parameter group {cls_or_func!r} must have an annotation.")
    else:
        # Infer parameter type from value if is function
        defaults_dict = {key: val.default for key, val in signature(cls_or_func).parameters.items()}
        defaults_dict = _filter_pipeline_parameters(defaults_dict)
    all_fields = _update_fields_with_default(annotation_fields, defaults_dict)
    return all_fields


def _is_dsl_type_cls(t: type):
    """True if ``t`` is the Input or Output class (or a subclass), not an instance."""
    if type(t) is not type:
        return False
    return issubclass(t, (Input, Output))


def _is_dsl_types(o: object):
    """True if ``o`` is an instance of Input or Output (or a subclass)."""
    return _is_dsl_type_cls(type(o))


def _remove_empty_values(data, ignore_keys=None):
    """Recursively drop None-valued entries from a dict.

    Keys listed in ``ignore_keys`` are kept (and not recursed into) even
    when their value is None. Non-dict values are returned unchanged.
    """
    if not isinstance(data, dict):
        return data
    ignore_keys = ignore_keys or {}
    return {
        k: v if k in ignore_keys else _remove_empty_values(v)
        for k, v in data.items()
        if v is not None or k in ignore_keys
    }


def _update_io_from_mldesigner(annotations: dict) -> dict:
    """This function will translate IOBase from mldesigner package to azure.ml.entities.Input/Output.

    This function depend on `mldesigner._input_output._IOBase._to_io_entity_args_dict` to translate Input/Output
    instance annotations to IO entities.
    This function depend on class names of `mldesigner._input_output` to translate Input/Output class annotations
    to IO entities.
    """
    mldesigner_pkg = "mldesigner"

    def _is_input_or_output_type(io: type, type_str: str):
        """Return true if type name contains type_str"""
        # Matched by module prefix + substring of the class name, not identity,
        # so mldesigner subclasses are recognized too.
        if isinstance(io, type) and io.__module__.startswith(mldesigner_pkg):
            if type_str in io.__name__:
                return True
        return False

    result = {}
    for key, io in annotations.items():
        if isinstance(io, type):
            if _is_input_or_output_type(io, "Input"):
                # mldesigner.Input -> entities.Input
                io = Input
            elif _is_input_or_output_type(io, "Output"):
                # mldesigner.Output -> entities.Output
                io = Output
        elif hasattr(io, "_to_io_entity_args_dict"):
            try:
                if _is_input_or_output_type(type(io), "Input"):
                    # mldesigner.Input() -> entities.Input()
                    io = Input(**io._to_io_entity_args_dict())
                elif _is_input_or_output_type(type(io), "Output"):
                    # mldesigner.Output() -> entities.Output()
                    io = Output(**io._to_io_entity_args_dict())
            except BaseException as e:
                # BaseException: any failure translating the mldesigner object
                # is surfaced as a ComponentException with the cause chained.
                msg = f"Failed to parse {io} to azure-ai-ml Input/Output."
                raise ComponentException(
                    message=msg,
                    target=ErrorTarget.COMPONENT,
                    no_personal_data_message=msg,
                    error_category=ErrorCategory.SYSTEM_ERROR,
                ) from e
        result[key] = io
    return result