# Source code for azure.ai.ml.operations._data_operations

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import os
import logging
import requests
from pathlib import Path
from typing import Dict, List, Optional, Union
from azure.ai.ml.entities._data.mltable_metadata import MLTableMetadata
from azure.core.paging import ItemPaged

from azure.ai.ml.constants import AssetTypes, MLTABLE_SCHEMA_URL_FALLBACK
from azure.ai.ml.operations import DatastoreOperations
from azure.ai.ml._restclient.v2022_05_01 import (
    AzureMachineLearningWorkspaces as ServiceClient052022,
)
from azure.ai.ml._restclient.v2022_02_01_preview.models import ListViewType
from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
from azure.ai.ml._scope_dependent_operations import OperationScope, _ScopeDependentOperations
from azure.ai.ml.entities._assets import Data
from azure.ai.ml._artifacts._constants import (
    ASSET_PATH_ERROR,
    CHANGED_ASSET_PATH_MSG,
    CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
)
from azure.ai.ml._utils._asset_utils import (
    _create_or_update_autoincrement,
    _get_latest,
    _resolve_label_to_asset,
    _archive_or_restore,
)
from azure.ai.ml._utils._data_utils import (
    download_mltable_schema,
    read_local_mltable_metadata_contents,
    read_remote_mltable_metadata_contents,
    validate_mltable_metadata,
)
from azure.ai.ml._utils.utils import is_url
from azure.ai.ml._telemetry import AML_INTERNAL_LOGGER_NAMESPACE, ActivityType, monitor_with_activity
from azure.ai.ml._ml_exceptions import (
    DataException,
    ErrorCategory,
    ErrorTarget,
    ValidationException,
    AssetPathException,
)

# Logger handed to ``monitor_with_activity`` by the operations below; it is
# named under the internal AML logger namespace. Propagation is switched off
# so its records are not passed on to the handlers of ancestor loggers.
logger = logging.getLogger(AML_INTERNAL_LOGGER_NAMESPACE + __name__)
logger.propagate = False
# Plain module-level logger named after this module.
module_logger = logging.getLogger(__name__)


