Source code for azure.ai.ml.entities._job.parallel.parallel_job

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import logging
from typing import Dict, Union

from azure.ai.ml._schema.job.parallel_job import ParallelJobSchema
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml._restclient.v2022_02_01_preview.models import JobBaseData
from azure.ai.ml.constants import (
    BASE_PATH_CONTEXT_KEY,
    TYPE,
    JobType,
)

from azure.ai.ml._ml_exceptions import ErrorCategory, ErrorTarget, ValidationException
from azure.ai.ml.entities._util import load_from_dict

from ..job import Job
from ..job_io_mixin import JobIOMixin
from .parameterized_parallel import ParameterizedParallel

module_logger = logging.getLogger(__name__)


class ParallelJob(Job, ParameterizedParallel, JobIOMixin):
    """Parallel job.

    :param name: Name of the job.
    :type name: str
    :param version: Version of the job.
    :type version: str
    :param id: Global ID of the resource, an Azure Resource Manager ID.
    :type id: str
    :param type: Type of the job; the supported value is 'parallel'.
    :type type: str
    :param description: Description of the job.
    :type description: str
    :param tags: Internal use only.
    :type tags: dict
    :param properties: Internal use only.
    :type properties: dict
    :param display_name: Display name of the job.
    :type display_name: str
    :param retry_settings: Retry settings for a failed parallel job run.
    :type retry_settings: BatchRetrySettings
    :param logging_level: Name of the logging level, as a string.
    :type logging_level: str
    :param max_concurrency_per_instance: The maximum parallelism per compute instance.
    :type max_concurrency_per_instance: int
    :param error_threshold: The number of item processing failures that should be ignored.
    :type error_threshold: int
    :param mini_batch_error_threshold: The number of mini-batch processing failures that should be ignored.
    :type mini_batch_error_threshold: int
    :param task: The parallel task.
    :type task: ParallelTask
    :param mini_batch_size: The mini-batch size.
    :type mini_batch_size: str
    :param input_data: The input data.
    :type input_data: str
    :param inputs: Inputs of the job.
    :type inputs: dict
    :param outputs: Outputs of the job.
    :type outputs: dict
    """

    def __init__(
        self,
        *,
        inputs: Dict[str, Union[Input, str, bool, int, float]] = None,
        outputs: Dict[str, Output] = None,
        **kwargs,
    ):
        kwargs[TYPE] = JobType.PARALLEL
        super().__init__(**kwargs)
        self.inputs = inputs
        self.outputs = outputs

    def _to_dict(self):
        return ParallelJobSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)

    def _to_rest_object(self):
        pass

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "ParallelJob":
        loaded_data = load_from_dict(ParallelJobSchema, data, context, additional_message, **kwargs)
        return ParallelJob(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)

    @classmethod
    def _load_from_rest(cls, obj: JobBaseData):
        pass

    def _to_component(self, context: Dict = None, **kwargs):
        """Translate a parallel job to a component job.

        :param context: Context of the parallel job YAML file.
        :param kwargs: Extra arguments.
        :return: Translated parallel component.
        """
        from azure.ai.ml.entities import ParallelComponent

        pipeline_job_dict = kwargs.get("pipeline_job_dict", {})

        # Create an anonymous parallel component with default version 1.
        return ParallelComponent(
            base_path=context[BASE_PATH_CONTEXT_KEY],
            is_anonymous=True,
            mini_batch_size=self.mini_batch_size,
            input_data=self.input_data,
            task=self.task,
            retry_settings=self.retry_settings,
            logging_level=self.logging_level,
            max_concurrency_per_instance=self.max_concurrency_per_instance,
            error_threshold=self.error_threshold,
            mini_batch_error_threshold=self.mini_batch_error_threshold,
            inputs=self._to_component_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict),
            outputs=self._to_component_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict),
            resources=self.resources if self.resources else None,
        )

    def _to_node(self, context: Dict = None, **kwargs):
        """Translate a parallel job to a pipeline node.

        :param context: Context of the parallel job YAML file.
        :param kwargs: Extra arguments.
        :return: Translated parallel node.
        """
        from azure.ai.ml.entities._builders import Parallel

        component = self._to_component(context, **kwargs)

        return Parallel(
            component=component,
            compute=self.compute,
            # Need to supply the inputs with double curly braces.
            inputs=self.inputs,
            outputs=self.outputs,
            mini_batch_size=self.mini_batch_size,
            input_data=self.input_data,
            task=self.task,
            retry_settings=self.retry_settings,
            logging_level=self.logging_level,
            max_concurrency_per_instance=self.max_concurrency_per_instance,
            error_threshold=self.error_threshold,
            mini_batch_error_threshold=self.mini_batch_error_threshold,
            environment_variables=self.environment_variables,
        )

    def _validate(self) -> None:
        if self.name is None:
            msg = "Job name is required"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )
        if self.compute is None:
            msg = "compute is required"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )
        if self.task is None:
            msg = "task is required"
            raise ValidationException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.JOB,
                error_category=ErrorCategory.USER_ERROR,
            )