# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import functools
from typing import (
Union,
Any,
)
try:
from urllib.parse import urlparse, unquote
except ImportError:
from urlparse import urlparse # type: ignore
from urllib2 import unquote # type: ignore
from azure.core.async_paging import AsyncItemPaged
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from .._base_client import parse_connection_str
from .._entity import TableEntity
from .._generated.aio import AzureTable
from .._generated.models import SignedIdentifier, TableProperties, QueryOptions
from .._models import AccessPolicy
from .._serialize import serialize_iso
from .._deserialize import _return_headers_and_deserialized
from .._error import _process_table_error
from .._models import UpdateMode
from .._deserialize import _convert_to_entity, _trim_service_metadata
from .._serialize import _add_entity_properties, _get_match_headers
from .._table_client_base import TableClientBase
from ._base_client_async import AsyncStorageAccountHostsMixin
from ._models import TableEntityPropertiesPaged
from ._policies_async import ExponentialRetry
[docs]class TableClient(AsyncStorageAccountHostsMixin, TableClientBase):
""" :ivar str account_name: Name of the storage account (Cosmos or Azure)"""
def __init__(
self,
account_url, # type: str
table_name, # type: str
credential, # type : Optional[Any]=None
**kwargs # type: Any
):
# type: (...) -> None
"""Create TableClient from a Credential.
:param account_url:
A url to an Azure Storage account.
:type account_url: str
:param table_name: The table name.
:type table_name: str
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token, or the connection string already has shared
access key values. The value can be a SAS token string, an account shared access
key.
:type credential: str
:returns: None
"""
kwargs["retry_policy"] = kwargs.get("retry_policy") or ExponentialRetry(**kwargs)
loop = kwargs.pop('loop', None)
super(TableClient, self).__init__(
account_url, table_name=table_name, credential=credential, loop=loop, **kwargs
)
self._client = AzureTable(self.url, pipeline=self._pipeline, loop=loop)
self._loop = loop
[docs] @classmethod
def from_connection_string(
cls, conn_str, # type: str
table_name, # type: str
**kwargs # type: Any
):
# type: (...) -> TableClient
"""Create TableClient from a Connection String.
:param conn_str:
A connection string to an Azure Storage or Cosmos account.
:type conn_str: str
:param table_name: The table name.
:type table_name: str
:returns: A table client.
:rtype: ~azure.data.tables.TableClient
.. admonition:: Example:
.. literalinclude:: ../samples/sample_create_client_async.py
:start-after: [START create_table_client]
:end-before: [END create_table_client]
:language: python
:dedent: 8
:caption: Creating the TableClient from a connection string.
"""
account_url, credential = parse_connection_str(
conn_str=conn_str, credential=None, service='table', keyword_args=kwargs)
return cls(account_url, table_name=table_name, credential=credential, **kwargs)
[docs] @classmethod
def from_table_url(cls, table_url, credential=None, **kwargs):
# type: (str, Optional[Any], Any) -> TableClient
"""A client to interact with a specific Table.
:param table_url: The full URI to the table, including SAS token if used.
:type table_url: str
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token. The value can be a SAS token string, an account
shared access key.
:type credential: str
:returns: A table client.
:rtype: ~azure.data.tables.TableClient
"""
try:
if not table_url.lower().startswith('http'):
table_url = "https://" + table_url
except AttributeError:
raise ValueError("Table URL must be a string.")
parsed_url = urlparse(table_url.rstrip('/'))
if not parsed_url.netloc:
raise ValueError("Invalid URL: {}".format(table_url))
table_path = parsed_url.path.lstrip('/').split('/')
account_path = ""
if len(table_path) > 1:
account_path = "/" + "/".join(table_path[:-1])
account_url = "{}://{}{}?{}".format(
parsed_url.scheme,
parsed_url.netloc.rstrip('/'),
account_path,
parsed_url.query)
table_name = unquote(table_path[-1])
if not table_name:
raise ValueError("Invalid URL. Please provide a URL with a valid table name")
return cls(account_url, table_name=table_name, credential=credential, **kwargs)
[docs] @distributed_trace_async
async def get_table_access_policy(
self,
**kwargs # type: Any
):
# type: (...) -> dict[str,AccessPolicy]
"""
Retrieves details about any stored access policies specified on the table that may be
used with Shared Access Signatures.
:return: Dictionary of SignedIdentifiers
:rtype: dict[str,~azure.data.tables.AccessPolicy]
:raises: ~azure.core.exceptions.HttpResponseError
"""
timeout = kwargs.pop('timeout', None)
try:
_, identifiers = await self._client.table.get_access_policy(
table=self.table_name,
timeout=timeout,
cls=kwargs.pop('cls', None) or _return_headers_and_deserialized,
**kwargs)
except HttpResponseError as error:
_process_table_error(error)
return {s.id: s.access_policy or AccessPolicy() for s in identifiers}
[docs] @distributed_trace_async
async def set_table_access_policy(
self,
signed_identifiers, # type: dict[str,AccessPolicy]
**kwargs):
# type: (...) -> None
"""Sets stored access policies for the table that may be used with Shared Access Signatures.
:param signed_identifiers:
:type signed_identifiers: dict[str,AccessPolicy]
:return: None
:rtype: None
:raises: ~azure.core.exceptions.HttpResponseError
"""
self._validate_signed_identifiers(signed_identifiers)
identifiers = []
for key, value in signed_identifiers.items():
if value:
value.start = serialize_iso(value.start)
value.expiry = serialize_iso(value.expiry)
identifiers.append(SignedIdentifier(id=key, access_policy=value))
signed_identifiers = identifiers # type: ignore
try:
await self._client.table.set_access_policy(
table=self.table_name,
table_acl=signed_identifiers or None,
**kwargs)
except HttpResponseError as error:
_process_table_error(error)
[docs] @distributed_trace_async
async def create_table(
self,
**kwargs # type: Any
):
# type: (...) -> Dict[str,str]
"""Creates a new table under the given account.
:return: Dictionary of operation metadata returned from service
:rtype: dict[str,str]
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_create_delete_table_async.py
:start-after: [START create_table]
:end-before: [END create_table]
:language: python
:dedent: 8
:caption: Creating a table from the TableClient object.
"""
table_properties = TableProperties(table_name=self.table_name, **kwargs)
try:
metadata, _ = await self._client.table.create(
table_properties,
cls=kwargs.pop('cls', _return_headers_and_deserialized))
return _trim_service_metadata(metadata)
except HttpResponseError as error:
_process_table_error(error)
[docs] @distributed_trace_async
async def delete_table(
self,
**kwargs # type: Any
):
# type: (...) -> None
"""Creates a new table under the given account.
:return: None
:rtype: None
.. admonition:: Example:
.. literalinclude:: ../samples/sample_create_delete_table_async.py
:start-after: [START delete_table]
:end-before: [END delete_table]
:language: python
:dedent: 8
:caption: Deleting a table from the TableClient object.
"""
try:
await self._client.table.delete(table=self.table_name, **kwargs)
except HttpResponseError as error:
_process_table_error(error)
[docs] @distributed_trace_async
async def delete_entity(
self,
partition_key, # type: str
row_key, # type: str
**kwargs # type: Any
):
# type: (...) -> None
"""Deletes the specified entity in a table.
:param partition_key: The partition key of the entity.
:type partition_key: str
:param row_key: The row key of the entity.
:type row_key: str
:keyword str etag: Etag of the entity
:keyword ~azure.core.MatchConditions match_condition: MatchCondition
:return: None
:rtype: None
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_insert_delete_entities_async.py
:start-after: [START delete_entity]
:end-before: [END delete_entity]
:language: python
:dedent: 8
:caption: Adding an entity to a Table
"""
if_match, _ = _get_match_headers(kwargs=dict(kwargs, etag=kwargs.pop('etag', None),
match_condition=kwargs.pop('match_condition', None)),
etag_param='etag', match_param='match_condition')
try:
await self._client.table.delete_entity(
table=self.table_name,
partition_key=partition_key,
row_key=row_key,
if_match=if_match or '*',
**kwargs)
except HttpResponseError as error:
_process_table_error(error)
[docs] @distributed_trace_async
async def create_entity(
self,
entity, # type: Union[TableEntity, Dict[str,str]]
**kwargs # type: Any
):
# type: (...) -> Dict[str,str]
"""Insert entity in a table.
:param entity: The properties for the table entity.
:type entity: dict[str, str]
:return: Dictionary of operation metadata returned from service
:rtype: dict[str,str]
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_insert_delete_entities_async.py
:start-after: [START create_entity]
:end-before: [END create_entity]
:language: python
:dedent: 8
:caption: Adding an entity to a Table
"""
if "PartitionKey" in entity and "RowKey" in entity:
entity = _add_entity_properties(entity)
else:
raise ValueError('PartitionKey and RowKey were not provided in entity')
try:
metadata, _ = await self._client.table.insert_entity(
table=self.table_name,
table_entity_properties=entity,
cls=kwargs.pop('cls', _return_headers_and_deserialized),
**kwargs
)
return _trim_service_metadata(metadata)
except ResourceNotFoundError as error:
_process_table_error(error)
[docs] @distributed_trace_async
async def update_entity(
self,
entity, # type: Union[TableEntity, Dict[str,str]]
mode=UpdateMode.MERGE, # type: UpdateMode
**kwargs # type: Any
):
# type: (...) -> Dict[str,str]
"""Update entity in a table.
:param mode: Merge or Replace entity
:type mode: ~azure.data.tables.UpdateMode
:param entity: The properties for the table entity.
:type entity: dict[str, str]
:param partition_key: The partition key of the entity.
:type partition_key: str
:param row_key: The row key of the entity.
:type row_key: str
:param etag: Etag of the entity
:type etag: str
:param match_condition: MatchCondition
:type match_condition: ~azure.core.MatchConditions
:return: Dictionary of operation metadata returned from service
:rtype: dict[str,str]
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_update_upsert_merge_entities_async.py
:start-after: [START update_entity]
:end-before: [END update_entity]
:language: python
:dedent: 8
:caption: Querying entities from a TableClient
"""
if_match, _ = _get_match_headers(kwargs=dict(kwargs, etag=kwargs.pop('etag', None),
match_condition=kwargs.pop('match_condition', None)),
etag_param='etag', match_param='match_condition')
partition_key = entity['PartitionKey']
row_key = entity['RowKey']
entity = _add_entity_properties(entity)
try:
metadata = None
if mode is UpdateMode.REPLACE:
metadata, _ = await self._client.table.update_entity(
table=self.table_name,
partition_key=partition_key,
row_key=row_key,
table_entity_properties=entity,
if_match=if_match or "*",
cls=kwargs.pop('cls', _return_headers_and_deserialized),
**kwargs)
elif mode is UpdateMode.MERGE:
metadata, _ = await self._client.table.merge_entity(
table=self.table_name,
partition_key=partition_key,
row_key=row_key,
if_match=if_match or "*",
cls=kwargs.pop('cls', _return_headers_and_deserialized),
table_entity_properties=entity, **kwargs)
else:
raise ValueError('Mode type is not supported')
return _trim_service_metadata(metadata)
except HttpResponseError as error:
_process_table_error(error)
[docs] @distributed_trace
def list_entities(
self,
**kwargs # type: Any
):
# type: (...) -> AsyncItemPaged[TableEntity]
"""Lists entities in a table.
:keyword int results_per_page: Number of entities per page in return ItemPaged
:keyword Union[str, list(str)] select: Specify desired properties of an entity to return certain entities
:return: Query of table entities
:rtype: AsyncItemPaged[TableEntity]
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_update_upsert_merge_entities_async.py
:start-after: [START list_entities]
:end-before: [END list_entities]
:language: python
:dedent: 8
:caption: Querying entities from a TableClient
"""
user_select = kwargs.pop('select', None)
if user_select and not isinstance(user_select, str):
user_select = ", ".join(user_select)
query_options = QueryOptions(top=kwargs.pop('results_per_page', None), select=user_select)
command = functools.partial(
self._client.table.query_entities,
**kwargs)
return AsyncItemPaged(
command, results_per_page=query_options, table=self.table_name,
page_iterator_class=TableEntityPropertiesPaged
)
[docs] @distributed_trace
def query_entities(
self,
filter, # type: str # pylint: disable = W0622
**kwargs
):
# type: (...) -> AsyncItemPaged[TableEntity]
"""Lists entities in a table.
:param str filter: Specify a filter to return certain entities
:keyword int results_per_page: Number of entities per page in return ItemPaged
:keyword Union[str, list[str]] select: Specify desired properties of an entity to return certain entities
:keyword dict parameters: Dictionary for formatting query with additional, user defined parameters
:return: Query of table entities
:rtype: ItemPaged[TableEntity]
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_query_table_async.py
:start-after: [START query_entities]
:end-before: [END query_entities]
:language: python
:dedent: 8
:caption: Querying entities from a TableClient
"""
parameters = kwargs.pop('parameters', None)
filter = self._parameter_filter_substitution(parameters, filter) # pylint: disable = W0622
user_select = kwargs.pop('select', None)
if user_select and not isinstance(user_select, str):
user_select = ", ".join(user_select)
query_options = QueryOptions(top=kwargs.pop('results_per_page', None), select=user_select,
filter=filter)
command = functools.partial(
self._client.table.query_entities,
query_options=query_options,
**kwargs)
return AsyncItemPaged(
command, table=self.table_name,
page_iterator_class=TableEntityPropertiesPaged
)
[docs] @distributed_trace_async
async def get_entity(
self,
partition_key, # type: str
row_key, # type: str
**kwargs # type: Any
):
# type: (...) -> TableEntity
"""Queries entities in a table.
:param partition_key: The partition key of the entity.
:type partition_key: str
:param row_key: The row key of the entity.
:type row_key: str
:return: TableEntity mapping str to azure.data.tables.EntityProperty
:rtype: ~azure.data.tables.TableEntity
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_update_upsert_merge_entities.py
:start-after: [START get_entity]
:end-before: [END get_entity]
:language: python
:dedent: 8
:caption: Getting an entity from PartitionKey and RowKey
"""
try:
entity = await self._client.table.query_entities_with_partition_and_row_key(table=self.table_name,
partition_key=partition_key,
row_key=row_key,
**kwargs)
properties = _convert_to_entity(entity)
return properties
except HttpResponseError as error:
_process_table_error(error)
[docs] @distributed_trace_async
async def upsert_entity(
self,
entity, # type: Union[TableEntity, Dict[str,str]]
mode=UpdateMode.MERGE, # type: UpdateMode
**kwargs # type: Any
):
# type: (...) -> Dict[str,str]
"""Update/Merge or Insert entity into table.
:param mode: Merge or Replace and Insert on fail
:type mode: ~azure.data.tables.UpdateMode
:param entity: The properties for the table entity.
:type entity: dict[str, str]
:return: Dictionary of operation metadata returned from service
:rtype: dict[str,str]
:raises: ~azure.core.exceptions.HttpResponseError
.. admonition:: Example:
.. literalinclude:: ../samples/sample_update_upsert_merge_entities_async.py
:start-after: [START upsert_entity]
:end-before: [END upsert_entity]
:language: python
:dedent: 8
:caption: Update/Merge or Insert an entity into a table
"""
partition_key = entity['PartitionKey']
row_key = entity['RowKey']
entity = _add_entity_properties(entity)
try:
metadata = None
if mode is UpdateMode.MERGE:
metadata, _ = await self._client.table.merge_entity(
table=self.table_name,
partition_key=partition_key,
row_key=row_key,
table_entity_properties=entity,
cls=kwargs.pop('cls', _return_headers_and_deserialized),
**kwargs
)
elif mode is UpdateMode.REPLACE:
metadata, _ = await self._client.table.update_entity(
table=self.table_name,
partition_key=partition_key,
row_key=row_key,
table_entity_properties=entity,
cls=kwargs.pop('cls', _return_headers_and_deserialized),
**kwargs)
else:
raise ValueError("""Update mode {} is not supported.
For a list of supported modes see the UpdateMode enum""".format(mode))
return _trim_service_metadata(metadata)
except ResourceNotFoundError:
return await self.create_entity(
partition_key=partition_key,
row_key=row_key,
table_entity_properties=entity,
**kwargs
)