class DataOperations(_ScopeDependentOperations):
    """Operations for listing, getting, creating, archiving and restoring data assets.

    The class wraps the ``data_versions`` and ``data_containers`` operation
    groups of the service client and performs local validation of a data
    asset before it is sent to the service.
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        service_client: ServiceClient052022,
        datastore_operations: DatastoreOperations,
        **kwargs: Dict,
    ):
        """Create the data operations object.

        :param operation_scope: Scope forwarded to the base class.
        :type operation_scope: OperationScope
        :param service_client: Client whose ``data_versions`` and ``data_containers``
            operation groups are used for all service calls.
        :type service_client: ServiceClient052022
        :param datastore_operations: Datastore operations, used when reading remote
            MLTable metadata.
        :type datastore_operations: DatastoreOperations
        :param kwargs: Extra keyword arguments. ``app_insights_handler``, when present,
            is removed and attached to the module logger; the remaining items are kept
            and forwarded to some service calls.
        """
        super().__init__(operation_scope)
        if "app_insights_handler" in kwargs:
            logger.addHandler(kwargs.pop("app_insights_handler"))
        self._operation = service_client.data_versions
        self._container_operation = service_client.data_containers
        self._datastore_operation = datastore_operations
        self._init_kwargs = kwargs

        # Maps a label to a function which given an asset name,
        # returns the asset associated with the label
        self._managed_label_resolver = {"latest": self._get_latest_version}

    @monitor_with_activity(logger, "Data.List", ActivityType.PUBLICAPI)
    def list(
        self, name: Optional[str] = None, *, list_view_type: ListViewType = ListViewType.ACTIVE_ONLY
    ) -> ItemPaged[Data]:
        """List the data assets of the workspace.

        :param name: Name of a specific data asset, optional. When given, the versions
            of that asset are listed; otherwise the data containers are listed.
        :type name: Optional[str]
        :param list_view_type: View type for including/excluding (for example) archived
            data assets. Default: ACTIVE_ONLY.
        :type list_view_type: Optional[ListViewType]
        :return: An iterator like instance of Data objects
        :rtype: ~azure.core.paging.ItemPaged[Data]
        """
        if name:
            return self._operation.list(
                name=name,
                workspace_name=self._workspace_name,
                cls=lambda objs: [Data._from_rest_object(obj) for obj in objs],
                list_view_type=list_view_type,
                **self._scope_kwargs,
            )
        return self._container_operation.list(
            workspace_name=self._workspace_name,
            cls=lambda objs: [Data._from_container_rest_object(obj) for obj in objs],
            list_view_type=list_view_type,
            **self._scope_kwargs,
        )

    @monitor_with_activity(logger, "Data.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Data:
        """Get the specified data asset.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: Data asset object.
        :rtype: Data
        :raises ValidationException: If both ``version`` and ``label`` are given, or
            if neither is given.
        """
        if version and label:
            msg = "Cannot specify both version and label."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.DATA,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )

        if label:
            return _resolve_label_to_asset(self, name, label)

        if not version:
            msg = "Must provide either version or label."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.DATA,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )

        data_version_resource = self._operation.get(
            resource_group_name=self._resource_group_name,
            workspace_name=self._workspace_name,
            name=name,
            version=version,
            **self._init_kwargs,
        )
        return Data._from_rest_object(data_version_resource)

    @monitor_with_activity(logger, "Data.CreateOrUpdate", ActivityType.PUBLICAPI)
    def create_or_update(self, data: Data) -> Data:
        """Returns created or updated data asset.

        If not already in storage, asset will be uploaded to the workspace's blob storage.

        :param data: Data asset object.
        :type data: Data
        :return: Data asset object.
        :rtype: Data
        :raises DataException: If the asset fails local validation (see ``_validate``).
        :raises AssetPathException: If the service rejects the request with the
            ``ASSET_PATH_ERROR`` message.
        """
        # Captured before the upload step, which rebinds ``data``.
        name = data.name
        version = data.version

        referenced_uris = self._validate(data)
        if referenced_uris:
            data._referenced_uris = referenced_uris

        data, _ = _check_and_upload_path(artifact=data, asset_operations=self)
        data_version_resource = data._to_rest_object()
        auto_increment_version = data._auto_increment_version

        try:
            if auto_increment_version:
                result = _create_or_update_autoincrement(
                    name=data.name,
                    body=data_version_resource,
                    version_operation=self._operation,
                    container_operation=self._container_operation,
                    resource_group_name=self._operation_scope.resource_group_name,
                    workspace_name=self._workspace_name,
                    **self._init_kwargs,
                )
            else:
                result = self._operation.create_or_update(
                    name=name,
                    version=version,
                    workspace_name=self._workspace_name,
                    body=data_version_resource,
                    **self._scope_kwargs,
                )
        except Exception as e:
            # service side raises an exception if we attempt to update an existing asset's asset path
            if str(e) == ASSET_PATH_ERROR:
                raise AssetPathException(
                    message=CHANGED_ASSET_PATH_MSG,
                    target=ErrorTarget.DATA,
                    no_personal_data_message=CHANGED_ASSET_PATH_MSG_NO_PERSONAL_DATA,
                    error_category=ErrorCategory.USER_ERROR,
                ) from e
            # Bare ``raise`` re-raises the active exception with its traceback untouched.
            raise

        return Data._from_rest_object(result)

    @monitor_with_activity(logger, "Data.Validate", ActivityType.INTERNALCALL)
    def _validate(self, data: Data) -> Optional[List[str]]:
        """Validate a data asset locally before it is created or updated.

        For MLTable assets the metadata is loaded and, unless validation is skipped or
        the schema cannot be downloaded, checked against the MLTable JSON schema.
        For other asset types a local path is checked against the asset type; remote
        paths are not validated.

        :param data: Data asset object.
        :type data: Data
        :return: The URIs referenced by the MLTable metadata for MLTable assets whose
            metadata could be read; ``None`` in every other case.
        :rtype: Optional[List[str]]
        :raises DataException: If ``data.path`` is missing, or a local path does not
            match the asset type.
        """
        if not data.path:
            msg = "Missing data path. Path is required for data."
            raise DataException(
                message=msg,
                no_personal_data_message=msg,
                target=ErrorTarget.DATA,
                error_category=ErrorCategory.USER_ERROR,
            )

        asset_path = str(data.path)
        asset_type = data.type
        base_path = data.base_path

        if asset_type == AssetTypes.MLTABLE:
            if is_url(asset_path):
                try:
                    metadata_contents = read_remote_mltable_metadata_contents(
                        path=asset_path, datastore_operations=self._datastore_operation
                    )
                except Exception:
                    # skip validation for remote MLTable when the contents cannot be read
                    logger.info("Unable to access MLTable metadata at path %s", asset_path, exc_info=True)
                    return None
                metadata_yaml_path = None
            else:
                metadata_contents = read_local_mltable_metadata_contents(path=asset_path)
                metadata_yaml_path = Path(asset_path, "MLTable")

            metadata = MLTableMetadata._load(metadata_contents, metadata_yaml_path)

            # Only fetch the schema when it will actually be used; the download is
            # pointless when the caller asked to skip validation.
            if not data._skip_validation:
                mltable_metadata_schema = self._try_get_mltable_metadata_jsonschema(data._mltable_schema_url)
                if mltable_metadata_schema:
                    validate_mltable_metadata(
                        mltable_metadata_dict=metadata_contents, mltable_schema=mltable_metadata_schema
                    )
            return metadata.referenced_uris()

        if is_url(asset_path):
            # skip validation for remote URI_FILE or URI_FOLDER
            return None

        if os.path.isabs(asset_path):
            local_path = asset_path
        else:
            # Relative paths are resolved against the asset's base path.
            local_path = Path(base_path, asset_path).resolve()
        self._assert_local_path_matches_asset_type(local_path, asset_type)
        return None

    def _try_get_mltable_metadata_jsonschema(
        self, mltable_schema_url: Optional[str] = MLTABLE_SCHEMA_URL_FALLBACK
    ) -> Optional[Dict]:
        """Download the MLTable metadata JSON schema, returning ``None`` on failure.

        :param mltable_schema_url: URL of the schema. ``None`` selects
            ``MLTABLE_SCHEMA_URL_FALLBACK``.
        :type mltable_schema_url: Optional[str]
        :return: The downloaded schema, or ``None`` if the download raised.
        :rtype: Optional[Dict]
        """
        # The parameter default only applies when the argument is omitted; callers
        # pass the asset's URL explicitly, so an unset (None) URL must be mapped to
        # the fallback here or validation would be silently skipped.
        if mltable_schema_url is None:
            mltable_schema_url = MLTABLE_SCHEMA_URL_FALLBACK
        try:
            return download_mltable_schema(mltable_schema_url)
        except Exception:
            # Best effort: schema validation is optional, so log and carry on.
            logger.info(
                'Failed to download MLTable jsonschema from "%s", skipping validation',
                mltable_schema_url,
                exc_info=True,
            )
            return None

    def _assert_local_path_matches_asset_type(self, local_path: Union[str, Path], asset_type: str) -> None:
        """Check that a local path is a directory or a file, as the asset type requires.

        Asset types other than ``AssetTypes.URI_FOLDER`` and ``AssetTypes.URI_FILE``
        are not checked.

        :param local_path: Local path of the asset.
        :type local_path: Union[str, Path]
        :param asset_type: Asset type to check the path against.
        :type asset_type: str
        :raises DataException: If the path is not a directory for ``URI_FOLDER`` or
            not a file for ``URI_FILE``.
        """
        # assert file system type matches asset type
        if asset_type == AssetTypes.URI_FOLDER and not os.path.isdir(local_path):
            raise DataException(
                message="There is no dir on target path: {}".format(local_path),
                no_personal_data_message="There is no dir on target path",
                target=ErrorTarget.DATA,
                error_category=ErrorCategory.USER_ERROR,
            )
        if asset_type == AssetTypes.URI_FILE and not os.path.isfile(local_path):
            raise DataException(
                message="There is no file on target path: {}".format(local_path),
                no_personal_data_message="There is no file on target path",
                target=ErrorTarget.DATA,
                error_category=ErrorCategory.USER_ERROR,
            )

    @monitor_with_activity(logger, "Data.Archive", ActivityType.PUBLICAPI)
    def archive(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> None:
        """Archive a data asset.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: None
        """
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._operation,
            container_operation=self._container_operation,
            is_archived=True,
            name=name,
            version=version,
            label=label,
        )

    @monitor_with_activity(logger, "Data.Restore", ActivityType.PUBLICAPI)
    def restore(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> None:
        """Restore an archived data asset.

        :param name: Name of data asset.
        :type name: str
        :param version: Version of data asset.
        :type version: str
        :param label: Label of the data asset. (mutually exclusive with version)
        :type label: str
        :return: None
        """
        _archive_or_restore(
            asset_operations=self,
            version_operation=self._operation,
            container_operation=self._container_operation,
            is_archived=False,
            name=name,
            version=version,
            label=label,
        )

    def _get_latest_version(self, name: str) -> Data:
        """Returns the latest version of the asset with the given name.

        Latest is defined as the most recently created, not the most recently updated.

        :param name: Name of data asset.
        :type name: str
        :return: Data asset object.
        :rtype: Data
        """
        result = _get_latest(name, self._operation, self._resource_group_name, self._workspace_name)
        return Data._from_rest_object(result)