# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
import logging
import os.path
from os import PathLike
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Optional,
Union,
)
from azure.ai.ml._azure_environments import ENDPOINT_URLS, _get_cloud_details, resource_to_scopes
from azure.ai.ml.entities._assets._artifacts.code import Code
from azure.ai.ml.entities._job.job_name_generator import generate_job_name
from ..entities._validation import ValidationResult, _ValidationResultBuilder
try:
from typing import Protocol # For python >= 3.8
except ImportError:
from typing_extensions import Protocol # For python < 3.8
from azure.ai.ml.constants import (
BATCH_JOB_CHILD_RUN_NAME,
BATCH_JOB_CHILD_RUN_OUTPUT_NAME,
DEFAULT_ARTIFACT_STORE_OUTPUT_NAME,
PipelineConstants,
SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME,
COMMON_RUNTIME_ENV_VAR,
)
from azure.ai.ml.entities._job.job_errors import JobParsingError, PipelineChildJobError
import jwt
from azure.identity import ChainedTokenCredential
from azure.core.exceptions import ResourceNotFoundError
from azure.ai.ml._artifacts._artifact_utilities import (
download_artifact_from_aml_uri,
aml_datastore_path_exists,
)
from azure.ai.ml.operations._run_history_constants import RunHistoryConstants
from azure.ai.ml._restclient.v2022_02_01_preview import (
AzureMachineLearningWorkspaces as ServiceClient022022Preview,
)
from azure.ai.ml._restclient.v2022_02_01_preview.models import (
JobBaseData,
UserIdentity,
JobType as RestJobType,
ListViewType,
)
from azure.ai.ml._restclient.runhistory import (
AzureMachineLearningWorkspaces as ServiceClientRunHistory,
)
from azure.ai.ml._restclient.model_dataplane import (
AzureMachineLearningWorkspaces as ServiceClientModelDataplane,
)
from azure.ai.ml._restclient.dataset_dataplane import (
AzureMachineLearningWorkspaces as ServiceClientDatasetDataplane,
)
from azure.ai.ml._utils.utils import (
create_session_with_retry,
download_text_from_url,
is_url,
is_data_binding_expression,
get_list_view_type,
)
from azure.ai.ml._scope_dependent_operations import OperationsContainer, OperationScope, _ScopeDependentOperations
from azure.ai.ml.constants import (
AzureMLResourceType,
TID_FMT,
AZUREML_RESOURCE_PROVIDER,
LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT,
LOCAL_COMPUTE_TARGET,
SHORT_URI_FORMAT,
AssetTypes,
API_URL_KEY,
)
from azure.ai.ml.entities import (
CommandJob,
Job,
PipelineJob,
Component,
CommandComponent,
Compute,
ParallelJob,
ParallelComponent,
)
from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
from azure.ai.ml.sweep import SweepJob
from azure.ai.ml.entities._job.base_job import _BaseJob
from azure.ai.ml.entities._job.job import _is_pipeline_child_job
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._builders import Command, BaseNode, Sweep, Parallel
from azure.ai.ml.entities._job.pipeline.pipeline_job_settings import PipelineJobSettings
from azure.ai.ml._artifacts._artifact_utilities import _upload_and_generate_remote_uri
from ._job_ops_helper import (
get_git_properties,
stream_logs_until_completion,
get_job_output_uris_from_dataplane,
)
from ._local_job_invoker import is_local_run, start_run_if_local
from ._operation_orchestrator import OperationOrchestrator, is_ARM_id_for_resource, is_registry_id_for_resource
from ._run_operations import RunOperations
from ._dataset_dataplane_operations import DatasetDataplaneOperations
from ._model_dataplane_operations import ModelDataplaneOperations
from ._compute_operations import ComputeOperations
if TYPE_CHECKING:
from azure.ai.ml.operations import DatastoreOperations
from azure.ai.ml._telemetry import (
AML_INTERNAL_LOGGER_NAMESPACE,
ActivityType,
monitor_with_activity,
monitor_with_telemetry_mixin,
)
from azure.ai.ml._ml_exceptions import JobException, ErrorCategory, ErrorTarget, ValidationException
logger = logging.getLogger(AML_INTERNAL_LOGGER_NAMESPACE + __name__)
logger.propagate = False
module_logger = logging.getLogger(__name__)
class JobOperations(_ScopeDependentOperations):
"""
JobOperations
You should not instantiate this class directly. Instead, you should create an MLClient instance that instantiates it for you and attaches it as an attribute.
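A minimal sketch of obtaining this class through an MLClient; the subscription, resource group, and workspace values below are placeholders:

.. code-block:: python

    from azure.identity import DefaultAzureCredential
    from azure.ai.ml import MLClient

    ml_client = MLClient(
        credential=DefaultAzureCredential(),
        subscription_id="<subscription-id>",
        resource_group_name="<resource-group>",
        workspace_name="<workspace-name>",
    )
    job_operations = ml_client.jobs  # JobOperations instance attached by MLClient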
"""
def __init__(
self,
operation_scope: OperationScope,
service_client_02_2022_preview: ServiceClient022022Preview,
all_operations: OperationsContainer,
credential: ChainedTokenCredential,
**kwargs: Any,
):
super(JobOperations, self).__init__(operation_scope)
if "app_insights_handler" in kwargs:
logger.addHandler(kwargs.pop("app_insights_handler"))
self._operation_2022_02_preview = service_client_02_2022_preview.jobs
self._all_operations = all_operations
self._kwargs = kwargs
self._stream_logs_until_completion = stream_logs_until_completion
# Dataplane service clients are lazily created as they are needed
self._runs_operations_client = None
self._dataset_dataplane_operations_client = None
self._model_dataplane_operations_client = None
self._api_base_url = None
self._container = "azureml"
self._credential = credential
self._orchestrators = OperationOrchestrator(self._all_operations, self._operation_scope)
@property
def _compute_operations(self) -> ComputeOperations:
return self._all_operations.get_operation(
AzureMLResourceType.COMPUTE, lambda x: isinstance(x, ComputeOperations)
)
@property
def _datastore_operations(self) -> "DatastoreOperations":
return self._all_operations.all_operations[AzureMLResourceType.DATASTORE]
@property
def _runs_operations(self) -> RunOperations:
if not self._runs_operations_client:
service_client_run_history = ServiceClientRunHistory(self._credential, base_url=self._api_url)
self._runs_operations_client = RunOperations(self._operation_scope, service_client_run_history)
return self._runs_operations_client
@property
def _dataset_dataplane_operations(self) -> DatasetDataplaneOperations:
if not self._dataset_dataplane_operations_client:
service_client_dataset_dataplane = ServiceClientDatasetDataplane(self._credential, base_url=self._api_url)
self._dataset_dataplane_operations_client = DatasetDataplaneOperations(
self._operation_scope, service_client_dataset_dataplane
)
return self._dataset_dataplane_operations_client
@property
def _model_dataplane_operations(self) -> ModelDataplaneOperations:
if not self._model_dataplane_operations_client:
service_client_model_dataplane = ServiceClientModelDataplane(self._credential, base_url=self._api_url)
self._model_dataplane_operations_client = ModelDataplaneOperations(
self._operation_scope, service_client_model_dataplane
)
return self._model_dataplane_operations_client
@property
def _api_url(self):
if not self._api_base_url:
self._api_base_url = self._get_workspace_url(url_key=API_URL_KEY)
return self._api_base_url
[docs] @monitor_with_activity(logger, "Job.List", ActivityType.PUBLICAPI)
def list(
self,
parent_job_name: str = None,
*,
list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
schedule_defined: bool = None,
scheduled_job_name: str = None,
) -> Iterable[Job]:
"""List jobs of the workspace.
:param parent_job_name: When provided, returns the children of the named job.
:type parent_job_name: Optional[str]
:param list_view_type: View type for including/excluding (for example) archived jobs. Default: ACTIVE_ONLY.
:type list_view_type: Optional[ListViewType]
:param schedule_defined: When provided, only jobs that initially defined a schedule will be returned.
:type schedule_defined: Optional[bool]
:param scheduled_job_name: Name of a job that initially defined a schedule. When provided, only jobs triggered by the schedule of the given job will be returned.
:type scheduled_job_name: Optional[str]
:return: An iterator-like instance of Job objects.
:rtype: ~azure.core.paging.ItemPaged[Job]
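A minimal usage sketch; it assumes an MLClient instance named ``ml_client`` and a placeholder pipeline job name:

.. code-block:: python

    # List active jobs in the workspace
    for job in ml_client.jobs.list():
        print(job.name)

    # List child jobs of a pipeline job
    children = ml_client.jobs.list(parent_job_name="my-pipeline-job")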
"""
if parent_job_name:
parent_job = self.get(parent_job_name)
return self._runs_operations.get_run_children(parent_job.name)
return self._operation_2022_02_preview.list(
self._operation_scope.resource_group_name,
self._workspace_name,
cls=lambda objs: [self._handle_rest_errors(obj) for obj in objs],
list_view_type=list_view_type,
scheduled=schedule_defined,
schedule_id=scheduled_job_name,
**self._kwargs,
)
def _handle_rest_errors(self, job_object):
"""Handle errors while resolving azureml_id's during list operation"""
try:
return self._resolve_azureml_id(Job._from_rest_object(job_object))
except JobParsingError:
pass
[docs] @monitor_with_telemetry_mixin(logger, "Job.Get", ActivityType.PUBLICAPI)
def get(self, name: str) -> Job:
"""Get a job resource.
:param str name: Name of the job.
:return: Job object retrieved from the service.
:rtype: Job
:raise: ResourceNotFoundError if a job matching the provided name cannot be found.
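Illustrative usage, assuming an MLClient instance named ``ml_client`` and a placeholder job name:

.. code-block:: python

    retrieved_job = ml_client.jobs.get(name="my-job-name")
    print(retrieved_job.status)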
"""
job_object = self._get_job(name)
if not _is_pipeline_child_job(job_object):
job = Job._from_rest_object(job_object)
if job_object.properties.job_type != RestJobType.AUTO_ML:
# resolvers do not work with the old contract; leave the ids as-is
job = self._resolve_azureml_id(job)
else:
# Child jobs are no longer available through MFE, fetch
# through run history instead
job = self._runs_operations._translate_from_rest_object(self._runs_operations.get_run(name))
return job
[docs] @monitor_with_activity(logger, "Job.Cancel", ActivityType.PUBLICAPI)
def cancel(self, name: str) -> None:
"""Cancel job resource.
:param str name: Name of the job.
:return: None, or the result of cls(response)
:rtype: None
:raise: ResourceNotFoundError if can't find a job matching provided name.
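For example (``ml_client`` and the job name below are placeholders):

.. code-block:: python

    ml_client.jobs.cancel(name="my-job-name")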
"""
return self._operation_2022_02_preview.cancel(
id=name,
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._workspace_name,
**self._kwargs,
)
def try_get_compute_arm_id(self, compute: Union[Compute, str]):
if compute is not None:
if is_ARM_id_for_resource(compute, resource_type=AzureMLResourceType.COMPUTE):
# compute is not a sub-workspace resource
compute_name = compute.split("/")[-1]
elif isinstance(compute, Compute):
compute_name = compute.name
elif isinstance(compute, str):
compute_name = compute
else:
raise ValueError(
"compute must be either an arm id of Compute, a Compute object or a compute name but got {}".format(
type(compute)
)
)
if is_data_binding_expression(compute_name):
return compute_name
else:
try:
return self._compute_operations.get(compute_name).id
except ResourceNotFoundError:
# the original error is not helpful (Operation returned an invalid status 'Not Found'),
# so we raise a more helpful one
raise ResourceNotFoundError("Not found compute with name {}".format(compute_name))
return None
@monitor_with_telemetry_mixin(logger, "Job.Validate", ActivityType.INTERNALCALL)
def _validate(self, job: Job, raise_on_failure: bool = False) -> ValidationResult:
"""Validate a pipeline job.
If there are inline-defined entities (e.g. Component, Environment, and Code), they will not be created.
:param job: Job object to be validated.
:type job: Job
:return: a ValidationResult object containing all found errors.
:rtype: ValidationResult
"""
# validation is open for PipelineJob only for now
if not isinstance(job, PipelineJob):
return _ValidationResultBuilder.success()
job._validate(raise_error=True)
try:
job.compute = self.try_get_compute_arm_id(job.compute)
for node in job.jobs.values():
node.compute = self.try_get_compute_arm_id(node.compute)
return _ValidationResultBuilder.success()
except Exception as e:
if raise_on_failure:
raise
else:
logger.warning(f"Validation failed: {e}")
return _ValidationResultBuilder.from_single_message(singular_error_message=str(e), yaml_path="compute")
[docs] @monitor_with_telemetry_mixin(logger, "Job.CreateOrUpdate", ActivityType.PUBLICAPI)
def create_or_update(
self,
job: Union[Job, BaseNode],
*,
description: str = None,
compute: str = None,
tags: dict = None,
experiment_name: str = None,
**kwargs,
) -> Job:
"""Create or update a job, if there're inline defined entities, e.g. Environment, Code, they'll be created together with the job.
:param Union[Job,BaseNode] job: Job definition or object which can be translate to a job.
:param description: Description to overwrite when submitting the pipeline.
:type description: str
:param compute: Compute target to overwrite when submitting the pipeline.
:type compute: str
:param tags: Tags to overwrite when submitting the pipeline.
:type tags: dict
:param experiment_name: Name of the experiment the job will be created under. If None is provided, the job will be created under the experiment 'Default'.
:type experiment_name: str
:return: Created or updated job.
:rtype: Job
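A minimal sketch of submitting a command job; ``ml_client`` is assumed to be an existing MLClient, and the source folder, environment, compute, and experiment names are placeholders:

.. code-block:: python

    from azure.ai.ml import command

    job = command(
        code="./src",  # local folder uploaded as a Code asset
        command="python train.py",
        environment="my-environment:1",  # placeholder name:version of a registered Environment
        compute="cpu-cluster",  # placeholder compute target name
        experiment_name="my-experiment",
    )
    returned_job = ml_client.jobs.create_or_update(job)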
"""
if isinstance(job, BaseNode):
job = job._to_job()
self._generate_job_defaults(job)
# Set job properties before submission
if description is not None:
job.description = description
if compute is not None:
job.compute = compute
if tags is not None:
job.tags = tags
if experiment_name is not None:
job.experiment_name = experiment_name
if job.compute == LOCAL_COMPUTE_TARGET:
job.environment_variables[COMMON_RUNTIME_ENV_VAR] = "true"
self._validate(job, raise_on_failure=True)
# Create all dependent resources
self._resolve_arm_id_or_upload_dependencies(job)
git_props = get_git_properties()
# Do not add git props if they already exist in job properties.
# This is specifically for update: if the user switches branches and tries to update the job, the request will fail because the git props would be repopulated.
# MFE does not allow existing properties to be updated; it only allows new properties to be added.
if not any(prop_name in job.properties for prop_name in git_props.keys()):
job.properties = {**job.properties, **git_props}
rest_job_resource = job._to_rest_object()
# Make a copy of self._kwargs instead of contaminating the original one
kwargs = dict(**self._kwargs)
if hasattr(rest_job_resource.properties, "identity") and (
rest_job_resource.properties.identity is None
or isinstance(rest_job_resource.properties.identity, UserIdentity)
):
self._set_headers_with_user_aml_token(kwargs)
result = self._operation_2022_02_preview.create_or_update(
id=rest_job_resource.name, # type: ignore
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._workspace_name,
body=rest_job_resource,
**kwargs,
)
if is_local_run(result):
ws_base_url = self._all_operations.all_operations[
AzureMLResourceType.WORKSPACE
]._operation._client._base_url
snapshot_id = start_run_if_local(result, self._credential, ws_base_url)
# in case of local run, the first create/update call to MFE returns the
# request for submitting to ES. Once we request to ES and start the run, we
# need to put the same body to MFE to append user tags etc.
job_object = self._get_job(rest_job_resource.name)
if result.properties.tags is not None:
for tag_name, tag_value in rest_job_resource.properties.tags.items():
job_object.properties.tags[tag_name] = tag_value
if result.properties.properties is not None:
for (
prop_name,
prop_value,
) in rest_job_resource.properties.properties.items():
job_object.properties.properties[prop_name] = prop_value
if snapshot_id is not None:
job_object.properties.properties["ContentSnapshotId"] = snapshot_id
result = self._operation_2022_02_preview.create_or_update(
id=rest_job_resource.name, # type: ignore
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._workspace_name,
body=job_object,
**kwargs,
)
return self._resolve_azureml_id(Job._from_rest_object(result))
def _archive_or_restore(self, name: str, is_archived: bool):
job_object = self._get_job(name)
if _is_pipeline_child_job(job_object):
raise PipelineChildJobError(job_id=job_object.id)
job_object.properties.is_archived = is_archived
self._operation_2022_02_preview.create_or_update(
id=job_object.name,
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._workspace_name,
body=job_object,
)
[docs] @monitor_with_telemetry_mixin(logger, "Job.Archive", ActivityType.PUBLICAPI)
def archive(self, name: str) -> None:
"""Archive a job or restore an archived job.
:param name: Name of the job.
:type name: str
:raise: ResourceNotFoundError if a job matching the provided name cannot be found.
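For example (``ml_client`` and the job name below are placeholders):

.. code-block:: python

    ml_client.jobs.archive(name="my-job-name")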
"""
self._archive_or_restore(name=name, is_archived=True)
[docs] @monitor_with_telemetry_mixin(logger, "Job.Restore", ActivityType.PUBLICAPI)
def restore(self, name: str) -> None:
"""Archive a job or restore an archived job.
:param name: Name of the job.
:type name: str
:raise: ResourceNotFoundError if a job matching the provided name cannot be found.
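For example (``ml_client`` and the job name below are placeholders):

.. code-block:: python

    ml_client.jobs.restore(name="my-job-name")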
"""
self._archive_or_restore(name=name, is_archived=False)
[docs] @monitor_with_activity(logger, "Job.Stream", ActivityType.PUBLICAPI)
def stream(self, name: str) -> None:
"""Stream logs of a job.
:param str name: Name of the job.
:raise: ResourceNotFoundError if a job matching the provided name cannot be found.
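Illustrative usage: submit a job, then block while streaming its logs (``ml_client`` and ``job`` are assumed to exist already):

.. code-block:: python

    returned_job = ml_client.jobs.create_or_update(job)
    ml_client.jobs.stream(returned_job.name)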
"""
job_object = self._get_job(name)
if _is_pipeline_child_job(job_object):
raise PipelineChildJobError(job_id=job_object.id)
try:
self._stream_logs_until_completion(
self._runs_operations,
job_object,
self._datastore_operations,
)
except Exception:
raise
[docs] @monitor_with_activity(logger, "Job.Download", ActivityType.PUBLICAPI)
def download(
self,
name: str,
*,
download_path: Union[PathLike, str] = Path.cwd(),
output_name: str = None,
all: bool = False,
) -> None:
"""Download logs and output of a job.
:param str name: Name of a job.
:param Union[PathLike, str] download_path: Local path as download destination, defaults to current working directory.
:param str output_name: Named output to download, defaults to None.
:param bool all: Whether to download logs and all named outputs, defaults to False.
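Illustrative usage; ``ml_client``, the job name, and the output name below are placeholders:

.. code-block:: python

    # Download logs and the default outputs of a completed job
    ml_client.jobs.download(name="my-job-name", download_path="./job-output")

    # Download a single named output
    ml_client.jobs.download(name="my-job-name", output_name="trained_model")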
"""
job_details = self.get(name)
# job is reused, get reused job to download
if job_details.properties.get(PipelineConstants.REUSED_FLAG_FIELD) == PipelineConstants.REUSED_FLAG_TRUE:
reused_job_name = job_details.properties[PipelineConstants.REUSED_JOB_ID]
reused_job_detail = self.get(reused_job_name)
module_logger.info(f"job {name} reuses previous job {reused_job_name}, download from the reused job.")
name, job_details = reused_job_name, reused_job_detail
job_status = job_details.status
if job_status not in RunHistoryConstants.TERMINAL_STATUSES:
msg = "This job is in state {}. Download is allowed only in states {}".format(
job_status, RunHistoryConstants.TERMINAL_STATUSES
)
raise JobException(
message=msg,
target=ErrorTarget.JOB,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
)
is_batch_job = job_details.tags.get("azureml.batchrun", None) == "true"
outputs = {}
download_path = Path(download_path)
artifact_directory_name = "artifacts"
output_directory_name = "named-outputs"
def log_missing_uri(what: str):
logger.debug(f'Could not download {what} for job "{job_details.name}" (job status: {job_details.status})')
if isinstance(job_details, SweepJob):
best_child_run_id = job_details.properties.get(SWEEP_JOB_BEST_CHILD_RUN_ID_PROPERTY_NAME, None)
if best_child_run_id:
self.download(best_child_run_id, download_path=download_path, output_name=output_name, all=all)
else:
log_missing_uri(what="from best child run")
if output_name:
# Don't need to download anything from the parent
return
# only download default artifacts (logs + default outputs) from parent
artifact_directory_name = "hd-artifacts"
output_name = None
all = False
if is_batch_job:
scoring_uri = self._get_batch_job_scoring_output_uri(job_details.name)
if scoring_uri:
outputs = {BATCH_JOB_CHILD_RUN_OUTPUT_NAME: scoring_uri}
else:
log_missing_uri("batch job scoring file")
elif output_name:
outputs = self._get_named_output_uri(name, output_name)
if output_name not in outputs:
log_missing_uri(what=f'output "{output_name}"')
elif all:
outputs = self._get_named_output_uri(name)
if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
log_missing_uri(what="logs")
else:
outputs = self._get_named_output_uri(name, DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
if DEFAULT_ARTIFACT_STORE_OUTPUT_NAME not in outputs:
log_missing_uri(what="logs")
# Download all requested artifacts
for name, uri in outputs.items():
if is_batch_job:
destination = download_path
elif name == DEFAULT_ARTIFACT_STORE_OUTPUT_NAME:
destination = download_path / artifact_directory_name
else:
destination = download_path / output_directory_name / name
module_logger.info(f"Downloading artifact {uri} to {destination}")
download_artifact_from_aml_uri(
uri=uri, destination=destination, datastore_operation=self._datastore_operations
)
def _get_named_output_uri(
self, job_name: str, output_names: Optional[Union[Iterable[str], str]] = None
) -> Dict[str, str]:
"""Gets the URIs to the specified named outputs of job
:param str job_name: Run ID of the job
:param Optional[Union[Iterable[str], str]] output_names: Either an
output name, or an iterable of output names. If omitted, all
outputs are returned.
:return Dict[str, str]: Map of output_names to URIs. Note that
URIs that could not be found will not be present in the map.
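Illustrative internal call; the job and output names are placeholders:

.. code-block:: python

    uris = self._get_named_output_uri("my-job-name", "trained_model")
    model_uri = uris.get("trained_model")  # None if the output could not be located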
"""
if isinstance(output_names, str):
output_names = {output_names}
elif output_names:
output_names = set(output_names)
outputs = get_job_output_uris_from_dataplane(
job_name,
self._runs_operations,
self._dataset_dataplane_operations,
self._model_dataplane_operations,
output_names=output_names,
)
missing_outputs = (output_names or set()).difference(outputs.keys())
# Include default artifact store in outputs
if (not output_names) or DEFAULT_ARTIFACT_STORE_OUTPUT_NAME in missing_outputs:
try:
job = self.get(job_name)
artifact_store_uri = job.outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME]
if artifact_store_uri and artifact_store_uri.path:
outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = artifact_store_uri.path
except (AttributeError, KeyError):
outputs[DEFAULT_ARTIFACT_STORE_OUTPUT_NAME] = SHORT_URI_FORMAT.format(
"workspaceartifactstore", f"ExperimentRun/dcid.{job_name}/"
)
missing_outputs.discard(DEFAULT_ARTIFACT_STORE_OUTPUT_NAME)
# A job's output is not always reported in the outputs dict, but
# doesn't currently have a user configurable location.
# Perform a search of known paths to find output
# TODO: Remove once job output locations are reliably returned from the service
default_datastore = self._datastore_operations.get_default().name
for name in missing_outputs:
potential_uris = [
SHORT_URI_FORMAT.format(default_datastore, f"azureml/{job_name}/{name}/"),
SHORT_URI_FORMAT.format(default_datastore, f"dataset/{job_name}/{name}/"),
]
for potential_uri in potential_uris:
if aml_datastore_path_exists(potential_uri, self._datastore_operations):
outputs[name] = potential_uri
break
return outputs
def _get_batch_job_scoring_output_uri(self, job_name: str) -> Optional[str]:
uri = None
# Download scoring output, which is the "score" output of the child job named "batchscoring"
for child in self._runs_operations.get_run_children(job_name):
if child.properties.get("azureml.moduleName", None) == BATCH_JOB_CHILD_RUN_NAME:
uri = self._get_named_output_uri(child.name, BATCH_JOB_CHILD_RUN_OUTPUT_NAME).get(
BATCH_JOB_CHILD_RUN_OUTPUT_NAME, None
)
# After the correct child is found, break to prevent unnecessary looping
break
return uri
def _get_job(self, name: str) -> JobBaseData:
return self._operation_2022_02_preview.get(
id=name,
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._workspace_name,
**self._kwargs,
)
def _get_workspace_url(self, url_key="history"):
discovery_url = (
self._all_operations.all_operations[AzureMLResourceType.WORKSPACE]
.get(self._operation_scope.workspace_name)
.discovery_url
)
all_urls = json.loads(download_text_from_url(discovery_url, create_session_with_retry()))
return all_urls[url_key]
def _generate_job_defaults(self, job: Job) -> None:
# Default the name to a generated, user-friendly name.
if not job.name:
job.name = generate_job_name()
# Default experiment to base path
if not job.experiment_name:
job.experiment_name = Path("./").resolve().stem.replace(" ", "") or "Default"
job.display_name = job.display_name or job.name
def _resolve_arm_id_or_upload_dependencies(self, job: Job) -> None:
"""This method converts name or name:version to ARM id. Or it registers/uploads nested dependencies.
:param job: the job resource entity
:type job: Job
:return: the job resource entity with nested dependencies resolved
:rtype: Job
"""
self._resolve_arm_id_or_azureml_id(job, self._orchestrators.get_asset_arm_id)
if isinstance(job, PipelineJob):
# Resolve top-level inputs
self._resolve_job_inputs(map(lambda x: x._data, job.inputs.values()), job._base_path)
if job.jobs:
for _, job_instance in job.jobs.items():
# resolve inputs for each job's component
if isinstance(job_instance, BaseNode):
node: BaseNode = job_instance
self._resolve_job_inputs(
map(lambda x: x._data, node.inputs.values()),
job._base_path,
)
elif isinstance(job_instance, AutoMLJob):
self._resolve_automl_job_inputs(job_instance, job._base_path, inside_pipeline=True)
elif isinstance(job, AutoMLJob):
self._resolve_automl_job_inputs(job, job._base_path, inside_pipeline=False)
else:
try:
self._resolve_job_inputs(job.inputs.values(), job._base_path)
except AttributeError:
# If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
pass
def _resolve_automl_job_input(self, input: str, base_path, inside_pipeline) -> str:
"""Resolves automl job's input.
:param input: Job input
:param base_path: Base path of the yaml file
:param inside_pipeline: Whether the automl job is inside a pipeline.
"""
# If the automl job is inside a pipeline job and the input is a binding expression, pass it through to the backend.
if inside_pipeline and is_data_binding_expression(str(input)):
return input
try:
if os.path.isabs(input): # absolute local path, upload, transform to remote url
# absolute local path
return _upload_and_generate_remote_uri(self._operation_scope, self._datastore_operations, input)
elif ":" in input or "@" in input: # Check for AzureML id, is there a better way?
asset_type = AzureMLResourceType.DATA
return self._orchestrators.get_asset_arm_id(input, asset_type)
else: # relative local path, upload, transform to remote url
local_path = Path(base_path, input).resolve()
return _upload_and_generate_remote_uri(self._operation_scope, self._datastore_operations, local_path)
except Exception as e:
raise Exception(f"Supported input path value are ARM id, AzureML id, remote uri or local path. {e}")
def _resolve_automl_job_inputs(self, job: AutoMLJob, base_path: os.PathLike, inside_pipeline) -> None:
"""This method resolves the inputs for AutoML jobs.
:param job: the job resource entity
:type job: AutoMLJob
"""
if isinstance(job, AutoMLJob):
data = job._data
self._resolve_job_input(data.training_data.data, job._base_path)
validation_data = data.validation_data
if validation_data and validation_data.data:
self._resolve_job_input(data.validation_data.data, job._base_path)
test_data = data.test_data
if test_data and test_data.data:
self._resolve_job_input(data.test_data.data, job._base_path)
def _resolve_azureml_id(self, job: Job) -> Job:
"""This method converts ARM id to name or name:version for nested entities.
:param job: the job resource entity
:type job: Job
:return: the job resource entity with nested dependencies resolved
:rtype: Job
"""
self._append_tid_to_studio_url(job)
self._resolve_job_inputs_arm_id(job)
return self._resolve_arm_id_or_azureml_id(job, self._orchestrators.resolve_azureml_id)
def _resolve_compute_id(self, resolver: Callable, target: Any) -> Any:
# special case for local runs
if target is not None and target.lower() == LOCAL_COMPUTE_TARGET:
return LOCAL_COMPUTE_TARGET
try:
modified_target_name = target
if target.lower().startswith(AzureMLResourceType.VIRTUALCLUSTER + "/"):
# Compute target can be either workspace-scoped compute type,
# or AML scoped VC. In the case of VC, resource name will be of form
# azureml:virtualClusters/<name> to disambiguate from azureml:name (which is always compute)
modified_target_name = modified_target_name[len(AzureMLResourceType.VIRTUALCLUSTER) + 1 :]
modified_target_name = LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT.format(
self._operation_scope.subscription_id,
self._operation_scope.resource_group_name,
AZUREML_RESOURCE_PROVIDER,
AzureMLResourceType.VIRTUALCLUSTER,
modified_target_name,
)
return resolver(
modified_target_name,
azureml_type=AzureMLResourceType.VIRTUALCLUSTER,
sub_workspace_resource=False,
)
except Exception:
return resolver(target, azureml_type=AzureMLResourceType.COMPUTE)
def _resolve_job_inputs(self, entries: Iterable[Union[Input, str, bool, int, float]], base_path: str):
"""resolve job inputs as ARM id or remote url"""
for entry in entries:
self._resolve_job_input(entry, base_path)
def _resolve_job_input(self, entry: Union[Input, str, bool, int, float], base_path: str) -> None:
"""resolve job input as ARM id or remote url"""
# path can be empty if the job was created from builder functions
if isinstance(entry, Input) and not entry.path:
msg = "Input path can't be empty for jobs."
raise JobException(message=msg, target=ErrorTarget.JOB, no_personal_data_message=msg)
if (
not isinstance(entry, Input)
or is_ARM_id_for_resource(entry.path)
or is_url(entry.path)
or is_data_binding_expression(entry.path) # literal value but set mode in pipeline yaml
): # Literal value, ARM id or remote url. Pass through
return
try:
if os.path.isabs(entry.path): # absolute local path, upload, transform to remote url
if entry.type == AssetTypes.URI_FOLDER and not os.path.isdir(entry.path):
raise JobException(
message="There is no dir on target path: {}".format(entry.path),
target=ErrorTarget.JOB,
no_personal_data_message="There is no dir on target path",
)
elif entry.type == AssetTypes.URI_FILE and not os.path.isfile(entry.path):
raise JobException(
message="There is no file on target path: {}".format(entry.path),
target=ErrorTarget.JOB,
no_personal_data_message="There is no file on target path",
)
# absolute local path
entry.path = _upload_and_generate_remote_uri(
self._operation_scope,
self._datastore_operations,
entry.path,
)
# TODO : Move this part to a common place
if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
entry.path = entry.path + "/"
elif ":" in entry.path or "@" in entry.path: # Check for AzureML id, is there a better way?
asset_type = AzureMLResourceType.DATA
if entry.type in [AssetTypes.MLFLOW_MODEL, AssetTypes.CUSTOM_MODEL]:
asset_type = AzureMLResourceType.MODEL
entry.path = self._orchestrators.get_asset_arm_id(entry.path, asset_type)
else: # relative local path, upload, transform to remote url
local_path = Path(base_path, entry.path).resolve()
entry.path = _upload_and_generate_remote_uri(
self._operation_scope,
self._datastore_operations,
local_path,
)
# TODO : Move this part to a common place
if entry.type == AssetTypes.URI_FOLDER and entry.path and not entry.path.endswith("/"):
entry.path = entry.path + "/"
except Exception as e:
raise JobException(
message=f"Supported input path value are ARM id, AzureML id, remote uri or local path.\n"
f"Met {type(e)}:\n{e}",
target=ErrorTarget.JOB,
no_personal_data_message="Supported input path value are ARM id, AzureML id, remote uri or local path.",
error=e,
)
def _resolve_job_inputs_arm_id(self, job: Job) -> None:
try:
inputs: Dict[str, Union[Input, str, bool, int, float]] = job.inputs
for _, entry in inputs.items():
if not isinstance(entry, Input) or is_url(entry.path): # Literal value or remote url
continue
else: # ARM id
entry.path = self._orchestrators.resolve_azureml_id(entry.path)
except AttributeError:
# If the job object doesn't have "inputs" attribute, we don't need to resolve. E.g. AutoML jobs
pass
def _resolve_arm_id_or_azureml_id(self, job: Job, resolver: Callable) -> Job:
"""Resolve arm_id for a given job"""
# TODO: this will need to be parallelized when multiple tasks
# are required. Also consider the implications for dependencies.
if isinstance(job, _BaseJob):
job.compute = self._resolve_compute_id(resolver, job.compute)
elif isinstance(job, CommandJob):
job = self._resolve_arm_id_for_command_job(job, resolver)
elif isinstance(job, ParallelJob):
job = self._resolve_arm_id_for_parallel_job(job, resolver)
elif isinstance(job, SweepJob):
job = self._resolve_arm_id_for_sweep_job(job, resolver)
elif isinstance(job, AutoMLJob):
job = self._resolve_arm_id_for_automl_job(job, resolver, inside_pipeline=False)
elif isinstance(job, PipelineJob):
job = self._resolve_arm_id_for_pipeline_job(job, resolver)
else:
msg = f"Non supported job type: {type(job)}"
raise JobException(
message=msg,
target=ErrorTarget.JOB,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
)
return job
def _resolve_arm_id_for_command_job(self, job: Job, resolver: Callable) -> Job:
"""Resolve arm_id for CommandJob"""
if job.code is not None and is_registry_id_for_resource(job.code):
msg = f"Format not supported for code asset: {job.code}"
raise JobException(
message=msg,
target=ErrorTarget.JOB,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
)
if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):
job.code = resolver(
Code(base_path=job._base_path, path=job.code),
azureml_type=AzureMLResourceType.CODE,
)
job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
job.compute = self._resolve_compute_id(resolver, job.compute)
return job
def _resolve_arm_id_for_parallel_job(self, job: Job, resolver: Callable) -> Job:
"""Resolve arm_id for ParallelJob"""
if job.code is not None and not is_ARM_id_for_resource(job.code, AzureMLResourceType.CODE):
job.code = resolver(
Code(base_path=job._base_path, path=job.code),
azureml_type=AzureMLResourceType.CODE,
)
job.environment = resolver(job.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
job.compute = self._resolve_compute_id(resolver, job.compute)
return job
def _resolve_arm_id_for_sweep_job(self, job: Job, resolver: Callable) -> Job:
"""Resolve arm_id for SweepJob"""
if job.trial.code is not None and not is_ARM_id_for_resource(job.trial.code, AzureMLResourceType.CODE):
job.trial.code = resolver(
Code(base_path=job._base_path, path=job.trial.code),
azureml_type=AzureMLResourceType.CODE,
)
job.trial.environment = resolver(job.trial.environment, azureml_type=AzureMLResourceType.ENVIRONMENT)
job.compute = self._resolve_compute_id(resolver, job.compute)
return job
def _resolve_arm_id_for_automl_job(self, job: Job, resolver: Callable, inside_pipeline: bool) -> Job:
"""Resolve arm_id for AutoMLJob"""
# AutoML does not have dependency uploads. Only need to resolve reference to arm id.
# automl node in pipeline has optional compute
if inside_pipeline and job.compute is None:
return job
job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
return job
def _resolve_arm_id_for_pipeline_job(self, pipeline_job: "PipelineJob", resolver: Callable) -> Job:
"""Resolve arm_id for pipeline_job"""
# Get top-level job compute
self._get_job_compute_id(pipeline_job, resolver)
# Process job defaults:
if pipeline_job.settings:
pipeline_job.settings.default_datastore = resolver(
pipeline_job.settings.default_datastore,
azureml_type=AzureMLResourceType.DATASTORE,
)
pipeline_job.settings.default_compute = resolver(
pipeline_job.settings.default_compute,
azureml_type=AzureMLResourceType.COMPUTE,
)
# Process each component job
if pipeline_job.jobs:
for key, job_instance in pipeline_job.jobs.items():
if isinstance(job_instance, AutoMLJob):
self._resolve_arm_id_for_automl_job(job_instance, resolver, inside_pipeline=True)
elif isinstance(job_instance, (Command, Sweep, Parallel)):
# Default the display name of anonymous components to the pipeline step key
if (
isinstance(job_instance.component, (CommandComponent, ParallelComponent))
and job_instance.component._is_anonymous
and not job_instance.component.display_name
):
job_instance.component.display_name = key
# Get compute for each job
self._get_job_compute_id(job_instance, resolver)
# set default code & environment for component
self._set_defaults_to_component(job_instance.component, pipeline_job.settings)
# Get the component id for each job's component
job_instance._component = resolver(
job_instance.trial if isinstance(job_instance, Sweep) else job_instance.component,
azureml_type=AzureMLResourceType.COMPONENT,
)
else:
msg = f"Non supported job type in Pipeline jobs: {type(job_instance)}"
raise JobException(
message=msg,
target=ErrorTarget.JOB,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
)
return pipeline_job
def _get_job_compute_id(self, job: Union[Job, Command], resolver: Callable) -> None:
job.compute = resolver(job.compute, azureml_type=AzureMLResourceType.COMPUTE)
def _append_tid_to_studio_url(self, job: Job) -> None:
"""Appends the user's tenant ID to the end of the studio URL so the UI knows against which tenant to authenticate"""
try:
studio_endpoint = job.services.get("Studio", None)
studio_url = studio_endpoint.endpoint
cloud_details = _get_cloud_details()
default_scopes = resource_to_scopes(cloud_details.get(ENDPOINT_URLS.RESOURCE_MANAGER_ENDPOINT))
module_logger.debug(f"default_scopes used: `{default_scopes}`\n")
# Extract the tenant id from the credential using PyJWT
decode = jwt.decode(
self._credential.get_token(*default_scopes).token,
options={"verify_signature": False, "verify_aud": False},
)
tid = decode["tid"]
formatted_tid = TID_FMT.format(tid)
studio_endpoint.endpoint = studio_url + formatted_tid
except Exception:
module_logger.info("Proceeding with no tenant id appended to studio URL\n")
def _set_defaults_to_component(self, component: Union[str, Component], settings: PipelineJobSettings):
"""Set default code&environment to component if not specified."""
if isinstance(component, (CommandComponent, ParallelComponent)):
# TODO: do we have no place to set default code & environment?
pass
def _set_headers_with_user_aml_token(self, kwargs) -> Dict[str, str]:
cloud_details = _get_cloud_details()
azure_ml_scopes = resource_to_scopes(cloud_details.get(ENDPOINT_URLS.AML_RESOURCE_ID))
module_logger.debug(f"azure_ml_scopes used: `{azure_ml_scopes}`\n")
aml_token = self._credential.get_token(*azure_ml_scopes).token
headers = kwargs.pop("headers", {})
headers["x-azureml-token"] = aml_token
kwargs["headers"] = headers