Source code for azure.ai.ml.entities._builders.command_func

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os
from typing import Dict, Union, Callable, Tuple

from azure.ai.ml.constants import AssetTypes, LegacyAssetTypes, ComponentSource
from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution
from .command import Command
from azure.ai.ml.entities import (
    Environment,
    CommandComponent,
)
from azure.ai.ml.entities._job.distribution import MpiDistribution, TensorFlowDistribution, PyTorchDistribution
from azure.ai.ml.entities._component.input_output import ComponentInput, ComponentOutput
from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
from azure.ai.ml._restclient.v2022_02_01_preview.models import ManagedIdentity, AmlToken, UserIdentity
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml._ml_exceptions import ValidationException, ErrorTarget

SUPPORTED_INPUTS = [
    LegacyAssetTypes.PATH,
    AssetTypes.URI_FILE,
    AssetTypes.URI_FOLDER,
    AssetTypes.CUSTOM_MODEL,
    AssetTypes.MLFLOW_MODEL,
    AssetTypes.MLTABLE,
    AssetTypes.TRITON_MODEL,
]


def _parse_input(input_value):
    """Split a single input value into its (ComponentInput, job input) pair."""
    component_input, job_input = None, None
    if isinstance(input_value, Input):
        component_input = input_value._to_component_input()
        input_type = input_value.type
        if input_type in SUPPORTED_INPUTS:
            job_input = Input(**input_value._to_dict())
    elif isinstance(input_value, ComponentInput):
        # if user provided component input, job input will be None
        component_input = input_value
    elif isinstance(input_value, dict):
        # if user provided dict, we try to parse it to ComponentInput and JobInput separately.
        # only parse to JobInput for path type
        input_type = input_value.get("type", None)
        if input_type in SUPPORTED_INPUTS:
            job_input = Input(**input_value)
        component_input = ComponentInput(input_value)
    elif isinstance(input_value, (SweepDistribution, str, bool, int, float)):
        # Input bindings are not supported
        component_input = ComponentTranslatableMixin._to_component_input_builder_function(input_value)
        job_input = input_value
    else:
        msg = f"Unsupported input type: {type(input_value)}, only Input, ComponentInput, dict, str, bool, int and float are supported."
        raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB)
    return component_input, job_input
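
# Illustrative sketch (not part of the original module): a dict with a path-like
# type is split into both a ComponentInput and a job-level Input binding, while a
# plain literal keeps the literal itself as the job input. Values below are hypothetical.
#
#     component_input, job_input = _parse_input({"type": AssetTypes.URI_FOLDER, "path": "./data"})
#     # component_input -> ComponentInput built from the dict, job_input -> Input(type="uri_folder", path="./data")
#
#     component_input, job_input = _parse_input(0.01)
#     # component_input -> ComponentInput built from the literal, job_input -> 0.01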


def _parse_output(output_value):
    """Split a single output value into its (ComponentOutput, job output) pair."""
    component_output, job_output = None, None
    if isinstance(output_value, Output):
        component_output = output_value._to_component_output()
        job_output = Output(**output_value._to_dict())
    elif isinstance(output_value, ComponentOutput):
        component_output = output_value
    elif not output_value:
        # output value can be None or empty dictionary
        # None output value will be packed into a JobOutput object with mode = ReadWriteMount & type = UriFolder
        component_output = ComponentTranslatableMixin._to_component_output(output_value)
        job_output = output_value
    elif isinstance(output_value, dict):  # When output value is a non-empty dictionary
        job_output = Output(**output_value)
        component_output = ComponentOutput(output_value)
    elif isinstance(output_value, str):  # When output is passed in from pipeline job yaml
        job_output = output_value
    else:
        msg = f"Unsupported output type: {type(output_value)}, only Output, ComponentOutput, JobOutput, and dict are supported."
        raise ValidationException(message=msg, no_personal_data_message=msg, target=ErrorTarget.JOB)
    return component_output, job_output
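
# Illustrative sketch (not part of the original module): a falsy value such as None
# produces only a default component output (the job output stays None and is later
# packed into a JobOutput with mode = ReadWriteMount and type = UriFolder), while a
# non-empty dict yields both an Output binding and a ComponentOutput. Values are hypothetical.
#
#     component_output, job_output = _parse_output(None)
#     # component_output -> default ComponentOutput, job_output -> None
#
#     component_output, job_output = _parse_output({"type": AssetTypes.URI_FOLDER})
#     # component_output -> ComponentOutput, job_output -> Output(type="uri_folder")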


def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]:
    """Apply parse_func to every entry of io_dict and collect the component-level and job-level results."""
    component_io_dict, job_io_dict = {}, {}
    if io_dict:
        for key, val in io_dict.items():
            component_io, job_io = parse_func(val)
            component_io_dict[key] = component_io
            job_io_dict[key] = job_io
    return component_io_dict, job_io_dict


def command(
    *,
    name: str = None,
    description: str = None,
    tags: Dict = None,
    properties: Dict = None,
    display_name: str = None,
    command: str = None,
    experiment_name: str = None,
    environment: Union[str, Environment] = None,
    environment_variables: Dict = None,
    distribution: Union[Dict, MpiDistribution, TensorFlowDistribution, PyTorchDistribution] = None,
    compute: str = None,
    inputs: Dict = None,
    outputs: Dict = None,
    instance_count: int = None,
    instance_type: str = None,
    timeout: int = None,
    code: Union[str, os.PathLike] = None,
    identity: Union[ManagedIdentity, AmlToken, UserIdentity] = None,
    **kwargs,
) -> Command:
    """Create a Command object which can be used inside dsl.pipeline as a function and can also be created as a
    standalone command job.

    :param name: Name of the command job or component created.
    :type name: str
    :param description: A friendly description of the command.
    :type description: str
    :param tags: Tags to be attached to this command.
    :type tags: Dict
    :param properties: The asset property dictionary.
    :type properties: dict[str, str]
    :param display_name: A friendly name.
    :type display_name: str
    :param experiment_name: Name of the experiment the job will be created under; if None is provided, the default
        will be set to the current directory name. Will be ignored as a pipeline step.
    :type experiment_name: str
    :param command: The command string that will be run.
    :type command: str
    :param environment: The environment to use for this command.
    :type environment: Union[str, azure.ai.ml.entities.Environment]
    :param environment_variables: Environment variables to set on the compute before this command is executed.
    :type environment_variables: dict
    :param distribution: The distribution mode to use for this command.
    :type distribution: Union[Dict, azure.ai.ml.MpiDistribution, azure.ai.ml.TensorFlowDistribution,
        azure.ai.ml.PyTorchDistribution]
    :param compute: The name of the compute where the command job is executed (will not be used if the command is
        used as a component/function).
    :type compute: str
    :param inputs: A dict of inputs used by this command.
    :type inputs: Dict
    :param outputs: The outputs of this command.
    :type outputs: Dict
    :param instance_count: Optional number of instances or nodes used by the compute target. Defaults to 1.
    :type instance_count: int
    :param instance_type: Optional type of VM used as supported by the compute target.
    :type instance_type: str
    :param timeout: The number in seconds, after which the job will be cancelled.
    :type timeout: int
    :param code: The code folder to run -- typically a local folder that will be uploaded as the job is submitted.
    :type code: Union[str, os.PathLike]
    :param identity: Identity that the training job will use while running on compute.
    :type identity: Union[azure.ai.ml.ManagedIdentity, azure.ai.ml.AmlToken, azure.ai.ml.UserIdentity]
    """
    inputs = inputs or {}
    outputs = outputs or {}
    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
    # job inputs can not be None
    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)

    component = kwargs.pop("component", None)
    if component is None:
        component = CommandComponent(
            base_path=os.getcwd(),  # base path should be current folder
            name=name,
            tags=tags,
            code=code,
            command=command,
            environment=environment,
            display_name=display_name,
            description=description,
            inputs=component_inputs,
            outputs=component_outputs,
            distribution=distribution,
            environment_variables=environment_variables,
            _source=ComponentSource.BUILDER,
            **kwargs,
        )
    command_obj = Command(
        component=component,
        name=name,
        description=description,
        tags=tags,
        properties=properties,
        display_name=display_name,
        experiment_name=experiment_name,
        compute=compute,
        inputs=job_inputs,
        outputs=job_outputs,
        identity=identity,
        distribution=distribution,
        environment=environment,
        environment_variables=environment_variables,
        **kwargs,
    )

    if instance_count is not None or instance_type is not None:
        command_obj.set_resources(instance_count=instance_count, instance_type=instance_type)

    if timeout is not None:
        command_obj.set_limits(timeout=timeout)

    return command_obj
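

# Illustrative usage sketch (not part of the original module). The script path,
# environment name, compute name, and input/output values below are hypothetical;
# in user code the builder is typically imported from the top-level azure.ai.ml package.
if __name__ == "__main__":
    example_job = command(
        name="train_model",
        display_name="Train model",
        command="python train.py --data ${{inputs.training_data}} --lr ${{inputs.learning_rate}}",
        code="./src",
        environment="my-training-environment:1",  # hypothetical registered environment
        inputs={
            "training_data": Input(type=AssetTypes.URI_FOLDER, path="./data"),
            "learning_rate": 0.01,
        },
        outputs={"model_output": Output(type=AssetTypes.URI_FOLDER)},
        compute="cpu-cluster",  # hypothetical compute target
        instance_count=1,
        timeout=3600,
    )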