# Source code for azure.ai.ml.operations._environment_operations

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
from typing import Any, Iterable, Union

from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_env_build_context
from azure.ai.ml.constants import ARM_ID_PREFIX, AzureMLResourceType
from azure.ai.ml._scope_dependent_operations import _ScopeDependentOperations, OperationScope, OperationsContainer
from azure.ai.ml._restclient.v2022_02_01_preview.models import EnvironmentVersionData, ListViewType
from azure.ai.ml._restclient.v2022_05_01 import (
    AzureMachineLearningWorkspaces as ServiceClient052022,
)
from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
    AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
)
from azure.ai.ml._utils._registry_utils import get_sas_uri_for_registry_asset, get_asset_body_for_registry_storage
from azure.ai.ml.entities._assets import Environment
from azure.ai.ml._utils._asset_utils import (
    _create_or_update_autoincrement,
    _get_latest,
    _resolve_label_to_asset,
    _archive_or_restore,
)
from azure.ai.ml._telemetry import AML_INTERNAL_LOGGER_NAMESPACE, ActivityType, monitor_with_activity
from azure.ai.ml._ml_exceptions import ErrorCategory, ErrorTarget, ValidationException

# Ordinary module logger; used below for local debug messages.
module_logger = logging.getLogger(__name__)

# Internal logger passed to ``monitor_with_activity`` on the public operations.
# ``EnvironmentOperations.__init__`` attaches the optional ``app_insights_handler``
# to it. Propagation is switched off so its records are handled only by the
# handlers attached directly to this logger, not by ancestor loggers.
logger = logging.getLogger(AML_INTERNAL_LOGGER_NAMESPACE + __name__)
logger.propagate = False


