Source code for azure.ai.ml.entities._job.command_job

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import copy
import logging

from typing import Dict, Union
from azure.ai.ml._schema.job.command_job import CommandJobSchema
from azure.ai.ml.entities._inputs_outputs import Input

from azure.ai.ml.entities._job._input_output_helpers import (
    from_rest_inputs_to_dataset_literal,
    validate_inputs_for_command,
    to_rest_dataset_literal_inputs,
    to_rest_data_outputs,
    from_rest_data_outputs,
)
from azure.ai.ml._restclient.v2022_02_01_preview.models import (
    JobBaseData,
    CommandJob as RestCommandJob,
    ManagedIdentity,
    AmlToken,
    UserIdentity,
)
from azure.ai.ml._utils.utils import (
    map_single_brackets_and_warn,
)
from azure.ai.ml.constants import (
    BASE_PATH_CONTEXT_KEY,
    TYPE,
    JobType,
    ComponentJobConstants,
    LOCAL_COMPUTE_TARGET,
    LOCAL_COMPUTE_PROPERTY,
    ENVIRONMENT_VARIABLES,
)
from azure.ai.ml.entities._job.distribution import DistributionConfiguration
from azure.ai.ml.entities._inputs_outputs import Output
from .job import Job
from azure.ai.ml.entities._util import load_from_dict
from .parameterized_command import ParameterizedCommand
from .resource_configuration import ResourceConfiguration
from .job_io_mixin import JobIOMixin
from .job_limits import CommandJobLimits
from azure.ai.ml._ml_exceptions import ErrorTarget, ErrorCategory, ValidationException

module_logger = logging.getLogger(__name__)


class CommandJob(Job, ParameterizedCommand, JobIOMixin):
    """Command job.

    :param name: Name of the job.
    :type name: str
    :param description: Description of the job.
    :type description: str
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: dict[str, str]
    :param display_name: Display name of the job.
    :type display_name: str
    :param properties: The asset property dictionary.
    :type properties: dict[str, str]
    :param experiment_name: Name of the experiment the job will be created under. If None is provided,
        it defaults to the current directory name.
    :type experiment_name: str
    :param services: Information on services associated with the job, read-only.
    :type services: dict[str, JobService]
    :param inputs: Inputs to the command.
    :type inputs: dict[str, Union[azure.ai.ml.Input, str, bool, int, float]]
    :param outputs: Mapping of output data bindings used in the job.
    :type outputs: dict[str, azure.ai.ml.Output]
    :param command: Command to be executed in training.
    :type command: str
    :param compute: The compute target the job runs on.
    :type compute: str
    :param resources: Compute resource configuration for the job.
    :type resources: ~azure.ai.ml.entities.ResourceConfiguration
    :param code: A local path, or an http:, https:, or azureml: URL pointing to a remote location.
    :type code: str
    :param distribution: Distribution configuration for distributed training.
    :type distribution: Union[azure.ai.ml.PyTorchDistribution, azure.ai.ml.MpiDistribution,
        azure.ai.ml.TensorFlowDistribution]
    :param environment: Environment that the training job will run in.
    :type environment: Union[azure.ai.ml.entities.Environment, str]
    :param identity: Identity that the training job will use while running on compute.
    :type identity: Union[azure.ai.ml.ManagedIdentity, azure.ai.ml.AmlToken, azure.ai.ml.UserIdentity]
    :param limits: Command job limits.
    :type limits: ~azure.ai.ml.entities.CommandJobLimits
    :param kwargs: A dictionary of additional configuration parameters.
    :type kwargs: dict
    """

    def __init__(
        self,
        *,
        inputs: Dict[str, Union[Input, str, bool, int, float]] = None,
        outputs: Dict[str, Union[Output]] = None,
        limits: CommandJobLimits = None,
        identity: Union[ManagedIdentity, AmlToken, UserIdentity] = None,
        **kwargs
    ):
        kwargs[TYPE] = JobType.COMMAND
        self._parameters = kwargs.pop("parameters", {})

        super().__init__(**kwargs)

        self.outputs = outputs
        self.inputs = inputs
        self.limits = limits
        self.identity = identity

    @property
    def parameters(self) -> Dict[str, str]:
        """MLflow parameters.

        :return: MLflow parameters logged in the job.
        :rtype: Dict[str, str]
        """
        return self._parameters

    def _to_dict(self) -> Dict:
        return CommandJobSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)

    def _to_rest_object(self) -> JobBaseData:
        self._validate()
        self.command = map_single_brackets_and_warn(self.command)
        modified_properties = copy.deepcopy(self.properties)
        # Remove any properties set on the service as read-only
        modified_properties.pop("_azureml.ComputeTargetType", None)
        # Handle the local compute case
        compute = self.compute
        resources = self.resources
        if self.compute == LOCAL_COMPUTE_TARGET:
            compute = None
            if resources is None:
                resources = ResourceConfiguration()
            if resources.properties is None:
                resources.properties = {}
            # This is the format of the October API response; we need to match it exactly
            resources.properties[LOCAL_COMPUTE_PROPERTY] = {LOCAL_COMPUTE_PROPERTY: True}

        properties = RestCommandJob(
            display_name=self.display_name,
            description=self.description,
            command=self.command,
            code_id=self.code,
            compute_id=compute,
            properties=modified_properties,
            experiment_name=self.experiment_name,
            inputs=to_rest_dataset_literal_inputs(self.inputs),
            outputs=to_rest_data_outputs(self.outputs),
            environment_id=self.environment,
            distribution=self.distribution,
            tags=self.tags,
            identity=self.identity,
            environment_variables=self.environment_variables,
            resources=resources._to_rest_object() if resources else None,
            limits=self.limits._to_rest_object() if self.limits else None,
            services=self.services,
        )
        result = JobBaseData(properties=properties)
        result.name = self.name
        return result

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "CommandJob":
        loaded_data = load_from_dict(CommandJobSchema, data, context, additional_message, **kwargs)
        return CommandJob(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)

    @classmethod
    def _load_from_rest(cls, obj: JobBaseData) -> "CommandJob":
        rest_command_job: RestCommandJob = obj.properties
        command_job = CommandJob(
            name=obj.name,
            id=obj.id,
            display_name=rest_command_job.display_name,
            description=rest_command_job.description,
            tags=rest_command_job.tags,
            properties=rest_command_job.properties,
            command=rest_command_job.command,
            experiment_name=rest_command_job.experiment_name,
            services=rest_command_job.services,
            status=rest_command_job.status,
            creation_context=obj.system_data,
            code=rest_command_job.code_id,
            compute=rest_command_job.compute_id,
            environment=rest_command_job.environment_id,
            distribution=DistributionConfiguration._from_rest_object(rest_command_job.distribution),
            parameters=rest_command_job.parameters,
            identity=rest_command_job.identity,
            environment_variables=rest_command_job.environment_variables,
            resources=ResourceConfiguration._from_rest_object(rest_command_job.resources),
            limits=CommandJobLimits._from_rest_object(rest_command_job.limits),
            inputs=from_rest_inputs_to_dataset_literal(rest_command_job.inputs),
            outputs=from_rest_data_outputs(rest_command_job.outputs),
        )
        # Handle the special case of a local job
        if (
            command_job.resources is not None
            and command_job.resources.properties is not None
            and command_job.resources.properties.get(LOCAL_COMPUTE_PROPERTY, None)
        ):
            command_job.compute = LOCAL_COMPUTE_TARGET
            command_job.resources.properties.pop(LOCAL_COMPUTE_PROPERTY)
        return command_job

    def _to_component(self, context: Dict = None, **kwargs):
        """Translate a command job to a command component.

        :param context: Context of the command job YAML file.
        :param kwargs: Extra arguments.
        :return: Translated command component.
        """
        from azure.ai.ml.entities import CommandComponent

        pipeline_job_dict = kwargs.get("pipeline_job_dict", {})

        # Create an anonymous command component with default version 1
        return CommandComponent(
            tags=self.tags,
            base_path=context[BASE_PATH_CONTEXT_KEY],
            code=self.code,
            command=self.command,
            environment=self.environment,
            description=self.description,
            inputs=self._to_component_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict),
            outputs=self._to_component_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict),
            resources=self.resources if self.resources else None,
            distribution=self.distribution if self.distribution else None,
        )

    def _to_node(self, context: Dict = None, **kwargs):
        """Translate a command job to a pipeline node.

        :param context: Context of the command job YAML file.
        :param kwargs: Extra arguments.
        :return: Translated command node.
        """
        from azure.ai.ml.entities._builders import Command

        component = self._to_component(context, **kwargs)

        return Command(
            component=component,
            compute=self.compute,
            # Need to supply the inputs with double curly braces.
            inputs=self.inputs,
            outputs=self.outputs,
            environment_variables=self.environment_variables,
            description=self.description,
            tags=self.tags,
            display_name=self.display_name,
            limits=self.limits,
        )

    def _validate(self) -> None:
        if self.name is None:
            msg = "Job name is required"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )
        if self.compute is None:
            msg = "compute is required"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )
        if self.command is None:
            msg = "command is required"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )
        if self.environment is None:
            msg = "environment is required for non-local runs"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )
        validate_inputs_for_command(self.command, self.inputs)
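

For illustration, here is a minimal usage sketch of this class submitted through an MLClient. The job name, compute target, environment, paths, and workspace identifiers below are hypothetical placeholders, not values defined in this module; the curated environment name is an assumption.

# Minimal usage sketch (hypothetical names throughout).
from azure.ai.ml import Input, MLClient
from azure.ai.ml.entities import CommandJob
from azure.identity import DefaultAzureCredential

job = CommandJob(
    name="train-job-01",  # required by _validate above; must be unique per workspace
    display_name="train-model",
    command="python train.py --data ${{inputs.training_data}}",
    code="./src",  # local folder uploaded as the job's code snapshot
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",  # placeholder environment
    compute="cpu-cluster",  # placeholder compute target; required by _validate above
    inputs={"training_data": Input(type="uri_folder", path="./data")},
)

ml_client = MLClient(
    DefaultAzureCredential(), "<subscription-id>", "<resource-group>", "<workspace>"
)
returned_job = ml_client.jobs.create_or_update(job)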