Source code for azure.synapse.artifacts.aio.operations._library_operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, AsyncIterable, Callable, Dict, IO, Optional, TypeVar, Union, cast

from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse
from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
from azure.core.polling.async_base_polling import AsyncLROBasePolling
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.utils import case_insensitive_dict

from ... import models as _models
from ..._vendor import _convert_request
from ...operations._library_operations import (
    build_append_request,
    build_create_request,
    build_delete_request,
    build_flush_request,
    build_get_operation_result_request,
    build_get_request,
    build_list_request,
)

T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]


[docs]class LibraryOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.synapse.artifacts.aio.ArtifactsClient`'s :attr:`library` attribute. """ models = _models def __init__(self, *args, **kwargs) -> None: input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list(self, **kwargs: Any) -> AsyncIterable["_models.LibraryResource"]: """Lists Library. :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either LibraryResource or the result of cls(response) :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.synapse.artifacts.models.LibraryResource] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[_models.LibraryListResponse] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_list_request( api_version=api_version, template_url=self.list.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: request = HttpRequest("GET", next_link) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore request.method = "GET" return request async def extract_data(pipeline_response): deserialized = self._deserialize("LibraryListResponse", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) return deserialized.next_link or None, AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
list.metadata = {"url": "/libraries"} # type: ignore async def _flush_initial(self, library_name: str, **kwargs: Any) -> Optional[_models.LibraryResourceInfo]: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[Optional[_models.LibraryResourceInfo]] request = build_flush_request( library_name=library_name, api_version=api_version, template_url=self._flush_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 202: deserialized = self._deserialize("LibraryResourceInfo", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _flush_initial.metadata = {"url": "/libraries/{libraryName}/flush"} # type: ignore
[docs] @distributed_trace_async async def begin_flush(self, library_name: str, **kwargs: Any) -> AsyncLROPoller[None]: """Flush Library. :param library_name: file name to upload. Minimum length of the filename should be 1 excluding the extension length. Required. :type library_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns either LibraryResourceInfo or the result of cls(response) :rtype: ~azure.core.polling.AsyncLROPoller[~azure.synapse.artifacts.models.LibraryResourceInfo] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[None] polling = kwargs.pop("polling", True) # type: Union[bool, AsyncPollingMethod] lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token = kwargs.pop("continuation_token", None) # type: Optional[str] if cont_token is None: raw_result = await self._flush_initial( # type: ignore library_name=library_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("LibraryResourceInfo", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } if polling is True: polling_method = cast( AsyncPollingMethod, AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs), ) # type: AsyncPollingMethod elif polling is False: polling_method = cast(AsyncPollingMethod, AsyncNoPolling()) else: polling_method = polling if cont_token: return AsyncLROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_flush.metadata = {"url": "/libraries/{libraryName}/flush"} # type: ignore
[docs] @distributed_trace_async async def get_operation_result( self, operation_id: str, **kwargs: Any ) -> Union[_models.LibraryResource, _models.OperationResult]: """Get Operation result for Library. :param operation_id: operation id for which status is requested. Required. :type operation_id: str :keyword callable cls: A custom type or function that will be passed the direct response :return: LibraryResource or OperationResult or the result of cls(response) :rtype: ~azure.synapse.artifacts.models.LibraryResource or ~azure.synapse.artifacts.models.OperationResult :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[Union[_models.LibraryResource, _models.OperationResult]] request = build_get_operation_result_request( operation_id=operation_id, api_version=api_version, template_url=self.get_operation_result.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: deserialized = self._deserialize("LibraryResource", pipeline_response) if response.status_code == 202: deserialized = self._deserialize("OperationResult", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_operation_result.metadata = {"url": "/libraryOperationResults/{operationId}"} # type: ignore async def _delete_initial(self, library_name: str, **kwargs: Any) -> Optional[_models.LibraryResourceInfo]: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[Optional[_models.LibraryResourceInfo]] request = build_delete_request( library_name=library_name, api_version=api_version, template_url=self._delete_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202, 409]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 202: deserialized = self._deserialize("LibraryResourceInfo", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _delete_initial.metadata = {"url": "/libraries/{libraryName}"} # type: ignore
[docs] @distributed_trace_async async def begin_delete(self, library_name: str, **kwargs: Any) -> AsyncLROPoller[None]: """Delete Library. :param library_name: file name to upload. Minimum length of the filename should be 1 excluding the extension length. Required. :type library_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns either LibraryResourceInfo or the result of cls(response) :rtype: ~azure.core.polling.AsyncLROPoller[~azure.synapse.artifacts.models.LibraryResourceInfo] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[None] polling = kwargs.pop("polling", True) # type: Union[bool, AsyncPollingMethod] lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token = kwargs.pop("continuation_token", None) # type: Optional[str] if cont_token is None: raw_result = await self._delete_initial( # type: ignore library_name=library_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("LibraryResourceInfo", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } if polling is True: polling_method = cast( AsyncPollingMethod, AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs), ) # type: AsyncPollingMethod elif polling is False: polling_method = cast(AsyncPollingMethod, AsyncNoPolling()) else: polling_method = polling if cont_token: return AsyncLROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_delete.metadata = {"url": "/libraries/{libraryName}"} # type: ignore
[docs] @distributed_trace_async async def get(self, library_name: str, **kwargs: Any) -> Optional[_models.LibraryResource]: """Get Library. :param library_name: file name to upload. Minimum length of the filename should be 1 excluding the extension length. Required. :type library_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: LibraryResource or None or the result of cls(response) :rtype: ~azure.synapse.artifacts.models.LibraryResource or None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[Optional[_models.LibraryResource]] request = build_get_request( library_name=library_name, api_version=api_version, template_url=self.get.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 304]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: deserialized = self._deserialize("LibraryResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {"url": "/libraries/{libraryName}"} # type: ignore async def _create_initial(self, library_name: str, **kwargs: Any) -> Optional[_models.LibraryResourceInfo]: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[Optional[_models.LibraryResourceInfo]] request = build_create_request( library_name=library_name, api_version=api_version, template_url=self._create_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 202: deserialized = self._deserialize("LibraryResourceInfo", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _create_initial.metadata = {"url": "/libraries/{libraryName}"} # type: ignore
[docs] @distributed_trace_async async def begin_create(self, library_name: str, **kwargs: Any) -> AsyncLROPoller[None]: """Creates a library with the library name. :param library_name: file name to upload. Minimum length of the filename should be 1 excluding the extension length. Required. :type library_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns either LibraryResourceInfo or the result of cls(response) :rtype: ~azure.core.polling.AsyncLROPoller[~azure.synapse.artifacts.models.LibraryResourceInfo] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[None] polling = kwargs.pop("polling", True) # type: Union[bool, AsyncPollingMethod] lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token = kwargs.pop("continuation_token", None) # type: Optional[str] if cont_token is None: raw_result = await self._create_initial( # type: ignore library_name=library_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("LibraryResourceInfo", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } if polling is True: polling_method = cast( AsyncPollingMethod, AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs), ) # type: AsyncPollingMethod elif polling is False: polling_method = cast(AsyncPollingMethod, AsyncNoPolling()) else: polling_method = polling if cont_token: return AsyncLROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_create.metadata = {"url": "/libraries/{libraryName}"} # type: ignore
[docs] @distributed_trace_async async def append( # pylint: disable=inconsistent-return-statements self, comp: Union[str, "_models.Enum9"], library_name: str, content: IO, blob_condition_append_position: Optional[int] = None, **kwargs: Any ) -> None: """Append the content to the library resource created using the create operation. The maximum content size is 4MiB. Content larger than 4MiB must be appended in 4MiB chunks. :param comp: "appendblock" Required. :type comp: str or ~azure.synapse.artifacts.models.Enum9 :param library_name: file name to upload. Minimum length of the filename should be 1 excluding the extension length. Required. :type library_name: str :param content: Library file chunk. Required. :type content: IO :param blob_condition_append_position: Set this header to a byte offset at which the block is expected to be appended. The request succeeds only if the current offset matches this value. Otherwise, the request fails with the AppendPositionConditionNotMet error (HTTP status code 412 – Precondition Failed). Default value is None. :type blob_condition_append_position: int :keyword callable cls: A custom type or function that will be passed the direct response :return: None or the result of cls(response) :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop("api_version", _params.pop("api-version", "2020-12-01")) # type: str content_type = kwargs.pop("content_type", _headers.pop("Content-Type", "application/octet-stream")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[None] _content = content request = build_append_request( library_name=library_name, comp=comp, blob_condition_append_position=blob_condition_append_position, api_version=api_version, content_type=content_type, content=_content, template_url=self.append.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
append.metadata = {"url": "/libraries/{libraryName}"} # type: ignore