Source code for azure.ai.ml._ml_client

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
import logging
from functools import singledispatch
from itertools import product
from os import PathLike
from pathlib import Path
from typing import Optional, Tuple, Union
from azure.ai.ml._azure_environments import ENDPOINT_URLS, _get_cloud_details, _get_default_cloud_name, _set_cloud

from azure.identity import ChainedTokenCredential
from azure.core.polling import LROPoller
from azure.ai.ml.entities._builders.base_node import BaseNode

from azure.ai.ml.constants import AzureMLResourceType
from azure.ai.ml._file_utils.file_utils import traverse_up_path_and_find_file
from azure.ai.ml.operations import (
    BatchDeploymentOperations,
    BatchEndpointOperations,
    ComponentOperations,
    ComputeOperations,
    DataOperations,
    DatastoreOperations,
    EnvironmentOperations,
    JobOperations,
    ModelOperations,
    OnlineDeploymentOperations,
    OnlineEndpointOperations,
    WorkspaceOperations,
    WorkspaceConnectionsOperations,
)
from azure.ai.ml.operations._code_operations import CodeOperations
from azure.ai.ml.operations._dataset_operations import DatasetOperations
from azure.ai.ml.operations._local_deployment_helper import _LocalDeploymentHelper
from azure.ai.ml.operations._local_endpoint_helper import _LocalEndpointHelper

from azure.ai.ml._restclient.v2020_09_01_dataplanepreview import (
    AzureMachineLearningWorkspaces as ServiceClient092020DataplanePreview,
)
from azure.ai.ml._restclient.v2021_10_01 import AzureMachineLearningWorkspaces as ServiceClient102021
from azure.ai.ml._restclient.v2022_01_01_preview import AzureMachineLearningWorkspaces as ServiceClient012022Preview
from azure.ai.ml._restclient.v2022_02_01_preview import AzureMachineLearningWorkspaces as ServiceClient022022Preview
from azure.ai.ml._restclient.v2022_05_01 import AzureMachineLearningWorkspaces as ServiceClient052022
from azure.ai.ml._restclient.registry_discovery import AzureMachineLearningWorkspaces as ServiceClientRegistryDiscovery
from azure.ai.ml._user_agent import USER_AGENT
from azure.ai.ml._utils.utils import (
    _is_https_url,
    _get_mfe_base_url_from_registry_discovery_service,
)
from azure.ai.ml._utils._registry_utils import get_registry_service_client
from azure.ai.ml._scope_dependent_operations import OperationsContainer, OperationScope
from azure.ai.ml._ml_exceptions import ErrorTarget, ErrorCategory, ValidationException
from azure.ai.ml.entities import (
    BatchDeployment,
    BatchEndpoint,
    Component,
    Compute,
    Datastore,
    Environment,
    Job,
    Model,
    OnlineDeployment,
    OnlineEndpoint,
    Workspace,
)

from azure.ai.ml._telemetry.logging_handler import get_appinsights_log_handler

module_logger = logging.getLogger(__name__)


