# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import (
Union,
Any,
Dict,
)
from ._models import UpdateMode
from ._serialize import _get_match_headers, _add_entity_properties
class TableBatchOperations(object):
    """
    This is the class that is used for batch operations for the data tables
    service.

    The Tables service supports batch transactions on entities that are in the
    same table and belong to the same partition group. Multiple operations are
    supported within a single transaction. The batch can include at most 100
    entities, and its total payload may be no more than 4 MB in size.

    The public methods only queue requests on the instance. When the instance
    is used as a context manager, the queued requests are handed to the table
    client's ``_batch_send`` on a clean exit from the ``with`` block.
    """

    def __init__(
        self,
        client,  # type: AzureTable
        serializer,  # type: msrest.Serializer
        deserializer,  # type: msrest.Deserializer
        config,  # type: AzureTableConfiguration
        table_name,  # type: str
        table_client,  # type: TableClient
        **kwargs  # type: Dict[str, Any]
    ):
        """Initialize an empty batch of operations for a single table.

        :param client: an AzureTable object
        :type client: AzureTable
        :param serializer: serializer object for request serialization
        :type serializer: msrest.Serializer
        :param deserializer: deserializer object for response deserialization
        :type deserializer: msrest.Deserializer
        :param config: Azure Table Configuration object
        :type config: AzureTableConfiguration
        :param table_name: name of the Table to perform operations on
        :type table_name: str
        :param table_client: TableClient object used to send the batch
        :type table_client: TableClient
        :keyword partition_key: Optional partition key that every operation in
            the batch must use. When omitted, the key of the first queued
            operation is adopted.
        :returns: None
        """
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config
        self.table_name = table_name
        self._table_client = table_client
        self._partition_key = kwargs.pop("partition_key", None)
        # Requests and the entities they were built from are kept in two
        # parallel lists; every queued operation appends to both.
        self._requests = []
        self._entities = []

    def _verify_partition_key(
        self, entity  # type: Union[Entity, dict]
    ):
        # type: (...) -> None
        """Check that ``entity`` belongs to the partition used by this batch.

        If no partition key has been recorded yet, the entity's
        ``PartitionKey`` is adopted as the key for the whole batch.

        :param entity: The entity about to be queued.
        :raises KeyError: If no partition key has been recorded yet and the
            entity has no ``PartitionKey``.
        :raises ValueError: If the entity's ``PartitionKey`` differs from the
            one already recorded.
        """
        if self._partition_key is None:
            self._partition_key = entity["PartitionKey"]
        elif "PartitionKey" in entity:
            if entity["PartitionKey"] != self._partition_key:
                raise ValueError("Partition Keys must all be the same")

    # ------------------------------------------------------------------
    # Shared request-building helpers
    # ------------------------------------------------------------------

    def _format_url(self, url_template, table, entity_keys=None):
        """Fill ``url_template`` with the service URL, table and optional keys.

        :param str url_template: One of the ``metadata["url"]`` templates.
        :param str table: The name of the table.
        :param entity_keys: Optional ``(partition_key, row_key)`` pair for
            templates that address a single entity.
        :return: The value returned by the generated client's ``format_url``.
        """
        path_format_arguments = {
            "url": self._serialize.url(
                "self._config.url", self._config.url, "str", skip_quote=True
            ),
            "table": self._serialize.url("table", table, "str"),
        }
        if entity_keys is not None:
            partition_key, row_key = entity_keys
            path_format_arguments["partitionKey"] = self._serialize.url(
                "partition_key", partition_key, "str"
            )
            path_format_arguments["rowKey"] = self._serialize.url(
                "row_key", row_key, "str"
            )
        return self._client._client.format_url(  # pylint:disable=protected-access
            url_template, **path_format_arguments
        )

    def _build_query_parameters(self, timeout, query_options):
        """Build the query-string dict from ``timeout`` and ``query_options``.

        :param timeout: Optional timeout; serialized as a non-negative int.
        :param query_options: Optional parameter group whose ``format``
            attribute, when not None, becomes the ``$format`` parameter.
        :return: dict of serialized query parameters (possibly empty).
        """
        query_parameters = {}  # type: Dict[str, Any]
        if timeout is not None:
            query_parameters["timeout"] = self._serialize.query(
                "timeout", timeout, "int", minimum=0
            )
        _format = query_options.format if query_options is not None else None
        if _format is not None:
            query_parameters["$format"] = self._serialize.query(
                "format", _format, "str"
            )
        return query_parameters

    def _build_base_headers(self, request_id_parameter):
        """Build the headers shared by every batched operation.

        :param request_id_parameter: Optional client request id.
        :return: dict holding ``x-ms-version``, the optional
            ``x-ms-client-request-id`` and ``DataServiceVersion``, in that
            insertion order.
        """
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters["x-ms-version"] = self._serialize.header(
            "self._config.version", self._config.version, "str"
        )
        if request_id_parameter is not None:
            header_parameters["x-ms-client-request-id"] = self._serialize.header(
                "request_id_parameter", request_id_parameter, "str"
            )
        header_parameters["DataServiceVersion"] = self._serialize.header(
            "data_service_version", "3.0", "str"
        )
        return header_parameters

    def _serialize_body(self, entity):
        """Serialize ``entity`` as the request body, or return None for None."""
        if entity is None:
            return None
        return self._serialize.body(entity, "{object}")

    def _prepare_entity_write(
        self,
        url_template,
        table,
        partition_key,
        row_key,
        timeout,
        request_id_parameter,
        if_match,
        table_entity_properties,
        query_options,
        content_type,
    ):
        """Build the pieces shared by the update (PUT) and merge (PATCH) requests.

        :return: ``(url, query_parameters, header_parameters, body_kwargs)``
            ready to be passed to the pipeline client's verb method.
        """
        url = self._format_url(
            url_template, table, entity_keys=(partition_key, row_key)
        )
        query_parameters = self._build_query_parameters(timeout, query_options)
        header_parameters = self._build_base_headers(request_id_parameter)
        # The If-Match header is only sent when a value was supplied.
        if if_match is not None:
            header_parameters["If-Match"] = self._serialize.header(
                "if_match", if_match, "str"
            )
        header_parameters["Content-Type"] = self._serialize.header(
            "content_type", content_type, "str"
        )
        header_parameters["Accept"] = self._serialize.header(
            "accept", "application/json", "str"
        )
        body_content_kwargs = {
            "content": self._serialize_body(table_entity_properties)
        }  # type: Dict[str, Any]
        return url, query_parameters, header_parameters, body_content_kwargs

    # ------------------------------------------------------------------
    # Create
    # ------------------------------------------------------------------

    def create_entity(
        self,
        entity,  # type: Union[TableEntity, Dict[str,str]]
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Adds an insert operation to the current batch.

        :param entity: The properties for the table entity.
        :type entity: TableEntity or dict[str,str]
        :return: None
        :raises ValueError: If the entity lacks a ``PartitionKey`` or
            ``RowKey``, or its partition key differs from the batch's.

        .. admonition:: Example:

            .. literalinclude:: ../samples/sample_batching.py
                :start-after: [START batching]
                :end-before: [END batching]
                :language: python
                :dedent: 8
                :caption: Creating and adding an entity to a Table
        """
        # Validate the keys before touching batch state, so a rejected entity
        # raises the documented ValueError (not KeyError) and does not pin the
        # batch to its partition key.
        if "PartitionKey" not in entity or "RowKey" not in entity:
            raise ValueError("PartitionKey and RowKey were not provided in entity")
        self._verify_partition_key(entity)
        entity = _add_entity_properties(entity)
        self._batch_create_entity(table=self.table_name, entity=entity, **kwargs)
        self._entities.append(entity)

    def _batch_create_entity(
        self,
        table,  # type: str
        entity,  # type: Union[TableEntity, Dict[str,str]]
        timeout=None,  # type: Optional[int]
        request_id_parameter=None,  # type: Optional[str]
        response_preference="return-no-content",  # type: Optional[Union[str, "models.ResponseFormat"]]
        query_options=None,  # type: Optional["models.QueryOptions"]
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """
        Adds an insert operation to the batch. See
        :func:`azure.data.tables.TableClient.insert_entity` for more information
        on insert operations.

        The operation will not be executed until the batch is committed

        :param table: The table to perform the operation on.
        :type table: str
        :param entity: The entity to insert. Can be a dict or an entity object.
            Must contain a PartitionKey and a RowKey.
        :type entity: dict or :class:`~azure.data.tables.models.Entity`
        :param timeout: The timeout parameter is expressed in seconds.
        :type timeout: int
        :param request_id_parameter: Sent as the ``x-ms-client-request-id``
            header when not None.
        :type request_id_parameter: str
        :param response_preference: Sent as the ``Prefer`` header when not None.
        :type response_preference: str
        :param query_options: Parameter group.
        :type query_options: ~azure.data.tables.models.QueryOptions
        :keyword str content_type: Overrides the default ``Content-Type`` of
            ``application/json;odata=nometadata``.
        :return: None
        """
        content_type = kwargs.pop("content_type", "application/json;odata=nometadata")
        accept = "application/json;odata=minimalmetadata"

        # Construct URL
        url = self._format_url(
            self._batch_create_entity.metadata["url"], table  # type: ignore
        )

        # Construct parameters
        query_parameters = self._build_query_parameters(timeout, query_options)

        # Construct headers
        header_parameters = self._build_base_headers(request_id_parameter)
        if response_preference is not None:
            header_parameters["Prefer"] = self._serialize.header(
                "response_preference", response_preference, "str"
            )
        header_parameters["Content-Type"] = self._serialize.header(
            "content_type", content_type, "str"
        )
        header_parameters["Accept"] = self._serialize.header("accept", accept, "str")

        body_content_kwargs = {
            "content": self._serialize_body(entity)
        }  # type: Dict[str, Any]
        request = self._client._client.post(  # pylint:disable=protected-access
            url, query_parameters, header_parameters, **body_content_kwargs
        )
        self._requests.append(request)

    _batch_create_entity.metadata = {"url": "/{table}"}  # type: ignore

    # ------------------------------------------------------------------
    # Update / merge
    # ------------------------------------------------------------------

    def update_entity(
        self,
        entity,  # type: Union[TableEntity, Dict[str,str]]
        mode=UpdateMode.MERGE,  # type: UpdateMode
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Adds an update operation to the current batch.

        :param entity: The properties for the table entity.
        :type entity: TableEntity or dict[str,str]
        :param mode: Merge or Replace entity
        :type mode: ~azure.data.tables.UpdateMode
        :keyword str etag: Etag of the entity
        :keyword ~azure.core.MatchConditions match_condition: MatchCondition
        :return: None
        :raises ValueError: If ``mode`` is not a supported update mode, or the
            entity's partition key differs from the batch's.

        .. admonition:: Example:

            .. literalinclude:: ../samples/sample_batching.py
                :start-after: [START batching]
                :end-before: [END batching]
                :language: python
                :dedent: 8
                :caption: Creating and adding an entity to a Table
        """
        # Resolve the operation first: an unsupported mode must fail loudly
        # instead of recording an entity for which no request was queued.
        if mode == UpdateMode.REPLACE:
            queue_operation = self._batch_update_entity
        elif mode == UpdateMode.MERGE:
            queue_operation = self._batch_merge_entity
        else:
            raise ValueError("Mode type is not supported")

        self._verify_partition_key(entity)

        # etag/match_condition are removed from kwargs so they are not
        # forwarded to the request builder.
        etag = kwargs.pop("etag", None)
        match_condition = kwargs.pop("match_condition", None)
        if_match, _ = _get_match_headers(
            kwargs=dict(kwargs, etag=etag, match_condition=match_condition),
            etag_param="etag",
            match_param="match_condition",
        )

        partition_key = entity["PartitionKey"]
        row_key = entity["RowKey"]
        entity = _add_entity_properties(entity)

        queue_operation(
            table=self.table_name,
            partition_key=partition_key,
            row_key=row_key,
            # Fall back to the wildcard when no match header was produced.
            if_match=if_match or "*",
            table_entity_properties=entity,
            **kwargs
        )
        self._entities.append(entity)

    def _batch_update_entity(
        self,
        table,  # type: str
        partition_key,  # type: str
        row_key,  # type: str
        timeout=None,  # type: Optional[int]
        request_id_parameter=None,  # type: Optional[str]
        if_match=None,  # type: Optional[str]
        table_entity_properties=None,  # type: Optional[Dict[str, object]]
        query_options=None,  # type: Optional["models.QueryOptions"]
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Update entity in a table.

        Builds a PUT request and appends it to the batch.

        :param table: The name of the table.
        :type table: str
        :param partition_key: The partition key of the entity.
        :type partition_key: str
        :param row_key: The row key of the entity.
        :type row_key: str
        :param timeout: The timeout parameter is expressed in seconds.
        :type timeout: int
        :param request_id_parameter: Provides a client-generated, opaque value with a 1 KB character
         limit that is recorded in the analytics logs when analytics logging is enabled.
        :type request_id_parameter: str
        :param if_match: Match condition for an entity to be updated. If specified and a matching
         entity is not found, an error will be raised. To force an unconditional update, set to the
         wildcard character (*). If not specified, an insert will be performed when no existing entity
         is found to update and a replace will be performed if an existing entity is found.
        :type if_match: str
        :param table_entity_properties: The properties for the table entity.
        :type table_entity_properties: dict[str, object]
        :param query_options: Parameter group.
        :type query_options: ~azure.data.tables.models.QueryOptions
        :keyword str content_type: Overrides the default ``Content-Type`` of
         ``application/json``.
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        content_type = kwargs.pop("content_type", "application/json")
        url, query_parameters, header_parameters, body_content_kwargs = (
            self._prepare_entity_write(
                self._batch_update_entity.metadata["url"],  # type: ignore
                table,
                partition_key,
                row_key,
                timeout,
                request_id_parameter,
                if_match,
                table_entity_properties,
                query_options,
                content_type,
            )
        )
        request = self._client._client.put(  # pylint:disable=protected-access
            url, query_parameters, header_parameters, **body_content_kwargs
        )
        self._requests.append(request)

    _batch_update_entity.metadata = {
        "url": "/{table}(PartitionKey='{partitionKey}',RowKey='{rowKey}')"
    }  # type: ignore

    def _batch_merge_entity(
        self,
        table,  # type: str
        partition_key,  # type: str
        row_key,  # type: str
        timeout=None,  # type: Optional[int]
        request_id_parameter=None,  # type: Optional[str]
        if_match=None,  # type: Optional[str]
        table_entity_properties=None,  # type: Optional[Dict[str, object]]
        query_options=None,  # type: Optional["models.QueryOptions"]
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Merge entity in a table.

        Builds a PATCH request and appends it to the batch.

        :param table: The name of the table.
        :type table: str
        :param partition_key: The partition key of the entity.
        :type partition_key: str
        :param row_key: The row key of the entity.
        :type row_key: str
        :param timeout: The timeout parameter is expressed in seconds.
        :type timeout: int
        :param request_id_parameter: Provides a client-generated, opaque value with a 1 KB character
         limit that is recorded in the analytics logs when analytics logging is enabled.
        :type request_id_parameter: str
        :param if_match: Match condition for an entity to be updated. If specified and a matching
         entity is not found, an error will be raised. To force an unconditional update, set to the
         wildcard character (*). If not specified, an insert will be performed when no existing entity
         is found to update and a merge will be performed if an existing entity is found.
        :type if_match: str
        :param table_entity_properties: The properties for the table entity.
        :type table_entity_properties: dict[str, object]
        :param query_options: Parameter group.
        :type query_options: ~azure.data.tables.models.QueryOptions
        :keyword str content_type: Overrides the default ``Content-Type`` of
         ``application/json``.
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        content_type = kwargs.pop("content_type", "application/json")
        url, query_parameters, header_parameters, body_content_kwargs = (
            self._prepare_entity_write(
                self._batch_merge_entity.metadata["url"],  # type: ignore
                table,
                partition_key,
                row_key,
                timeout,
                request_id_parameter,
                if_match,
                table_entity_properties,
                query_options,
                content_type,
            )
        )
        request = self._client._client.patch(  # pylint:disable=protected-access
            url, query_parameters, header_parameters, **body_content_kwargs
        )
        self._requests.append(request)

    _batch_merge_entity.metadata = {
        "url": "/{table}(PartitionKey='{partitionKey}',RowKey='{rowKey}')"
    }

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------

    def delete_entity(
        self,
        partition_key,  # type: str
        row_key,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Adds a delete operation to the current batch.

        :param partition_key: The partition key of the entity.
        :type partition_key: str
        :param row_key: The row key of the entity.
        :type row_key: str
        :keyword str etag: Etag of the entity
        :keyword ~azure.core.MatchConditions match_condition: MatchCondition
        :raises ValueError: If ``partition_key`` differs from the partition
            key already used by the batch.

        .. admonition:: Example:

            .. literalinclude:: ../samples/sample_batching.py
                :start-after: [START batching]
                :end-before: [END batching]
                :language: python
                :dedent: 8
                :caption: Creating and adding an entity to a Table
        """
        # Compare against None rather than truthiness: an empty-string
        # partition key must not be treated as "no key recorded yet", which
        # would let operations on different partitions into the same batch.
        if self._partition_key is None:
            self._partition_key = partition_key
        elif partition_key != self._partition_key:
            raise ValueError("Partition Keys must all be the same")

        # etag/match_condition are removed from kwargs so they are not
        # forwarded to the request builder.
        etag = kwargs.pop("etag", None)
        match_condition = kwargs.pop("match_condition", None)
        if_match, _ = _get_match_headers(
            kwargs=dict(kwargs, etag=etag, match_condition=match_condition),
            etag_param="etag",
            match_param="match_condition",
        )

        self._batch_delete_entity(
            table=self.table_name,
            partition_key=partition_key,
            row_key=row_key,
            # Fall back to the wildcard when no match header was produced.
            if_match=if_match or "*",
            **kwargs
        )

        # Record a key-only entity so _entities stays aligned with _requests.
        temp_entity = {"PartitionKey": partition_key, "RowKey": row_key}
        self._entities.append(_add_entity_properties(temp_entity))

    def _batch_delete_entity(
        self,
        table,  # type: str
        partition_key,  # type: str
        row_key,  # type: str
        if_match,  # type: str
        timeout=None,  # type: Optional[int]
        request_id_parameter=None,  # type: Optional[str]
        query_options=None,  # type: Optional["models.QueryOptions"]
    ):
        # type: (...) -> None
        """Deletes the specified entity in a table.

        Builds a DELETE request and appends it to the batch.

        :param table: The name of the table.
        :type table: str
        :param partition_key: The partition key of the entity.
        :type partition_key: str
        :param row_key: The row key of the entity.
        :type row_key: str
        :param if_match: Match condition for an entity to be deleted. If specified and a matching
         entity is not found, an error will be raised. To force an unconditional delete, set to the
         wildcard character (*).
        :type if_match: str
        :param timeout: The timeout parameter is expressed in seconds.
        :type timeout: int
        :param request_id_parameter: Provides a client-generated, opaque value with a 1 KB character
         limit that is recorded in the analytics logs when analytics logging is enabled.
        :type request_id_parameter: str
        :param query_options: Parameter group.
        :type query_options: ~azure.data.tables.models.QueryOptions
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        accept = "application/json;odata=minimalmetadata"

        # Construct URL
        url = self._format_url(
            self._batch_delete_entity.metadata["url"],  # type: ignore
            table,
            entity_keys=(partition_key, row_key),
        )

        # Construct parameters
        query_parameters = self._build_query_parameters(timeout, query_options)

        # Construct headers; unlike update/merge, If-Match is always sent.
        header_parameters = self._build_base_headers(request_id_parameter)
        header_parameters["If-Match"] = self._serialize.header(
            "if_match", if_match, "str"
        )
        header_parameters["Accept"] = self._serialize.header("accept", accept, "str")

        request = self._client._client.delete(  # pylint:disable=protected-access
            url, query_parameters, header_parameters
        )
        self._requests.append(request)

    _batch_delete_entity.metadata = {
        "url": "/{table}(PartitionKey='{partitionKey}',RowKey='{rowKey}')"
    }

    # ------------------------------------------------------------------
    # Upsert
    # ------------------------------------------------------------------

    def upsert_entity(
        self,
        entity,  # type: Union[TableEntity, Dict[str,str]]
        mode=UpdateMode.MERGE,  # type: UpdateMode
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Adds an upsert (merge or replace) operation to the batch.

        No ``If-Match`` value is passed to the request builder, so the queued
        request carries no ``If-Match`` header.

        :param entity: The properties for the table entity.
        :type entity: TableEntity or dict[str,str]
        :param mode: Merge or Replace and Insert on fail
        :type mode: ~azure.data.tables.UpdateMode
        :raises ValueError: If ``mode`` is not a supported update mode, or the
            entity's partition key differs from the batch's.

        .. admonition:: Example:

            .. literalinclude:: ../samples/sample_batching.py
                :start-after: [START batching]
                :end-before: [END batching]
                :language: python
                :dedent: 8
                :caption: Creating and adding an entity to a Table
        """
        # Resolve the operation first: an unsupported mode must fail loudly
        # instead of recording an entity for which no request was queued.
        if mode == UpdateMode.MERGE:
            queue_operation = self._batch_merge_entity
        elif mode == UpdateMode.REPLACE:
            queue_operation = self._batch_update_entity
        else:
            raise ValueError("Mode type is not supported")

        self._verify_partition_key(entity)

        partition_key = entity["PartitionKey"]
        row_key = entity["RowKey"]
        entity = _add_entity_properties(entity)

        queue_operation(
            table=self.table_name,
            partition_key=partition_key,
            row_key=row_key,
            table_entity_properties=entity,
            **kwargs
        )
        self._entities.append(entity)

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    def __enter__(self):
        # type: (...) -> TableBatchOperations
        """Return the batch itself so operations can be queued in a ``with`` block."""
        return self

    def __exit__(
        self,
        *args,  # type: Any
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Send the queued requests when the ``with`` block exits cleanly.

        If the block raised, nothing is sent: committing a partially built
        batch would be surprising, and an error from the send could mask the
        original exception. The exception is not suppressed.
        """
        exc_type = args[0] if args else None
        if exc_type is not None:
            return
        self._table_client._batch_send(*self._requests, **kwargs)