# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Create, read, update and delete items in the Azure Cosmos DB SQL API service.
"""
import warnings
from datetime import datetime
from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union, List, Tuple, cast, overload, AsyncIterable
from typing_extensions import Literal
from azure.core import MatchConditions
from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async # type: ignore
from ._cosmos_client_connection_async import CosmosClientConnection
from ._scripts import ScriptsProxy
from .._base import (
build_options as _build_options,
validate_cache_staleness_value,
_deserialize_throughput,
_replace_throughput,
GenerateGuidId,
_set_properties_cache
)
from .._change_feed.feed_range_internal import FeedRangeInternalEpk
from .._cosmos_responses import CosmosDict, CosmosList
from .._routing.routing_range import Range
from .._session_token_helpers import get_latest_session_token
from ..offer import ThroughputProperties
from ..partition_key import (
NonePartitionKeyValue,
_return_undefined_or_empty_partition_key,
_Empty,
_Undefined, PartitionKey
)
__all__ = ("ContainerProxy",)
# pylint: disable=protected-access, too-many-lines
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
# pylint: disable=too-many-public-methods
PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long
[docs]
class ContainerProxy:
"""An interface to interact with a specific DB Container.
This class should not be instantiated directly. Instead, use the
:func:`~azure.cosmos.aio.database.DatabaseProxy.get_container_client` method to get an existing
container, or the :func:`~azure.cosmos.aio.database.DatabaseProxy.create_container` method to create a
new container.
A container in an Azure Cosmos DB SQL API database is a collection of
documents, each of which is represented as an Item.
:ivar str id: ID (name) of the container
:ivar str session_token: The session token for the container.
"""
def __init__(
self,
client_connection: CosmosClientConnection,
database_link: str,
id: str,
properties: Optional[Dict[str, Any]] = None
) -> None:
self.client_connection = client_connection
self.id = id
self.database_link = database_link
self.container_link = "{}/colls/{}".format(database_link, self.id)
self._is_system_key: Optional[bool] = None
self._scripts: Optional[ScriptsProxy] = None
if properties:
self.client_connection._set_container_properties_cache(self.container_link,
_set_properties_cache(properties))
def __repr__(self) -> str:
return "<ContainerProxy [{}]>".format(self.container_link)[:1024]
async def _get_properties(self) -> Dict[str, Any]:
if self.container_link not in self.client_connection._container_properties_cache:
await self.read()
return self.client_connection._container_properties_cache[self.container_link]
@property
async def is_system_key(self) -> bool:
if self._is_system_key is None:
properties = await self._get_properties()
self._is_system_key = (
properties["partitionKey"]["systemKey"] if "systemKey" in properties["partitionKey"] else False
)
return self._is_system_key
def __get_client_container_caches(self) -> Dict[str, Dict[str, Any]]:
return self.client_connection._container_properties_cache
@property
def scripts(self) -> ScriptsProxy:
if self._scripts is None:
self._scripts = ScriptsProxy(self, self.client_connection, self.container_link)
return self._scripts
def _get_document_link(self, item_or_link: Union[str, Mapping[str, Any]]) -> str:
if isinstance(item_or_link, str):
return "{}/docs/{}".format(self.container_link, item_or_link)
return item_or_link["_self"]
def _get_conflict_link(self, conflict_or_link: Union[str, Mapping[str, Any]]) -> str:
if isinstance(conflict_or_link, str):
return "{}/conflicts/{}".format(self.container_link, conflict_or_link)
return conflict_or_link["_self"]
async def _set_partition_key(
self,
partition_key: PartitionKeyType
) -> Union[str, int, float, bool, List[Union[str, int, float, bool]], _Empty, _Undefined]:
if partition_key == NonePartitionKeyValue:
return _return_undefined_or_empty_partition_key(await self.is_system_key)
return cast(Union[str, int, float, bool, List[Union[str, int, float, bool]]], partition_key)
async def _get_epk_range_for_partition_key(self, partition_key_value: PartitionKeyType) -> Range:
container_properties = await self._get_properties()
partition_key_definition = container_properties["partitionKey"]
partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"])
return partition_key._get_epk_range_for_partition_key(partition_key_value)
[docs]
@distributed_trace_async
async def read(
self,
*,
populate_partition_key_range_statistics: Optional[bool] = None,
populate_quota_info: Optional[bool] = None,
session_token: Optional[str] = None,
priority: Optional[Literal["High", "Low"]] = None,
initial_headers: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> Dict[str, Any]:
"""Read the container properties.
:keyword bool populate_partition_key_range_statistics: Enable returning partition key
range statistics in response headers.
:keyword bool populate_quota_info: Enable returning collection storage quota information in response headers.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be retrieved.
This includes if the container does not exist.
:returns: Dict representing the retrieved container.
:rtype: Dict[str, Any]
"""
if session_token is not None:
kwargs['session_token'] = session_token
if priority is not None:
kwargs['priority'] = priority
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
request_options = _build_options(kwargs)
if populate_partition_key_range_statistics is not None:
request_options["populatePartitionKeyRangeStatistics"] = populate_partition_key_range_statistics
if populate_quota_info is not None:
request_options["populateQuotaInfo"] = populate_quota_info
container = await self.client_connection.ReadContainer(self.container_link, options=request_options, **kwargs)
# Only cache Container Properties that will not change in the lifetime of the container
self.client_connection._set_container_properties_cache(self.container_link, _set_properties_cache(container)) # pylint: disable=protected-access, line-too-long
return container
[docs]
@distributed_trace_async
async def create_item(
self,
body: Dict[str, Any],
*,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
indexing_directive: Optional[int] = None,
enable_automatic_id_generation: bool = False,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
priority: Optional[Literal["High", "Low"]] = None,
no_response: Optional[bool] = None,
**kwargs: Any
) -> CosmosDict:
"""Create an item in the container.
To update or replace an existing item, use the
:func:`ContainerProxy.upsert_item` method.
:param dict[str, str] body: A dict-like object representing the item to create.
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword indexing_directive: Enumerates the possible values to indicate whether the document should
be omitted from indexing. Possible values include: 0 for Default, 1 for Exclude, or 2 for Include.
:paramtype indexing_directive: Union[int, ~azure.cosmos.documents.IndexingDirective]
:keyword bool enable_automatic_id_generation: Enable automatic id generation if no id present.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword bool no_response: Indicates whether service should be instructed to skip
sending response payloads. When not specified explicitly here, the default value will be determined from
client-level options.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Item with the given ID already exists.
:returns: A CosmosDict representing the new item. The dict will be empty if `no_response` is specified.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if priority is not None:
kwargs['priority'] = priority
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
if no_response is not None:
kwargs['no_response'] = no_response
request_options = _build_options(kwargs)
request_options["disableAutomaticIdGeneration"] = not enable_automatic_id_generation
if indexing_directive is not None:
request_options["indexingDirective"] = indexing_directive
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = await self.client_connection.CreateItem(
database_or_container_link=self.container_link, document=body, options=request_options, **kwargs
)
return result
[docs]
@distributed_trace_async
async def read_item(
self,
item: Union[str, Mapping[str, Any]],
partition_key: PartitionKeyType,
*,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
max_integrated_cache_staleness_in_ms: Optional[int] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> CosmosDict:
"""Get the item identified by `item`.
:param item: The ID (name) or dict representing item to retrieve.
:type item: Union[str, Dict[str, Any]]
:param partition_key: Partition key for the item to retrieve.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in
milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency,
responses are guaranteed to be no staler than this value.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved.
:returns: A CosmosDict representing the retrieved item.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START update_item]
:end-before: [END update_item]
:language: python
:dedent: 0
:caption: Get an item from the database and update one of its properties:
:name: update_item
"""
doc_link = self._get_document_link(item)
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if priority is not None:
kwargs['priority'] = priority
request_options = _build_options(kwargs)
request_options["partitionKey"] = await self._set_partition_key(partition_key)
if max_integrated_cache_staleness_in_ms is not None:
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
return await self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs)
[docs]
@distributed_trace
def read_all_items(
self,
*,
max_item_count: Optional[int] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
max_integrated_cache_staleness_in_ms: Optional[int] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List all the items in the container.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in
milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency,
responses are guaranteed to be no staler than this value.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
response_hook = kwargs.pop('response_hook', None)
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if priority is not None:
kwargs['priority'] = priority
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
if max_integrated_cache_staleness_in_ms:
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
if hasattr(response_hook, "clear"):
response_hook.clear()
if self.container_link in self.__get_client_container_caches():
feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
items = self.client_connection.ReadItems(
collection_link=self.container_link, feed_options=feed_options, response_hook=response_hook, **kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, items)
return items
[docs]
@distributed_trace
def query_items(
self,
query: str,
*,
parameters: Optional[List[Dict[str, object]]] = None,
partition_key: Optional[PartitionKeyType] = None,
max_item_count: Optional[int] = None,
enable_scan_in_query: Optional[bool] = None,
populate_query_metrics: Optional[bool] = None,
populate_index_metrics: Optional[bool] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
max_integrated_cache_staleness_in_ms: Optional[int] = None,
priority: Optional[Literal["High", "Low"]] = None,
continuation_token_limit: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Return all results matching the given `query`.
You can use any value for the container name in the FROM clause, but
often the container name is used. In the examples below, the container
name is "products," and is aliased as "p" for easier referencing in
the WHERE clause.
:param str query: The Azure Cosmos DB SQL query to execute.
:keyword parameters: Optional array of parameters to the query.
Each parameter is a dict() with 'name' and 'value' keys.
Ignored if no query is provided.
:paramtype parameters: List[Dict[str, Any]]
:keyword partition_key: Specifies the partition key value for the item. If none is provided,
a cross-partition query will be executed.
:paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword bool enable_scan_in_query: Allow scan on the queries which couldn't be served as
indexing was opted out on the requested paths.
:keyword bool populate_query_metrics: Enable returning query metrics in response headers.
:keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used
existing indexes and how it could use potential new indexes. Please note that this options will incur
overhead, so it should be enabled only when debugging slow queries.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:keyword int continuation_token_limit: The size limit in kb of the response continuation token in the query
response. Valid values are positive integers.
A value of 0 is the same as not passing a value (default no limit).
:keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in
milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency,
responses are guaranteed to be no staler than this value.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START query_items]
:end-before: [END query_items]
:language: python
:dedent: 0
:caption: Get all products that have not been discontinued:
:name: query_items
.. literalinclude:: ../samples/examples_async.py
:start-after: [START query_items_param]
:end-before: [END query_items_param]
:language: python
:dedent: 0
:caption: Parameterized query to get all products that have been discontinued:
:name: query_items_param
"""
response_hook = kwargs.pop('response_hook', None)
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if priority is not None:
kwargs['priority'] = priority
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
if populate_query_metrics is not None:
feed_options["populateQueryMetrics"] = populate_query_metrics
if populate_index_metrics is not None:
feed_options["populateIndexMetrics"] = populate_index_metrics
if enable_scan_in_query is not None:
feed_options["enableScanInQuery"] = enable_scan_in_query
if partition_key is not None:
feed_options["partitionKey"] = self._set_partition_key(partition_key)
kwargs["containerProperties"] = self._get_properties
else:
feed_options["enableCrossPartitionQuery"] = True
if max_integrated_cache_staleness_in_ms:
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
correlated_activity_id = GenerateGuidId()
feed_options["correlatedActivityId"] = correlated_activity_id
if continuation_token_limit is not None:
feed_options["responseContinuationTokenLimitInKb"] = continuation_token_limit
if hasattr(response_hook, "clear"):
response_hook.clear()
if self.container_link in self.__get_client_container_caches():
feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
items = self.client_connection.QueryItems(
database_or_container_link=self.container_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
partition_key=partition_key,
response_hook=response_hook,
**kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, items)
return items
@overload
def query_items_change_feed(
self,
*,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
partition_key: PartitionKeyType,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword partition_key: The partition key that is used to define the scope
(logical partition or a subset of a container)
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
...
@overload
def query_items_change_feed(
self,
*,
feed_range: Dict[str, Any],
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.
:keyword Dict[str, Any] feed_range: The feed range that is used to define the scope.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
...
@overload
def query_items_change_feed(
self,
*,
continuation: str,
max_item_count: Optional[int] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.
:keyword str continuation: The continuation token retrieved from previous response.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...
@overload
def query_items_change_feed(
self,
*,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed in the entire container,
in the order in which they were modified.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
...
[docs]
@distributed_trace
def query_items_change_feed( # pylint: disable=unused-argument
self,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.
:keyword str continuation: The continuation token retrieved from previous response.
:keyword Dict[str, Any] feed_range: The feed range that is used to define the scope.
:keyword partition_key: The partition key that is used to define the scope
(logical partition or a subset of a container)
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: disable=too-many-statements
if kwargs.get("priority") is not None:
kwargs['priority'] = kwargs['priority']
feed_options = _build_options(kwargs)
change_feed_state_context = {}
# Back compatibility with deprecation warnings for partition_key_range_id
if kwargs.get("partition_key_range_id") is not None:
warnings.warn(
"partition_key_range_id is deprecated. Please pass in feed_range instead.",
DeprecationWarning
)
change_feed_state_context["partitionKeyRangeId"] = kwargs.pop('partition_key_range_id')
# Back compatibility with deprecation warnings for is_start_from_beginning
if kwargs.get("is_start_from_beginning") is not None:
warnings.warn(
"is_start_from_beginning is deprecated. Please pass in start_time instead.",
DeprecationWarning
)
if kwargs.get("start_time") is not None:
raise ValueError("is_start_from_beginning and start_time are exclusive, please only set one of them")
is_start_from_beginning = kwargs.pop('is_start_from_beginning')
if is_start_from_beginning is True:
change_feed_state_context["startTime"] = "Beginning"
# parse start_time
if kwargs.get("start_time") is not None:
start_time = kwargs.pop('start_time')
if not isinstance(start_time, (datetime, str)):
raise TypeError(
"'start_time' must be either a datetime object, or either the values 'Now' or 'Beginning'.")
change_feed_state_context["startTime"] = start_time
# parse continuation token
if feed_options.get("continuation") is not None:
change_feed_state_context["continuation"] = feed_options.pop('continuation')
if kwargs.get("max_item_count") is not None:
feed_options["maxItemCount"] = kwargs.pop('max_item_count')
if kwargs.get("partition_key") is not None:
change_feed_state_context["partitionKey"] =\
self._set_partition_key(cast(PartitionKeyType, kwargs.get("partition_key")))
change_feed_state_context["partitionKeyFeedRange"] = \
self._get_epk_range_for_partition_key(kwargs.pop('partition_key'))
if kwargs.get("feed_range") is not None:
change_feed_state_context["feedRange"] = kwargs.pop('feed_range')
feed_options["containerProperties"] = self._get_properties()
feed_options["changeFeedStateContext"] = change_feed_state_context
response_hook = kwargs.pop('response_hook', None)
if hasattr(response_hook, "clear"):
response_hook.clear()
if self.container_link in self.__get_client_container_caches():
feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = self.client_connection.QueryItemsChangeFeed(
self.container_link, options=feed_options, response_hook=response_hook, **kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace_async
async def upsert_item(
self,
body: Dict[str, Any],
*,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
priority: Optional[Literal["High", "Low"]] = None,
no_response: Optional[bool] = None,
**kwargs: Any
) -> CosmosDict:
"""Insert or update the specified item.
If the item already exists in the container, it is replaced. If the item
does not already exist, it is inserted.
:param Dict[str, Any] body: A dict-like object representing the item to update or insert.
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword bool no_response: Indicates whether service should be instructed to skip
sending response payloads. When not specified explicitly here, the default value will be determined from
client-level options.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item could not be upserted.
:returns: A CosmosDict representing the upserted item. The dict will be empty if
`no_response` is specified.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if priority is not None:
kwargs['priority'] = priority
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
if no_response is not None:
kwargs['no_response'] = no_response
request_options = _build_options(kwargs)
request_options["disableAutomaticIdGeneration"] = True
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = await self.client_connection.UpsertItem(
database_or_container_link=self.container_link,
document=body,
options=request_options,
**kwargs
)
return result
[docs]
@distributed_trace_async
async def replace_item(
self,
item: Union[str, Mapping[str, Any]],
body: Dict[str, Any],
*,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
priority: Optional[Literal["High", "Low"]] = None,
no_response: Optional[bool] = None,
**kwargs: Any
) -> CosmosDict:
"""Replaces the specified item if it exists in the container.
If the item does not already exist in the container, an exception is raised.
:param item: The ID (name) or dict representing item to be replaced.
:type item: Union[str, Dict[str, Any]]
:param Dict[str, Any] body: A dict representing the item to replace.
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword bool no_response: Indicates whether service should be instructed to skip
sending response payloads. When not specified explicitly here, the default value will be determined from
client-level options.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The replace operation failed or the item with
given id does not exist.
:returns: A CosmosDict representing the item after replace went through. The dict will be empty if `no_response`
is specified.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
item_link = self._get_document_link(item)
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if priority is not None:
kwargs['priority'] = priority
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
if no_response is not None:
kwargs['no_response'] = no_response
request_options = _build_options(kwargs)
request_options["disableAutomaticIdGeneration"] = True
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = await self.client_connection.ReplaceItem(
document_link=item_link, new_document=body, options=request_options, **kwargs
)
return result
[docs]
@distributed_trace_async
async def patch_item(
self,
item: Union[str, Dict[str, Any]],
partition_key: PartitionKeyType,
patch_operations: List[Dict[str, Any]],
*,
filter_predicate: Optional[str] = None,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
priority: Optional[Literal["High", "Low"]] = None,
no_response: Optional[bool] = None,
**kwargs: Any
) -> CosmosDict:
""" Patches the specified item with the provided operations if it
exists in the container.
If the item does not already exist in the container, an exception is raised.
:param item: The ID (name) or dict representing item to be patched.
:type item: Union[str, Dict[str, Any]]
:param partition_key: The partition key of the object to patch.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:param patch_operations: The list of patch operations to apply to the item.
:type patch_operations: List[Dict[str, Any]]
:keyword str filter_predicate: conditional filter to apply to Patch operations.
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag.
:keyword Callable response_hook: A callable invoked with the response metadata.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword bool no_response: Indicates whether service should be instructed to skip
sending response payloads. When not specified explicitly here, the default value will be determined from
client-level options.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The patch operations failed or the item with
given id does not exist.
:returns: A CosmosDict representing the item after the patch operations went through. The dict will be empty if
`no_response` is specified.
:rtype: ~azure.cosmos.CosmosDict[str, Any]
"""
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if priority is not None:
kwargs['priority'] = priority
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
if no_response is not None:
kwargs['no_response'] = no_response
request_options = _build_options(kwargs)
request_options["disableAutomaticIdGeneration"] = True
request_options["partitionKey"] = await self._set_partition_key(partition_key)
if filter_predicate is not None:
request_options["filterPredicate"] = filter_predicate
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
item_link = self._get_document_link(item)
result = await self.client_connection.PatchItem(
document_link=item_link, operations=patch_operations, options=request_options, **kwargs)
return result
[docs]
@distributed_trace_async
async def delete_item(
self,
item: Union[str, Mapping[str, Any]],
partition_key: PartitionKeyType,
*,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> None:
"""Delete the specified item from the container.
If the item does not already exist in the container, an exception is raised.
:param item: The ID (name) or dict representing item to be deleted.
:type item: Union[str, Dict[str, Any]]
:param partition_key: Specifies the partition key value for the item.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], None], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The item wasn't deleted successfully.
:raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The item does not exist in the container.
:rtype: None
"""
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
if priority is not None:
kwargs['priority'] = priority
request_options = _build_options(kwargs)
request_options["partitionKey"] = await self._set_partition_key(partition_key)
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
document_link = self._get_document_link(item)
await self.client_connection.DeleteItem(document_link=document_link, options=request_options, **kwargs)
[docs]
@distributed_trace_async
async def get_throughput(self, **kwargs: Any) -> ThroughputProperties:
"""Get the ThroughputProperties object for this container.
If no ThroughputProperties already exists for the container, an exception is raised.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], List[Dict[str, Any]]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: No throughput properties exist for the container
or the throughput properties could not be retrieved.
:returns: ThroughputProperties for the container.
:rtype: ~azure.cosmos.offer.ThroughputProperties
"""
response_hook = kwargs.pop('response_hook', None)
throughput_properties: List[Dict[str, Any]]
properties = await self._get_properties()
link = properties["_self"]
query_spec = {
"query": "SELECT * FROM root r WHERE r.resource=@link",
"parameters": [{"name": "@link", "value": link}],
}
options = {"containerRID": properties["_rid"]}
throughput_properties = [throughput async for throughput in
self.client_connection.QueryOffers(query_spec, options, **kwargs)]
if response_hook:
response_hook(self.client_connection.last_response_headers, throughput_properties)
return _deserialize_throughput(throughput=throughput_properties)
[docs]
@distributed_trace_async
async def replace_throughput(
self,
throughput: Union[int, ThroughputProperties],
**kwargs: Any
) -> ThroughputProperties:
"""Replace the container's throughput.
If no ThroughputProperties already exist for the container, an exception is raised.
:param throughput: The throughput to be set.
:type throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: No throughput properties exist for the container
or the throughput properties could not be updated.
:returns: ThroughputProperties for the container, updated with new throughput.
:rtype: ~azure.cosmos.offer.ThroughputProperties
"""
throughput_properties: List[Dict[str, Any]]
properties = await self._get_properties()
link = properties["_self"]
query_spec = {
"query": "SELECT * FROM root r WHERE r.resource=@link",
"parameters": [{"name": "@link", "value": link}],
}
options = {"containerRID": properties["_rid"]}
throughput_properties = [throughput async for throughput in
self.client_connection.QueryOffers(query_spec, options, **kwargs)]
new_offer = throughput_properties[0].copy()
_replace_throughput(throughput=throughput, new_throughput_properties=new_offer)
data = await self.client_connection.ReplaceOffer(offer_link=throughput_properties[0]["_self"],
offer=throughput_properties[0], **kwargs)
return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data)
[docs]
@distributed_trace
def list_conflicts(
self,
*,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List all the conflicts in the container.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of conflicts (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
feed_options = _build_options(kwargs)
response_hook = kwargs.pop('response_hook', None)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
if self.container_link in self.__get_client_container_caches():
feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = self.client_connection.ReadConflicts(
collection_link=self.container_link, feed_options=feed_options, **kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace
def query_conflicts(
self,
query: str,
*,
parameters: Optional[List[Dict[str, object]]] = None,
partition_key: Optional[PartitionKeyType] = None,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Return all conflicts matching a given `query`.
:param str query: The Azure Cosmos DB SQL query to execute.
:keyword parameters: Optional array of parameters to the query. Ignored if no query is provided.
:paramtype parameters: List[Dict[str, Any]]
:keyword partition_key: Specifies the partition key value for the item. If none is passed in, a
cross partition query will be executed.
:paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of conflicts (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
feed_options = _build_options(kwargs)
response_hook = kwargs.pop('response_hook', None)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
if partition_key is not None:
feed_options["partitionKey"] = self._set_partition_key(partition_key)
else:
feed_options["enableCrossPartitionQuery"] = True
if self.container_link in self.__get_client_container_caches():
feed_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = self.client_connection.QueryConflicts(
collection_link=self.container_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
**kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace_async
async def get_conflict(
self,
conflict: Union[str, Mapping[str, Any]],
partition_key: PartitionKeyType,
**kwargs: Any,
) -> Dict[str, Any]:
"""Get the conflict identified by `conflict`.
:param conflict: The ID (name) or dict representing the conflict to retrieve.
:type conflict: Union[str, Dict[str, Any]]
:param partition_key: Partition key for the conflict to retrieve.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given conflict couldn't be retrieved.
:returns: A dict representing the retrieved conflict.
:rtype: Dict[str, Any]
"""
request_options = _build_options(kwargs)
request_options["partitionKey"] = await self._set_partition_key(partition_key)
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
result = await self.client_connection.ReadConflict(
conflict_link=self._get_conflict_link(conflict), options=request_options, **kwargs
)
return result
[docs]
@distributed_trace_async
async def delete_conflict(
self,
conflict: Union[str, Mapping[str, Any]],
partition_key: PartitionKeyType,
**kwargs: Any,
) -> None:
"""Delete a specified conflict from the container.
If the conflict does not already exist in the container, an exception is raised.
:param conflict: The ID (name) or dict representing the conflict to retrieve.
:type conflict: Union[str, Dict[str, Any]]
:param partition_key: Partition key for the conflict to retrieve.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], None], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The conflict wasn't deleted successfully.
:raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The conflict does not exist in the container.
:rtype: None
"""
request_options = _build_options(kwargs)
request_options["partitionKey"] = await self._set_partition_key(partition_key)
await self.client_connection.DeleteConflict(
conflict_link=self._get_conflict_link(conflict), options=request_options, **kwargs
)
[docs]
@distributed_trace_async
async def delete_all_items_by_partition_key(
self,
partition_key: PartitionKeyType,
*,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
**kwargs: Any
) -> None:
"""The delete by partition key feature is an asynchronous, background operation that allows you to delete all
documents with the same logical partition key value, using the Cosmos SDK. The delete by partition key
operation is constrained to consume at most 10% of the total
available RU/s on the container each second. This helps in limiting the resources used by
this background task.
:param partition_key: Partition key for the items to be deleted.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag.
:keyword Callable response_hook: A callable invoked with the response metadata.
:rtype: None
"""
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
request_options = _build_options(kwargs)
# regardless if partition key is valid we set it as invalid partition keys are set to a default empty value
request_options["partitionKey"] = await self._set_partition_key(partition_key)
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
await self.client_connection.DeleteAllItemsByPartitionKey(collection_link=self.container_link,
options=request_options, **kwargs)
[docs]
@distributed_trace_async
async def execute_item_batch(
self,
batch_operations: Sequence[Union[Tuple[str, Tuple[Any, ...]], Tuple[str, Tuple[Any, ...], Dict[str, Any]]]],
partition_key: PartitionKeyType,
*,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
session_token: Optional[str] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> CosmosList:
""" Executes the transactional batch for the specified partition key.
:param batch_operations: The batch of operations to be executed.
:type batch_operations: List[Tuple[Any]]
:param partition_key: The partition key value of the batch operations.
:type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]]
:keyword str pre_trigger_include: trigger id to be used as pre operation trigger.
:keyword str post_trigger_include: trigger id to be used as post operation trigger.
:keyword str session_token: Token for use with Session consistency.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag.
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: A CosmosList representing the items after the batch operations went through.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The batch failed to execute.
:raises ~azure.cosmos.exceptions.CosmosBatchOperationError: A transactional batch operation failed in the batch.
:rtype: ~azure.cosmos.CosmosList[Dict[str, Any]]
"""
if pre_trigger_include is not None:
kwargs['pre_trigger_include'] = pre_trigger_include
if post_trigger_include is not None:
kwargs['post_trigger_include'] = post_trigger_include
if session_token is not None:
kwargs['session_token'] = session_token
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
if priority is not None:
kwargs['priority'] = priority
request_options = _build_options(kwargs)
request_options["partitionKey"] = await self._set_partition_key(partition_key)
request_options["disableAutomaticIdGeneration"] = True
if self.container_link in self.__get_client_container_caches():
request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"]
return await self.client_connection.Batch(
collection_link=self.container_link, batch_operations=batch_operations, options=request_options, **kwargs)
[docs]
@distributed_trace
def read_feed_ranges(
self,
*,
force_refresh: bool = False,
**kwargs: Any
) -> AsyncIterable[Dict[str, Any]]:
""" Obtains a list of feed ranges that can be used to parallelize feed operations.
:keyword bool force_refresh:
Flag to indicate whether obtain the list of feed ranges directly from cache or refresh the cache.
:returns: AsyncIterable representing the feed ranges in base64 encoded string
:rtype: AsyncIterable[Dict[str, Any]]
.. warning::
The structure of the dict representation of a feed range may vary, including which keys
are present. It therefore should only be treated as an opaque value.
"""
if force_refresh is True:
self.client_connection.refresh_routing_map_provider()
async def get_next(continuation_token:str) -> List[Dict[str, Any]]: # pylint: disable=unused-argument
partition_key_ranges = \
await self.client_connection._routing_map_provider.get_overlapping_ranges( # pylint: disable=protected-access
self.container_link,
# default to full range
[Range("", "FF", True, False)],
**kwargs)
feed_ranges = [FeedRangeInternalEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)).to_dict()
for partitionKeyRange in partition_key_ranges]
return feed_ranges
async def extract_data(feed_ranges_response: List[Dict[str, Any]]):
return None, AsyncList(feed_ranges_response)
return AsyncItemPaged(
get_next,
extract_data
)
[docs]
async def get_latest_session_token(
self,
feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]],
target_feed_range: Dict[str, Any]
) -> str:
""" **provisional** This method is still in preview and may be subject to breaking changes.
Gets the the most up to date session token from the list of session token and feed
range tuples for a specific target feed range. The feed range can be obtained from a partition key
or by reading the container feed ranges. This should only be used if maintaining own session token or else
the CosmosClient instance will keep track of session token. Session tokens and feed ranges are
scoped to a container. Only input session tokens and feed ranges obtained from the same container.
:param feed_ranges_to_session_tokens: List of feed range and session token tuples.
:type feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]]
:param target_feed_range: feed range to get most up to date session token.
:type target_feed_range: Dict[str, Any]
:returns: a session token
:rtype: str
"""
return get_latest_session_token(feed_ranges_to_session_tokens, target_feed_range)
[docs]
async def feed_range_from_partition_key(self, partition_key: PartitionKeyType) -> Dict[str, Any]:
""" Gets the feed range for a given partition key.
:param partition_key: partition key to get feed range.
:type partition_key: PartitionKeyType
:returns: a feed range
:rtype: Dict[str, Any]
.. warning::
The structure of the dict representation of a feed range may vary, including which keys
are present. It therefore should only be treated as an opaque value.
"""
return FeedRangeInternalEpk(await self._get_epk_range_for_partition_key(partition_key)).to_dict()
[docs]
async def is_feed_range_subset(self, parent_feed_range: Dict[str, Any],
child_feed_range: Dict[str, Any]) -> bool:
"""Checks if child feed range is a subset of parent feed range.
:param parent_feed_range: left feed range
:type parent_feed_range: Dict[str, Any]
:param child_feed_range: right feed range
:type child_feed_range: Dict[str, Any]
:returns: a boolean indicating if child feed range is a subset of parent feed range
:rtype: bool
.. warning::
The structure of the dict representation of a feed range may vary, including which keys
are present. It therefore should only be treated as an opaque value.
"""
parent_feed_range_epk = FeedRangeInternalEpk.from_json(parent_feed_range)
child_feed_range_epk = FeedRangeInternalEpk.from_json(child_feed_range)
return child_feed_range_epk.get_normalized_range().is_subset(
parent_feed_range_epk.get_normalized_range())