[docs]class MLClient(object): """A client class to interact with Azure ML services. Use this client to manage Azure ML resources, e.g. workspaces, jobs, models and so on. """ def __init__( self, credential: ChainedTokenCredential, subscription_id: str, resource_group_name: str, workspace_name: str = None, **kwargs, ): """Initiate Azure ML client :param credential: Credential to use for authentication. :type credential: ChainedTokenCredential :param subscription_id: Azure subscription ID. :type subscription_id: str :param resource_group_name: Azure resource group. :type resource_group_name: str :param workspace_name: Workspace to use in the client, optional for non workspace dependent operations., defaults to None :type workspace_name: str, optional """ cloud_name = kwargs.get("cloud_name", _get_default_cloud_name()) module_logger.debug("Cloud configured in MLClient: '%s'.", cloud_name) _set_cloud(cloud_name) self._registry_name = kwargs.pop("registry_name", None) self._operation_scope = OperationScope( subscription_id, resource_group_name, workspace_name, self._registry_name ) self._add_user_agent(kwargs) # Cannot send multiple base_url as azure-cli sets the base_url automatically. kwargs.pop("base_url", None) user_agent = None properties = {"subscription_id": subscription_id, "resource_group_name": resource_group_name} if workspace_name: properties.update({"workspace_name": workspace_name}) if "user_agent" in kwargs: user_agent = kwargs.get("user_agent") app_insights_handler = get_appinsights_log_handler(user_agent, **{"properties": properties}) app_insights_handler_kwargs = {"app_insights_handler": app_insights_handler} self._credential = credential cloud_details = _get_cloud_details(cloud_name) base_url = cloud_details.get(ENDPOINT_URLS.RESOURCE_MANAGER_ENDPOINT).strip("/") self._operation_container = OperationsContainer() self._rp_service_client = ServiceClient012022Preview( subscription_id=self._operation_scope._subscription_id, credential=self._credential, base_url=base_url, **kwargs, ) # kwargs related to operations alone not all kwargs passed to MLClient are needed by operations ops_kwargs = app_insights_handler_kwargs if base_url: ops_kwargs["enforce_https"] = _is_https_url(base_url) self._service_client_10_2021 = ServiceClient102021( subscription_id=self._operation_scope._subscription_id, credential=self._credential, base_url=base_url, **kwargs, ) self._service_client_09_2020_dataplanepreview = ServiceClient092020DataplanePreview( subscription_id=self._operation_scope._subscription_id, credential=self._credential, base_url=base_url, **kwargs, ) self._service_client_02_2022_preview = ServiceClient022022Preview( subscription_id=self._operation_scope._subscription_id, credential=self._credential, base_url=base_url, **kwargs, ) self._service_client_05_2022 = ServiceClient052022( credential=self._credential, subscription_id=self._operation_scope._subscription_id, base_url=base_url, **kwargs, ) self._workspaces = WorkspaceOperations( self._operation_scope, self._rp_service_client, self._operation_container, self._credential, base_url=base_url, **app_insights_handler_kwargs, ) if self._registry_name: base_url = _get_mfe_base_url_from_registry_discovery_service(self._workspaces, workspace_name) kwargs_registry = {**kwargs} kwargs_registry.pop("base_url", None) self._service_client_registry_discovery_client = ServiceClientRegistryDiscovery( credential=self._credential, base_url=base_url, **kwargs_registry ) self._service_client_10_2021_dataplanepreview = get_registry_service_client( subscription_id=self._operation_scope._subscription_id, registry_name=self._registry_name, credential=self._credential, service_client_registry_discovery_client=self._service_client_registry_discovery_client, **kwargs_registry, ) self._workspace_connections = WorkspaceConnectionsOperations( self._operation_scope, self._rp_service_client, self._operation_container, self._credential ) self._operation_container.add(AzureMLResourceType.WORKSPACE, self._workspaces) self._compute = ComputeOperations(self._operation_scope, self._rp_service_client, **app_insights_handler_kwargs) self._operation_container.add(AzureMLResourceType.COMPUTE, self._compute) self._datastores = DatastoreOperations( operation_scope=self._operation_scope, serviceclient_2022_05_01=self._service_client_05_2022, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.DATASTORE, self._datastores) self._models = ModelOperations( self._operation_scope, self._service_client_10_2021_dataplanepreview if self._registry_name else self._service_client_05_2022, self._datastores, **app_insights_handler_kwargs, ) self._operation_container.add(AzureMLResourceType.MODEL, self._models) self._code = CodeOperations( self._operation_scope, self._service_client_10_2021_dataplanepreview if self._registry_name else self._service_client_05_2022, self._datastores, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.CODE, self._code) self._environments = EnvironmentOperations( self._operation_scope, self._service_client_10_2021_dataplanepreview if self._registry_name else self._service_client_05_2022, self._operation_container, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.ENVIRONMENT, self._environments) self._local_endpoint_helper = _LocalEndpointHelper() self._local_deployment_helper = _LocalDeploymentHelper(self._operation_container) self._online_endpoints = OnlineEndpointOperations( self._operation_scope, self._service_client_02_2022_preview, self._operation_container, self._local_endpoint_helper, self._credential, **ops_kwargs, ) self._batch_endpoints = BatchEndpointOperations( self._operation_scope, self._service_client_05_2022, self._service_client_09_2020_dataplanepreview, self._operation_container, self._credential, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.BATCH_ENDPOINT, self._batch_endpoints) self._operation_container.add(AzureMLResourceType.ONLINE_ENDPOINT, self._online_endpoints) self._online_deployments = OnlineDeploymentOperations( self._operation_scope, self._service_client_02_2022_preview, self._operation_container, self._local_deployment_helper, self._credential, **ops_kwargs, ) self._batch_deployments = BatchDeploymentOperations( self._operation_scope, self._service_client_05_2022, self._service_client_09_2020_dataplanepreview, self._operation_container, self._credential, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.ONLINE_DEPLOYMENT, self._online_deployments) self._operation_container.add(AzureMLResourceType.BATCH_DEPLOYMENT, self._batch_deployments) self._data = DataOperations( self._operation_scope, self._service_client_05_2022, self._datastores, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.DATA, self._data) self._datasets = DatasetOperations( self._operation_scope, self._service_client_10_2021, self._datastores, **ops_kwargs ) self._operation_container.add(AzureMLResourceType.DATASET, self._datasets) self._components = ComponentOperations( self._operation_scope, self._service_client_10_2021_dataplanepreview if self._registry_name else self._service_client_05_2022, self._operation_container, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.COMPONENT, self._components) self._jobs = JobOperations( self._operation_scope, self._service_client_02_2022_preview, self._operation_container, self._credential, **ops_kwargs, ) self._operation_container.add(AzureMLResourceType.JOB, self._jobs)
[docs] @classmethod def from_config( cls, credential: ChainedTokenCredential, path: Union[PathLike, str] = None, _file_name=None, **kwargs ) -> "MLClient": """Return a workspace object from an existing Azure Machine Learning Workspace. Reads workspace configuration from a file. Throws an exception if the config file can't be found. The method provides a simple way to reuse the same workspace across multiple Python notebooks or projects. Users can save the workspace Azure Resource Manager (ARM) properties using the [workspace.write_config](https://docs.microsoft.com/python/api/azureml-core/azureml.core.workspace.workspace?view=azure-ml-py) method, and use this method to load the same workspace in different Python notebooks or projects without retyping the workspace ARM properties. :param credential: The credential object for the workspace. :type credential: azureml.core.authentication.ChainedTokenCredential :param path: The path to the config file or starting directory to search. The parameter defaults to starting the search in the current directory. :type path: str :param _file_name: Allows overriding the config file name to search for when path is a directory path. :type _file_name: str :param kwargs: A dictionary of additional configuration parameters. :type kwargs: dict :return: The workspace object for an existing Azure ML Workspace. :rtype: MLClient """ path = Path(".") if path is None else Path(path) if path.is_file(): found_path = path else: # Based on priority # Look in config dirs like .azureml, aml_config or plain directory # with None directories_to_look = [".azureml", "aml_config", None] if _file_name: files_to_look = [_file_name] else: files_to_look = ["config.json", "project.json"] found_path = None for curr_dir, curr_file in product(directories_to_look, files_to_look): module_logger.debug( "No config file directly found, starting search from {} " "directory, for {} file name to be present in " "{} subdirectory".format(path, curr_file, curr_dir) ) found_path = traverse_up_path_and_find_file( path=path, file_name=curr_file, directory_name=curr_dir, num_levels=5 ) if found_path: break if not found_path: msg = "We could not find config.json in: {} or in its parent directories. " "Please provide the full path to the config file or ensure that " "config.json exists in the parent directories." raise ValidationException( message=msg.format(path), no_personal_data_message=msg.format("[path]"), target=ErrorTarget.GENERAL, error_category=ErrorCategory.USER_ERROR, ) subscription_id, resource_group, workspace_name = MLClient._get_workspace_info(found_path) module_logger.info("Found the config file in: %s", found_path) return MLClient( credential=credential, subscription_id=subscription_id, resource_group_name=resource_group, workspace_name=workspace_name, **kwargs, )
""" This method provides a way to create MLClient object for cli to levarage cli context for authentication. With this we do not have to use AzureCliCredentials from azure-identity package (not meant for heavy usage). The credentials are passed by cli get_mgmt_service_client when it created a object of this class. """ @classmethod def _ml_client_cli(cls, credentials, subscription_id, **kwargs): ml_client = cls(credential=credentials, subscription_id=subscription_id, **kwargs) return ml_client @property def workspaces(self) -> WorkspaceOperations: """A collection of workspace related operations :return: Workspace operations :rtype: WorkspaceOperations """ return self._workspaces @property def connections(self) -> WorkspaceConnectionsOperations: """A collection of workspace connection related operations :return: Workspace Connections operations :rtype: WorkspaceConnectionsOperations """ return self._workspace_connections @property def jobs(self) -> JobOperations: """A collection of job related operations :return: Job operations :rtype: JObOperations """ return self._jobs @property def compute(self) -> ComputeOperations: """A collection of compute related operations :return: Compute operations :rtype: ComputeOperations """ return self._compute @property def models(self) -> ModelOperations: """A collection of model related operations :return: Model operations :rtype: ModelOperations """ return self._models @property def datasets(self) -> DatasetOperations: """A collection of dataset related operations :return: Dataset operations :rtype: DatasetOperations """ return self._datasets @property def online_endpoints(self) -> OnlineEndpointOperations: """A collection of online endpoint related operations :return: Online Endpoint operations :rtype: OnlineEndpointOperations """ return self._online_endpoints @property def batch_endpoints(self) -> BatchEndpointOperations: """A collection of batch endpoint related operations :return: Batch Endpoint operations :rtype: BatchEndpointOperations """ return self._batch_endpoints @property def online_deployments(self) -> OnlineDeploymentOperations: """A collection of online deployment related operations :return: Online Deployment operations :rtype: OnlineDeploymentOperations """ return self._online_deployments @property def batch_deployments(self) -> BatchDeploymentOperations: """A collection of batch deployment related operations :return: Batch Deployment operations :rtype: BatchDeploymentOperations """ return self._batch_deployments @property def datastores(self) -> DatastoreOperations: """A collection of datastore related operations :return: Datastore operations :rtype: DatastoreOperations """ return self._datastores @property def environments(self) -> EnvironmentOperations: """A collection of environment related operations :return: Environment operations :rtype: EnvironmentOperations """ return self._environments @property def data(self) -> DataOperations: """A collection of data related operations :return: Data operations :rtype: DataOperations """ return self._data @property def components(self) -> ComponentOperations: """A collection of component related operations :return: Component operations :rtype: ComponentOperations """ return self._components @property def workspace_name(self) -> Optional[str]: """The workspace where workspace dependent operations will be executed in. :return: Default workspace name :rtype: Optional[str] """ return self._operation_scope.workspace_name def _get_new_client(self, workspace_name: str) -> "MLClient": """Returns a new MLClient object with the specified arguments :param str workspace_name: AzureML workspace of the new MLClient """ return MLClient( credential=self._credential, subscription_id=self._operation_scope.subscription_id, resource_group_name=self._operation_scope.resource_group_name, workspace_name=workspace_name, ) @classmethod def _get_workspace_info(cls, found_path: str) -> Tuple[str, str, str]: with open(found_path, "r") as config_file: config = json.load(config_file) # Checking the keys in the config.json file to check for required parameters. scope = config.get("Scope") if not scope: if not all([k in config.keys() for k in ("subscription_id", "resource_group", "workspace_name")]): msg = "The config file found in: {} does not seem to contain the required " "parameters. Please make sure it contains your subscription_id, " "resource_group and workspace_name." raise ValidationException( message=msg.format(found_path), no_personal_data_message=msg.format("[found_path]"), target=ErrorTarget.GENERAL, error_category=ErrorCategory.USER_ERROR, ) # User provided ARM parameters take precedence over values from config.json subscription_id_from_config = config["subscription_id"] resource_group_from_config = config["resource_group"] workspace_name_from_config = config["workspace_name"] else: pieces = scope.split("/") # User provided ARM parameters take precedence over values from config.json subscription_id_from_config = pieces[2] resource_group_from_config = pieces[4] workspace_name_from_config = pieces[8] return ( subscription_id_from_config, resource_group_from_config, workspace_name_from_config, ) def _add_user_agent(self, kwargs) -> None: user_agent = kwargs.pop("user_agent", None) user_agent = f"{user_agent} {USER_AGENT}" if user_agent else USER_AGENT kwargs.setdefault("user_agent", user_agent)
[docs] def create_or_update( self, entity: Union[Job, BaseNode, Model, Environment, Component, Datastore], **kwargs ) -> Union[Job, Model, Environment, Component, Datastore]: """Creates or updates an Azure ML resource. :param entity: The resource to create or update. :type entity: Union[azure.ai.ml.entities.Job, azure.ai.ml.entities.Model, azure.ai.ml.entities.Environment, azure.ai.ml.entities.Component, azure.ai.ml.entities.Datastore] :return: The created or updated resource :rtype: Union[azure.ai.ml.entities.Job, azure.ai.ml.entities.Model, azure.ai.ml.entities.Environment, azure.ai.ml.entities.Component, azure.ai.ml.entities.Datastore] """ return _create_or_update(entity, self._operation_container.all_operations, **kwargs)
[docs] def begin_create_or_update( self, entity: Union[Workspace, Compute, OnlineDeployment, OnlineEndpoint, BatchDeployment, BatchEndpoint], **kwargs, ) -> LROPoller: """Creates or updates an Azure ML resource asynchronously. :param entity: The resource to create or update. :type entity: Union[azure.ai.ml.entities.Workspace, azure.ai.ml.entities.Compute, azure.ai.ml.entities.OnlineDeployment, azure.ai.ml.entities.OnlineEndpoint, azure.ai.ml.entities.BatchDeployment, azure.ai.ml.entities.BatchEndpoint] :return: The resource after create/update operation :rtype: Optional[Union[azure.ai.ml.entities.Workspace, azure.ai.ml.entities.Compute, azure.ai.ml.entities.OnlineDeployment, azure.ai.ml.entities.OnlineEndpoint, azure.ai.ml.entities.BatchDeployment, azure.ai.ml.entities.BatchEndpoint]] """ return _begin_create_or_update(entity, self._operation_container.all_operations, **kwargs)
def __repr__(self) -> str: return f"""MLClient(credential={self._credential}, subscription_id={self._operation_scope.subscription_id}, resource_group_name={self._operation_scope.resource_group_name}, workspace_name={self.workspace_name})"""
@singledispatch def _create_or_update(entity, operations, **kwargs): raise NotImplementedError() @_create_or_update.register(Job) def _(entity: Job, operations, **kwargs): module_logger.debug("Creating or updating job") return operations[AzureMLResourceType.JOB].create_or_update(entity, **kwargs) @_create_or_update.register(BaseNode) def _(entity: Job, operations): module_logger.debug("Creating or updating job") return operations[AzureMLResourceType.JOB].create_or_update(entity) @_create_or_update.register(Model) def _(entity: Model, operations): module_logger.debug("Creating or updating model") return operations[AzureMLResourceType.MODEL].create_or_update(entity) @_create_or_update.register(Environment) def _(entity: Environment, operations): module_logger.debug("Creating or updating environment") return operations[AzureMLResourceType.ENVIRONMENT].create_or_update(entity) @_create_or_update.register(Component) def _(entity: Component, operations, **kwargs): module_logger.debug("Creating or updating components") return operations[AzureMLResourceType.COMPONENT].create_or_update(entity, **kwargs) @_create_or_update.register(Datastore) def _(entity: Datastore, operations): module_logger.debug("Creating or updating datastores") return operations[AzureMLResourceType.DATASTORE].create_or_update(entity) @singledispatch def _begin_create_or_update(entity, operations, **kwargs): raise NotImplementedError() @_begin_create_or_update.register(Workspace) def _(entity: Workspace, operations, *args, **kwargs): module_logger.debug("Creating or updating workspaces") return operations[AzureMLResourceType.WORKSPACE].begin_create(entity, **kwargs) @_begin_create_or_update.register(Compute) def _(entity: Compute, operations, *args, **kwargs): module_logger.debug("Creating or updating compute") return operations[AzureMLResourceType.COMPUTE].begin_create_or_update(entity, **kwargs) @_begin_create_or_update.register(OnlineEndpoint) def _(entity: OnlineEndpoint, operations, *args, **kwargs): module_logger.debug("Creating or updating online_endpoints") return operations[AzureMLResourceType.ONLINE_ENDPOINT].begin_create_or_update(entity, **kwargs) @_begin_create_or_update.register(BatchEndpoint) def _(entity: BatchEndpoint, operations, *args, **kwargs): module_logger.debug("Creating or updating batch_endpoints") return operations[AzureMLResourceType.BATCH_ENDPOINT].begin_create_or_update(entity, **kwargs) @_begin_create_or_update.register(OnlineDeployment) def _(entity: OnlineDeployment, operations, *args, **kwargs): module_logger.debug("Creating or updating online_deployments") return operations[AzureMLResourceType.ONLINE_DEPLOYMENT].begin_create_or_update(entity, **kwargs) @_begin_create_or_update.register(BatchDeployment) def _(entity: BatchDeployment, operations, *args, **kwargs): module_logger.debug("Creating or updating batch_deployments") return operations[AzureMLResourceType.BATCH_DEPLOYMENT].begin_create_or_update(entity, **kwargs)