Source code for azure.data.tables._table_client

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import functools
from typing import Optional, Any, Union  # pylint: disable = W0611

try:
    from urllib.parse import urlparse, unquote
except ImportError:
    from urlparse import urlparse  # type: ignore
    from urllib2 import unquote  # type: ignore

from azure.core.paging import ItemPaged
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from azure.core.tracing.decorator import distributed_trace

from ._deserialize import _convert_to_entity, _trim_service_metadata
from ._entity import TableEntity
from ._generated import AzureTable
from ._generated.models import (
    AccessPolicy,
    SignedIdentifier,
    TableProperties,
    QueryOptions
)
from ._serialize import _get_match_headers, _add_entity_properties
from ._base_client import parse_connection_str
from ._table_client_base import TableClientBase
from ._serialize import serialize_iso
from ._deserialize import _return_headers_and_deserialized
from ._error import _process_table_error
from ._models import TableEntityPropertiesPaged, UpdateMode


[docs]class TableClient(TableClientBase): """ :ivar str account_name: Name of the storage account (Cosmos or Azure)""" def __init__( self, account_url, # type: str table_name, # type: str credential=None, # type: Union[str,TokenCredential] **kwargs # type: Any ): # type: (...) -> None """Create TableClient from a Credential. :param account_url: A url to an Azure Storage account. :type account_url: str :param table_name: The table name. :type table_name: str :param credential: The credentials with which to authenticate. This is optional if the account URL already has a SAS token, or the connection string already has shared access key values. The value can be a SAS token string, an account shared access key, or an instance of a TokenCredentials class from azure.identity. :type credential: Union[str,TokenCredential] :returns: None """ super(TableClient, self).__init__(account_url, table_name, credential=credential, **kwargs) self._client = AzureTable(self.url, pipeline=self._pipeline)
[docs] @classmethod def from_connection_string( cls, conn_str, # type: str table_name, # type: str **kwargs # type: Any ): # type: (...) -> TableClient """Create TableClient from a Connection String. :param conn_str: A connection string to an Azure Storage or Cosmos account. :type conn_str: str :param table_name: The table name. :type table_name: str :returns: A table client. :rtype: ~azure.data.tables.TableClient .. admonition:: Example: .. literalinclude:: ../samples/sample_create_client.py :start-after: [START create_table_client] :end-before: [END create_table_client] :language: python :dedent: 8 :caption: Authenticating a TableServiceClient from a connection_string """ account_url, credential = parse_connection_str( conn_str=conn_str, credential=None, service='table', keyword_args=kwargs) return cls(account_url, table_name=table_name, credential=credential, **kwargs)
[docs] @classmethod def from_table_url(cls, table_url, credential=None, **kwargs): # type: (str, Optional[Any], Any) -> TableClient """A client to interact with a specific Table. :param table_url: The full URI to the table, including SAS token if used. :type table_url: str :param credential: The credentials with which to authenticate. This is optional if the account URL already has a SAS token. The value can be a SAS token string, an account shared access key. :type credential: str :returns: A table client. :rtype: ~azure.data.tables.TableClient """ try: if not table_url.lower().startswith('http'): table_url = "https://" + table_url except AttributeError: raise ValueError("Table URL must be a string.") parsed_url = urlparse(table_url.rstrip('/')) if not parsed_url.netloc: raise ValueError("Invalid URL: {}".format(table_url)) table_path = parsed_url.path.lstrip('/').split('/') account_path = "" if len(table_path) > 1: account_path = "/" + "/".join(table_path[:-1]) account_url = "{}://{}{}?{}".format( parsed_url.scheme, parsed_url.netloc.rstrip('/'), account_path, parsed_url.query) table_name = unquote(table_path[-1]) if not table_name: raise ValueError("Invalid URL. Please provide a URL with a valid table name") return cls(account_url, table_name=table_name, credential=credential, **kwargs)
[docs] @distributed_trace def get_table_access_policy( self, **kwargs # type: Any ): # type: (...) -> Dict[str,AccessPolicy] """Retrieves details about any stored access policies specified on the table that may be used with Shared Access Signatures. :return: Dictionary of SignedIdentifiers :rtype: dict[str,AccessPolicy] :raises: ~azure.core.exceptions.HttpResponseError """ timeout = kwargs.pop('timeout', None) try: _, identifiers = self._client.table.get_access_policy( table=self.table_name, timeout=timeout, cls=kwargs.pop('cls', None) or _return_headers_and_deserialized, **kwargs) except HttpResponseError as error: _process_table_error(error) return {s.id: s.access_policy or AccessPolicy() for s in identifiers} # pylint: disable=E1125
[docs] @distributed_trace def set_table_access_policy( self, signed_identifiers, # type: Dict[str,AccessPolicy] **kwargs): # type: (...) -> None """Sets stored access policies for the table that may be used with Shared Access Signatures. :param signed_identifiers: :type signed_identifiers: dict[str,AccessPolicy] :return: None :rtype: None :raises: ~azure.core.exceptions.HttpResponseError """ self._validate_signed_identifiers(signed_identifiers) identifiers = [] for key, value in signed_identifiers.items(): if value: value.start = serialize_iso(value.start) value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) signed_identifiers = identifiers # type: ignore try: self._client.table.set_access_policy( table=self.table_name, table_acl=signed_identifiers or None, **kwargs) except HttpResponseError as error: _process_table_error(error)
[docs] @distributed_trace def create_table( self, **kwargs # type: Any ): # type: (...) -> Dict[str,str] """Creates a new table under the current account. :return: Dictionary of operation metadata returned from service :rtype: dict[str,str] :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_create_delete_table.py :start-after: [START create_table_from_table_client] :end-before: [END create_table_from_table_client] :language: python :dedent: 8 :caption: Creating a table from the TableClient object """ table_properties = TableProperties(table_name=self.table_name, **kwargs) try: metadata, _ = self._client.table.create( table_properties, cls=kwargs.pop('cls', _return_headers_and_deserialized)) return _trim_service_metadata(metadata) except HttpResponseError as error: _process_table_error(error)
[docs] @distributed_trace def delete_table( self, **kwargs # type: Any ): # type: (...) -> None """Deletes the table under the current account. :return: None :rtype: None .. admonition:: Example: .. literalinclude:: ../samples/sample_create_delete_table.py :start-after: [START delete_table_from_table_client] :end-before: [END delete_table_from_table_client] :language: python :dedent: 8 :caption: Deleting a table from the TableClient object """ try: self._client.table.delete(table=self.table_name, **kwargs) except HttpResponseError as error: _process_table_error(error)
[docs] @distributed_trace def delete_entity( self, partition_key, # type: str row_key, # type: str **kwargs # type: Any ): # type: (...) -> None """Deletes the specified entity in a table. :param partition_key: The partition key of the entity. :type partition_key: str :param row_key: The row key of the entity. :type row_key: str :keyword str etag: Etag of the entity :keyword ~azure.core.MatchConditions match_condition: MatchCondition :return: None :rtype: None :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_insert_delete_entities.py :start-after: [START delete_entity] :end-before: [END delete_entity] :language: python :dedent: 8 :caption: Deleting an entity to a Table """ if_match, _ = _get_match_headers(kwargs=dict(kwargs, etag=kwargs.pop('etag', None), match_condition=kwargs.pop('match_condition', None)), etag_param='etag', match_param='match_condition') try: self._client.table.delete_entity( table=self.table_name, partition_key=partition_key, row_key=row_key, if_match=if_match or '*', **kwargs) except HttpResponseError as error: _process_table_error(error)
[docs] @distributed_trace def create_entity( self, entity, # type: Union[TableEntity, Dict[str,str]] **kwargs # type: Any ): # type: (...) -> Dict[str,str] """Insert entity in a table. :param entity: The properties for the table entity. :type entity: Union[TableEntity, dict[str,str]] :return: Dictionary mapping operation metadata returned from the service :rtype: dict[str,str] :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_insert_delete_entities.py :start-after: [START create_entity] :end-before: [END create_entity] :language: python :dedent: 8 :caption: Creating and adding an entity to a Table """ if "PartitionKey" in entity and "RowKey" in entity: entity = _add_entity_properties(entity) else: raise ValueError('PartitionKey and RowKey were not provided in entity') try: metadata, _ = self._client.table.insert_entity( table=self.table_name, table_entity_properties=entity, cls=kwargs.pop('cls', _return_headers_and_deserialized), **kwargs) return _trim_service_metadata(metadata) except ResourceNotFoundError as error: _process_table_error(error)
[docs] @distributed_trace def update_entity( # pylint:disable=R1710 self, entity, # type: Union[TableEntity, Dict[str,str]] mode=UpdateMode.MERGE, # type: UpdateMode **kwargs # type: Any ): # type: (...) -> Dict[str,str] """Update entity in a table. :param entity: The properties for the table entity. :type entity: Union[TableEntity, dict[str,str]] :param mode: Merge or Replace entity :type mode: ~azure.data.tables.UpdateMode :keyword str partition_key: The partition key of the entity. :keyword str row_key: The row key of the entity. :keyword str etag: Etag of the entity :keyword ~azure.core.MatchConditions match_condition: MatchCondition :return: Dictionary mapping operation metadata returned from the service :rtype: dict[str,str] :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_update_upsert_merge_entities.py :start-after: [START update_entity] :end-before: [END update_entity] :language: python :dedent: 8 :caption: Updating an already exiting entity in a Table """ if_match, _ = _get_match_headers(kwargs=dict(kwargs, etag=kwargs.pop('etag', None), match_condition=kwargs.pop('match_condition', None)), etag_param='etag', match_param='match_condition') partition_key = entity['PartitionKey'] row_key = entity['RowKey'] entity = _add_entity_properties(entity) try: metadata = None if mode is UpdateMode.REPLACE: metadata, _ = self._client.table.update_entity( table=self.table_name, partition_key=partition_key, row_key=row_key, table_entity_properties=entity, if_match=if_match or "*", cls=kwargs.pop('cls', _return_headers_and_deserialized), **kwargs) elif mode is UpdateMode.MERGE: metadata, _ = self._client.table.merge_entity( table=self.table_name, partition_key=partition_key, row_key=row_key, if_match=if_match or "*", table_entity_properties=entity, cls=kwargs.pop('cls', _return_headers_and_deserialized), **kwargs) else: raise ValueError('Mode type is not supported') return _trim_service_metadata(metadata) except HttpResponseError as error: _process_table_error(error)
[docs] @distributed_trace def list_entities( self, **kwargs # type: Any ): # type: (...) -> ItemPaged[TableEntity] """Lists entities in a table. :keyword int results_per_page: Number of entities per page in return ItemPaged :keyword Union[str, list(str)] select: Specify desired properties of an entity to return certain entities :return: Query of table entities :rtype: ItemPaged[TableEntity] :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_update_upsert_merge_entities.py :start-after: [START query_entities] :end-before: [END query_entities] :language: python :dedent: 8 :caption: List all entities held within a table """ user_select = kwargs.pop('select', None) if user_select and not isinstance(user_select, str): user_select = ", ".join(user_select) query_options = QueryOptions(top=kwargs.pop('results_per_page', None), select=user_select) command = functools.partial( self._client.table.query_entities, **kwargs) return ItemPaged( command, results_per_page=query_options, table=self.table_name, page_iterator_class=TableEntityPropertiesPaged )
[docs] @distributed_trace def query_entities( self, filter, # type: str # pylint: disable = W0622 **kwargs ): # type: (...) -> ItemPaged[TableEntity] """Lists entities in a table. :param str filter: Specify a filter to return certain entities :keyword int results_per_page: Number of entities per page in return ItemPaged :keyword Union[str, list[str]] select: Specify desired properties of an entity to return certain entities :keyword dict parameters: Dictionary for formatting query with additional, user defined parameters :return: Query of table entities :rtype: ItemPaged[TableEntity] :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_query_table.py :start-after: [START query_entities] :end-before: [END query_entities] :language: python :dedent: 8 :caption: Query entities held within a table """ parameters = kwargs.pop('parameters', None) filter = self._parameter_filter_substitution(parameters, filter) # pylint: disable = W0622 user_select = kwargs.pop('select', None) if user_select and not isinstance(user_select, str): user_select = ", ".join(user_select) query_options = QueryOptions(top=kwargs.pop('results_per_page', None), select=user_select, filter=filter) command = functools.partial( self._client.table.query_entities, query_options=query_options, **kwargs) return ItemPaged( command, table=self.table_name, page_iterator_class=TableEntityPropertiesPaged )
[docs] @distributed_trace def get_entity( self, partition_key, # type: str row_key, # type: str **kwargs # type: Any ): # type: (...) -> Dict[str,str] """Queries entities in a table. :param partition_key: The partition key of the entity. :type partition_key: str :param row_key: The row key of the entity. :type row_key: str :return: Dictionary mapping operation metadata returned from the service :rtype: dict[str,str] :raises: ~azure.core.exceptions.HttpResponseError """ try: entity = self._client.table.query_entities_with_partition_and_row_key(table=self.table_name, partition_key=partition_key, row_key=row_key, **kwargs) properties = _convert_to_entity(entity) return properties except HttpResponseError as error: _process_table_error(error)
[docs] @distributed_trace def upsert_entity( # pylint:disable=R1710 self, entity, # type: Union[TableEntity, Dict[str,str]] mode=UpdateMode.MERGE, # type: UpdateMode **kwargs # type: Any ): # type: (...) -> Dict[str,str] """Update/Merge or Insert entity into table. :param entity: The properties for the table entity. :type entity: Union[TableEntity, dict[str,str]] :param mode: Merge or Replace and Insert on fail :type mode: ~azure.data.tables.UpdateMode :return: Dictionary mapping operation metadata returned from the service :rtype: dict[str,str] :raises: ~azure.core.exceptions.HttpResponseError .. admonition:: Example: .. literalinclude:: ../samples/sample_update_upsert_merge_entities.py :start-after: [START upsert_entity] :end-before: [END upsert_entity] :language: python :dedent: 8 :caption: Update/merge or insert an entity into a table """ partition_key = entity['PartitionKey'] row_key = entity['RowKey'] entity = _add_entity_properties(entity) try: metadata = None if mode is UpdateMode.MERGE: metadata, _ = self._client.table.merge_entity( table=self.table_name, partition_key=partition_key, row_key=row_key, table_entity_properties=entity, cls=kwargs.pop('cls', _return_headers_and_deserialized), **kwargs ) elif mode is UpdateMode.REPLACE: metadata, _ = self._client.table.update_entity( table=self.table_name, partition_key=partition_key, row_key=row_key, table_entity_properties=entity, cls=kwargs.pop('cls', _return_headers_and_deserialized), **kwargs) else: raise ValueError("""Update mode {} is not supported. For a list of supported modes see the UpdateMode enum""".format(mode)) return _trim_service_metadata(metadata) except ResourceNotFoundError: return self.create_entity( partition_key=partition_key, row_key=row_key, table_entity_properties=entity, **kwargs )