class EnvironmentOperations(_ScopeDependentOperations):
    """EnvironmentOperations

    You should not instantiate this class directly. Instead, you should create an MLClient instance that
    instantiates it for you and attaches it as an attribute.

    Every operation targets a registry when ``self._registry_name`` is set and the workspace
    (``self._workspace_name``) otherwise.
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        service_client: Union[ServiceClient052022, ServiceClient102021Dataplane],
        all_operations: OperationsContainer,
        **kwargs: Any,
    ):
        """Bind the operations object to a scope and a service client.

        :param operation_scope: Scope forwarded to the base class initializer.
        :type operation_scope: OperationScope
        :param service_client: Client whose ``environment_containers`` and ``environment_versions``
            operation groups are used for all service calls.
        :type service_client: Union[ServiceClient052022, ServiceClient102021Dataplane]
        :param all_operations: Container of sibling operations; the datastore operations are looked up from it.
        :type all_operations: OperationsContainer
        :param kwargs: Extra keyword arguments. ``app_insights_handler``, if present, is removed and attached
            to the internal logger; the remainder is stored and forwarded to every service call.
        """
        super().__init__(operation_scope)
        if "app_insights_handler" in kwargs:
            logger.addHandler(kwargs.pop("app_insights_handler"))
        self._kwargs = kwargs
        self._containers_operations = service_client.environment_containers
        self._version_operations = service_client.environment_versions
        self._service_client = service_client
        self._all_operations = all_operations
        self._datastore_operation = all_operations.all_operations[AzureMLResourceType.DATASTORE]

        # Maps a label to a function which given an asset name,
        # returns the asset associated with the label
        self._managed_label_resolver = {"latest": self._get_latest_version}

    @monitor_with_activity(logger, "Environment.CreateOrUpdate", ActivityType.PUBLICAPI)
    def create_or_update(self, environment: Environment) -> Environment:
        """Returns created or updated environment asset.

        In registry mode a SAS URI is requested first; when none is returned the existing asset with the
        same name and version is fetched and returned instead of creating a new one.

        :param environment: Environment object
        :type environment: Environment
        :return: Created or updated Environment object
        :rtype: Environment
        """
        sas_uri = None
        if self._registry_name:
            sas_uri = get_sas_uri_for_registry_asset(
                service_client=self._service_client,
                name=environment.name,
                version=environment.version,
                resource_group=self._resource_group_name,
                registry=self._registry_name,
                body=get_asset_body_for_registry_storage(
                    self._registry_name, "environments", environment.name, environment.version
                ),
            )
            if not sas_uri:
                module_logger.debug(
                    f"Getting the existing asset name: {environment.name}, version: {environment.version}"
                )
                return self.get(name=environment.name, version=environment.version)

        environment = _check_and_upload_env_build_context(environment=environment, operations=self, sas_uri=sas_uri)
        env_version_resource = environment._to_rest_object()

        if environment._auto_increment_version:
            env_rest_obj = _create_or_update_autoincrement(
                name=environment.name,
                body=env_version_resource,
                version_operation=self._version_operations,
                container_operation=self._containers_operations,
                workspace_name=self._workspace_name,
                **self._scope_kwargs,
                **self._kwargs,
            )
        elif self._registry_name:
            # The registry call returns a poller-style object; block on its result.
            env_rest_obj = self._version_operations.begin_create_or_update(
                name=environment.name,
                version=environment.version,
                registry_name=self._registry_name,
                body=env_version_resource,
                **self._scope_kwargs,
                **self._kwargs,
            ).result()
        else:
            env_rest_obj = self._version_operations.create_or_update(
                name=environment.name,
                version=environment.version,
                workspace_name=self._workspace_name,
                body=env_version_resource,
                **self._scope_kwargs,
                **self._kwargs,
            )

        # A registry create may yield an empty result; read the asset back in that case.
        if not env_rest_obj and self._registry_name:
            env_rest_obj = self._get(name=environment.name, version=environment.version)
        return Environment._from_rest_object(env_rest_obj)

    def _get(self, name: str, version: str = None) -> EnvironmentVersionData:
        """Fetch the raw REST resource for an environment.

        :param name: Name of the environment.
        :type name: str
        :param version: Version of the environment. When falsy, the container resource is fetched instead
            of a version resource (so the return annotation only describes the versioned case).
        :type version: str
        :return: Result of the version ``get`` call, or of the container ``get`` call when no version is given.
        """
        if version:
            if self._registry_name:
                return self._version_operations.get(
                    name=name,
                    version=version,
                    registry_name=self._registry_name,
                    **self._scope_kwargs,
                    **self._kwargs,
                )
            return self._version_operations.get(
                name=name,
                version=version,
                workspace_name=self._workspace_name,
                **self._scope_kwargs,
                **self._kwargs,
            )

        if self._registry_name:
            return self._containers_operations.get(
                name=name,
                registry_name=self._registry_name,
                **self._scope_kwargs,
                **self._kwargs,
            )
        return self._containers_operations.get(
            name=name,
            workspace_name=self._workspace_name,
            **self._scope_kwargs,
            **self._kwargs,
        )

    @monitor_with_activity(logger, "Environment.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: str = None, label: str = None) -> Environment:
        """Returns the specified environment asset.

        :param name: Name of the environment. A leading ARM id prefix is stripped before lookup.
        :type name: str
        :param version: Version of the environment.
        :type version: str
        :param label: Label of the environment. (mutually exclusive with version)
        :type label: str
        :return: Environment object
        :rtype: Environment
        :raises ValidationException: If both or neither of ``version`` and ``label`` are provided.
        """
        if version and label:
            msg = "Cannot specify both version and label."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.ENVIRONMENT,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )

        if not version and not label:
            msg = "Must provide either version or label."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.ENVIRONMENT,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )

        # Normalize the name before BOTH lookup paths, so label resolution sees the
        # same bare name as the version path (and as archive/restore, which strip
        # the prefix before handing the label on).
        name = _preprocess_environment_name(name)

        if label:
            return _resolve_label_to_asset(self, name, label)

        env_version_resource = self._get(name, version)
        return Environment._from_rest_object(env_version_resource)

    @monitor_with_activity(logger, "Environment.List", ActivityType.PUBLICAPI)
    def list(
        self, name: str = None, *, list_view_type: ListViewType = ListViewType.ACTIVE_ONLY
    ) -> Iterable[Environment]:
        """List all environment assets in workspace.

        With ``name`` the versions of that environment are listed; without it the environment containers
        are listed. In registry mode ``list_view_type`` is not forwarded to the service call.

        :param name: Name of the environment.
        :type name: Optional[str]
        :param list_view_type: View type for including/excluding (for example) archived environments.
            Default: ACTIVE_ONLY.
        :type list_view_type: Optional[ListViewType]
        :return: An iterator like instance of Environment objects.
        :rtype: ~azure.core.paging.ItemPaged[Environment]
        """
        if name:
            if self._registry_name:
                return self._version_operations.list(
                    name=name,
                    registry_name=self._registry_name,
                    cls=lambda objs: [Environment._from_rest_object(obj) for obj in objs],
                    **self._scope_kwargs,
                    **self._kwargs,
                )
            return self._version_operations.list(
                name=name,
                workspace_name=self._workspace_name,
                cls=lambda objs: [Environment._from_rest_object(obj) for obj in objs],
                list_view_type=list_view_type,
                **self._scope_kwargs,
                **self._kwargs,
            )

        if self._registry_name:
            return self._containers_operations.list(
                registry_name=self._registry_name,
                cls=lambda objs: [Environment._from_container_rest_object(obj) for obj in objs],
                **self._scope_kwargs,
                **self._kwargs,
            )
        return self._containers_operations.list(
            workspace_name=self._workspace_name,
            cls=lambda objs: [Environment._from_container_rest_object(obj) for obj in objs],
            list_view_type=list_view_type,
            **self._scope_kwargs,
            **self._kwargs,
        )

    # NOTE(review): the activity name is "Environment.Delete" although the method
    # archives; left untouched so existing telemetry keeps its name - confirm intent.
    @monitor_with_activity(logger, "Environment.Delete", ActivityType.PUBLICAPI)
    def archive(self, name: str, version: str = None, label: str = None) -> None:
        """Archive an environment or an environment version.

        :param name: Name of the environment. A leading ARM id prefix is stripped first.
        :type name: str
        :param version: Version of the environment.
        :type version: str
        :param label: Label of the environment. (mutually exclusive with version)
        :type label: str
        """
        name = _preprocess_environment_name(name)
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._version_operations,
            container_operation=self._containers_operations,
            is_archived=True,
            name=name,
            version=version,
            label=label,
        )

    @monitor_with_activity(logger, "Environment.Restore", ActivityType.PUBLICAPI)
    def restore(self, name: str, version: str = None, label: str = None, **kwargs) -> None:
        """Restore an archived environment version.

        :param name: Name of the environment. A leading ARM id prefix is stripped first.
        :type name: str
        :param version: Version of the environment.
        :type version: str
        :param label: Label of the environment. (mutually exclusive with version)
        :type label: str
        :param kwargs: Accepted for call compatibility; not used by this method.
        """
        name = _preprocess_environment_name(name)
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._version_operations,
            container_operation=self._containers_operations,
            is_archived=False,
            name=name,
            version=version,
            label=label,
        )

    def _get_latest_version(self, name: str) -> Environment:
        """Returns the latest version of the asset with the given name.

        Latest is defined as the most recently created, not the most
        recently updated.

        :param name: Name of the environment.
        :type name: str
        :return: Environment built from the REST object returned by ``_get_latest``.
        :rtype: Environment
        """
        result = _get_latest(name, self._version_operations, self._resource_group_name, self._workspace_name)
        return Environment._from_rest_object(result)
def _preprocess_environment_name(environment_name: str) -> str:
    """Return ``environment_name`` without a leading ``ARM_ID_PREFIX``.

    Names that do not start with the prefix are returned unchanged.

    :param environment_name: Environment name, optionally carrying the ARM id prefix.
    :type environment_name: str
    :return: The name with one leading prefix removed, if it was present.
    :rtype: str
    """
    if not environment_name.startswith(ARM_ID_PREFIX):
        return environment_name
    prefix_length = len(ARM_ID_PREFIX)
    return environment_name[prefix_length:]