Source code for azure.ai.ml.entities._job.parallel.parameterized_parallel

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import logging
from typing import Union, Dict

from azure.ai.ml._schema.job.loadable_mixin import LoadableMixin
from .retry_settings import RetrySettings
from .parallel_task import ParallelTask
from ..resource_configuration import ResourceConfiguration


module_logger = logging.getLogger(__name__)


[docs]class ParameterizedParallel(LoadableMixin): """Parallel component that contains the traning parallel and supporting parameters for the parallel. :param retry_settings: parallel component run failed retry :type retry_settings: BatchRetrySettings :param logging_level: A string of the logging level name :type logging_level: str :param max_concurrency_per_instance: The max parallellism that each compute instance has. :type max_concurrency_per_instance: int :param error_threshold: The number of item processing failures should be ignored. :type error_threshold: int :param mini_batch_error_threshold: The number of mini batch processing failures should be ignored. :type mini_batch_error_threshold: int :param task: The parallel task. :type task: ParallelTask :param mini_batch_size: The mini batch size. :type mini_batch_size: str :param input_data: The input data. :type input_data: str :param resources: Compute Resource configuration for the job. :type resources: Union[Dict, ~azure.ai.ml.entities.ResourceConfiguration] """ def __init__( self, retry_settings: RetrySettings = None, logging_level: str = None, max_concurrency_per_instance: int = None, error_threshold: int = None, mini_batch_error_threshold: int = None, input_data: str = None, task: ParallelTask = None, mini_batch_size: int = None, resources: Union[dict, ResourceConfiguration] = None, environment_variables: Dict = None, ): self.mini_batch_size = mini_batch_size self.task = task self.retry_settings = retry_settings self.input_data = input_data self.logging_level = logging_level self.max_concurrency_per_instance = max_concurrency_per_instance self.error_threshold = error_threshold self.mini_batch_error_threshold = mini_batch_error_threshold self.resources = resources self.environment_variables = dict(environment_variables) if environment_variables else {} @property def task(self) -> ParallelTask: return self._task @task.setter def task(self, value): if isinstance(value, dict): value = ParallelTask(**value) self._task = value @property def resources(self) -> ResourceConfiguration: return self._resources @resources.setter def resources(self, value): if isinstance(value, dict): value = ResourceConfiguration(**value) self._resources = value @property def retry_settings(self) -> RetrySettings: return self._retry_settings @retry_settings.setter def retry_settings(self, value): if isinstance(value, dict): value = RetrySettings(**value) self._retry_settings = value