# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import TYPE_CHECKING
import warnings
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpRequest, HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.polling.base_polling import LROBasePolling
from .. import models as _models
if TYPE_CHECKING:
    # Everything in this suite exists only for static type checkers; the guard is
    # false at runtime, so none of these names are bound when the module is imported.
    # pylint: disable=unused-import,ungrouped-imports
    from typing import Any, Callable, Dict, Generic, IO, Iterable, Optional, TypeVar, Union

    # Both aliases rely on names imported just above, so they must stay inside the guard.
    T = TypeVar('T')
    # Signature of the optional ``cls`` callback: (pipeline response, deserialized value, dict) -> Any.
    ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
class LibraryOperations(object):
    """LibraryOperations operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :ivar models: Alias to model classes used in this operation group.
    :type models: ~azure.synapse.artifacts.models
    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    models = _models

    def __init__(self, client, config, serializer, deserializer):
        """Store the collaborators supplied by the owning client."""
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    def list(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable["_models.LibraryListResponse"]
        """Lists Library.

        Pages are fetched lazily: the first request targets ``/libraries`` and every following
        request targets the ``next_link`` reported by the previous page.

        :keyword callable cls: A custom type or function that is called with the list of elements
         of each page; its return value is iterated in place of that list.
        :keyword dict error_map: Extra status-code-to-exception entries merged over the defaults
         (401, 404, 409).
        :return: An iterator like instance of either LibraryListResponse or the result of
         cls(list_of_elem)
        :rtype: ~azure.core.paging.ItemPaged[~azure.synapse.artifacts.models.LibraryListResponse]
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.LibraryListResponse"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2019-06-01-preview"
        accept = "application/json"

        def prepare_request(next_link=None):
            # Construct headers
            header_parameters = {}  # type: Dict[str, Any]
            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

            if not next_link:
                # Construct URL
                url = self.list.metadata['url']  # type: ignore
                path_format_arguments = {
                    'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
                }
                url = self._client.format_url(url, **path_format_arguments)
                # Construct parameters
                query_parameters = {}  # type: Dict[str, Any]
                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

                request = self._client.get(url, query_parameters, header_parameters)
            else:
                # Follow-up page: the link is used as the URL and no query parameters are added.
                url = next_link
                query_parameters = {}  # type: Dict[str, Any]
                path_format_arguments = {
                    'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
                }
                url = self._client.format_url(url, **path_format_arguments)
                request = self._client.get(url, query_parameters, header_parameters)
            return request

        def extract_data(pipeline_response):
            # Returns (continuation link or None, iterator over this page's items).
            deserialized = self._deserialize('LibraryListResponse', pipeline_response)
            list_of_elem = deserialized.value
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.next_link or None, iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
                # map_error raises the mapped exception for known codes; otherwise fall through.
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response, model=error)

            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )
    list.metadata = {'url': '/libraries'}  # type: ignore

    def _flush_initial(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional["_models.LibraryResourceInfo"]
        """Send the initial POST of the flush long-running operation.

        :param library_name: Value for the ``{libraryName}`` URL segment (serialized with
         max_length=100).
        :type library_name: str
        :keyword callable cls: Called as ``cls(pipeline_response, deserialized, {})``; its result
         is returned instead of the deserialized body.
        :return: The deserialized LibraryResourceInfo for a 202 response, ``None`` for a 200
         response, or the result of cls.
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.LibraryResourceInfo"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2019-06-01-preview"
        accept = "application/json"

        # Construct URL
        url = self._flush_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.post(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
            raise HttpResponseError(response=response, model=error)

        # Only the 202 response body is deserialized.
        deserialized = None
        if response.status_code == 202:
            deserialized = self._deserialize('LibraryResourceInfo', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _flush_initial.metadata = {'url': '/libraries/{libraryName}/flush'}  # type: ignore

    def begin_flush(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> LROPoller["_models.LibraryResourceInfo"]
        """Flush Library.

        :param library_name: file name to upload. Minimum length of the filename should be 1 excluding
         the extension length.
        :type library_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
         When given, no initial request is sent.
        :keyword polling: By default, your polling method will be LROBasePolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object
         for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.PollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
         Retry-After header is present.
        :return: An instance of LROPoller that returns either LibraryResourceInfo or the result of
         cls(response)
        :rtype: ~azure.core.polling.LROPoller[~azure.synapse.artifacts.models.LibraryResourceInfo]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.LibraryResourceInfo"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        if cont_token is None:
            # Fresh operation: the lambda makes the initial call hand back the raw pipeline response.
            raw_result = self._flush_initial(
                library_name=library_name,
                cls=lambda x, y, z: x,
                **kwargs
            )

        # These options are meant for the initial request only; keep them away from the poller.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            deserialized = self._deserialize('LibraryResourceInfo', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }

        if polling is True:
            polling_method = LROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False:
            polling_method = NoPolling()
        else:
            polling_method = polling

        # Same ``None`` test as above, so ``raw_result`` is always bound when it is used below.
        if cont_token is not None:
            return LROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_flush.metadata = {'url': '/libraries/{libraryName}/flush'}  # type: ignore

    def get_operation_result(
        self,
        operation_id,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Union["_models.LibraryResource", "_models.OperationResult"]
        """Get Operation result for Library.

        :param operation_id: operation id for which status is requested.
        :type operation_id: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: LibraryResource (status 200) or OperationResult (status 202), or the result of
         cls(response)
        :rtype: ~azure.synapse.artifacts.models.LibraryResource or
         ~azure.synapse.artifacts.models.OperationResult
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Union["_models.LibraryResource", "_models.OperationResult"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2019-06-01-preview"
        accept = "application/json"

        # Construct URL
        url = self.get_operation_result.metadata['url']  # type: ignore
        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'operationId': self._serialize.url("operation_id", operation_id, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.get(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
            raise HttpResponseError(response=response, model=error)

        # Only 200 and 202 reach this point, so the two branches are exhaustive.
        if response.status_code == 200:
            deserialized = self._deserialize('LibraryResource', pipeline_response)
        else:
            deserialized = self._deserialize('OperationResult', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    get_operation_result.metadata = {'url': '/libraryOperationResults/{operationId}'}  # type: ignore

    def _delete_initial(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional["_models.LibraryResourceInfo"]
        """Send the initial DELETE of the delete long-running operation.

        :param library_name: Value for the ``{libraryName}`` URL segment (serialized with
         max_length=100).
        :type library_name: str
        :keyword callable cls: Called as ``cls(pipeline_response, deserialized, {})``; its result
         is returned instead of the deserialized body.
        :return: The deserialized LibraryResourceInfo for a 202 response, ``None`` for any other
         accepted status, or the result of cls.
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.LibraryResourceInfo"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2019-06-01-preview"
        accept = "application/json"

        # Construct URL
        url = self._delete_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.delete(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        # NOTE: 409 is in the accepted list, so it is not raised here even though
        # error_map maps 409 to ResourceExistsError.
        if response.status_code not in [200, 202, 409]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
            raise HttpResponseError(response=response, model=error)

        # Only the 202 response body is deserialized.
        deserialized = None
        if response.status_code == 202:
            deserialized = self._deserialize('LibraryResourceInfo', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _delete_initial.metadata = {'url': '/libraries/{libraryName}'}  # type: ignore

    def begin_delete(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> LROPoller["_models.LibraryResourceInfo"]
        """Delete Library.

        :param library_name: file name to upload. Minimum length of the filename should be 1 excluding
         the extension length.
        :type library_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
         When given, no initial request is sent.
        :keyword polling: By default, your polling method will be LROBasePolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object
         for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.PollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
         Retry-After header is present.
        :return: An instance of LROPoller that returns either LibraryResourceInfo or the result of
         cls(response)
        :rtype: ~azure.core.polling.LROPoller[~azure.synapse.artifacts.models.LibraryResourceInfo]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.LibraryResourceInfo"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        if cont_token is None:
            # Fresh operation: the lambda makes the initial call hand back the raw pipeline response.
            raw_result = self._delete_initial(
                library_name=library_name,
                cls=lambda x, y, z: x,
                **kwargs
            )

        # These options are meant for the initial request only; keep them away from the poller.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            deserialized = self._deserialize('LibraryResourceInfo', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }

        if polling is True:
            polling_method = LROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False:
            polling_method = NoPolling()
        else:
            polling_method = polling

        # Same ``None`` test as above, so ``raw_result`` is always bound when it is used below.
        if cont_token is not None:
            return LROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_delete.metadata = {'url': '/libraries/{libraryName}'}  # type: ignore

    def get(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional["_models.LibraryResource"]
        """Get Library.

        :param library_name: file name to upload. Minimum length of the filename should be 1 excluding
         the extension length.
        :type library_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: LibraryResource for a 200 response, ``None`` for a 304 response, or the result of
         cls(response)
        :rtype: ~azure.synapse.artifacts.models.LibraryResource or None
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.LibraryResource"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2019-06-01-preview"
        accept = "application/json"

        # Construct URL
        url = self.get.metadata['url']  # type: ignore
        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.get(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 304]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
            raise HttpResponseError(response=response, model=error)

        # A 304 response is accepted but yields no model.
        deserialized = None
        if response.status_code == 200:
            deserialized = self._deserialize('LibraryResource', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    get.metadata = {'url': '/libraries/{libraryName}'}  # type: ignore

    def _create_initial(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional["_models.LibraryResourceInfo"]
        """Send the initial PUT of the create long-running operation.

        :param library_name: Value for the ``{libraryName}`` URL segment (serialized with
         max_length=100).
        :type library_name: str
        :keyword callable cls: Called as ``cls(pipeline_response, deserialized, {})``; its result
         is returned instead of the deserialized body.
        :return: The deserialized LibraryResourceInfo for a 202 response, ``None`` for a 200
         response, or the result of cls.
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.LibraryResourceInfo"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2019-06-01-preview"
        accept = "application/json"

        # Construct URL
        url = self._create_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.put(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
            raise HttpResponseError(response=response, model=error)

        # Only the 202 response body is deserialized.
        deserialized = None
        if response.status_code == 202:
            deserialized = self._deserialize('LibraryResourceInfo', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _create_initial.metadata = {'url': '/libraries/{libraryName}'}  # type: ignore

    def begin_create(
        self,
        library_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> LROPoller["_models.LibraryResourceInfo"]
        """Creates a library with the library name.

        :param library_name: file name to upload. Minimum length of the filename should be 1 excluding
         the extension length.
        :type library_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
         When given, no initial request is sent.
        :keyword polling: By default, your polling method will be LROBasePolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object
         for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.PollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
         Retry-After header is present.
        :return: An instance of LROPoller that returns either LibraryResourceInfo or the result of
         cls(response)
        :rtype: ~azure.core.polling.LROPoller[~azure.synapse.artifacts.models.LibraryResourceInfo]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.LibraryResourceInfo"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        if cont_token is None:
            # Fresh operation: the lambda makes the initial call hand back the raw pipeline response.
            raw_result = self._create_initial(
                library_name=library_name,
                cls=lambda x, y, z: x,
                **kwargs
            )

        # These options are meant for the initial request only; keep them away from the poller.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            deserialized = self._deserialize('LibraryResourceInfo', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }

        if polling is True:
            polling_method = LROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False:
            polling_method = NoPolling()
        else:
            polling_method = polling

        # Same ``None`` test as above, so ``raw_result`` is always bound when it is used below.
        if cont_token is not None:
            return LROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_create.metadata = {'url': '/libraries/{libraryName}'}  # type: ignore

    def append(
        self,
        library_name,  # type: str
        content,  # type: IO
        blob_condition_append_position=None,  # type: Optional[int]
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Append the content to the library resource created using the create operation. The maximum
        content size is 4MiB. Content larger than 4MiB must be appended in 4MiB chunks.

        :param library_name: file name to upload. Minimum length of the filename should be 1 excluding
         the extension length.
        :type library_name: str
        :param content: Library file chunk.
        :type content: IO
        :param blob_condition_append_position: Set this header to a byte offset at which the block is
         expected to be appended. The request succeeds only if the current offset matches this value.
         Otherwise, the request fails with the AppendPositionConditionNotMet error (HTTP status code 412
         – Precondition Failed).
        :type blob_condition_append_position: long
        :keyword str content_type: Value for the Content-Type header; defaults to
         "application/octet-stream".
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: None, or the result of cls(response)
        :rtype: None
        :raises: ~azure.core.exceptions.HttpResponseError
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        comp = "appendblock"
        api_version = "2019-06-01-preview"
        content_type = kwargs.pop("content_type", "application/octet-stream")
        accept = "application/json"

        # Construct URL
        url = self.append.metadata['url']  # type: ignore
        path_format_arguments = {
            'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            'libraryName': self._serialize.url("library_name", library_name, 'str', max_length=100, min_length=0),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['comp'] = self._serialize.query("comp", comp, 'str')
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        if blob_condition_append_position is not None:
            # The conditional-append header is sent only when the caller supplies an offset.
            header_parameters['x-ms-blob-condition-appendpos'] = self._serialize.header(
                "blob_condition_append_position", blob_condition_append_position, 'long'
            )
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content_kwargs['stream_content'] = content
        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [201]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.CloudError, response)
            raise HttpResponseError(response=response, model=error)

        if cls:
            return cls(pipeline_response, None, {})
    append.metadata = {'url': '/libraries/{libraryName}'}  # type: ignore