Source code for azure.cosmos.database

# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Interact with databases in the Azure Cosmos DB SQL API service.
"""

from typing import Any, List, Dict, Mapping, Union, cast, Iterable, Optional

import six
from azure.core.tracing.decorator import distributed_trace  # type: ignore

from ._cosmos_client_connection import CosmosClientConnection
from ._base import build_options
from .container import ContainerProxy
from .offer import Offer
from .http_constants import StatusCodes
from .exceptions import CosmosResourceNotFoundError
from .user import UserProxy

__all__ = ("DatabaseProxy",)

# pylint: disable=protected-access
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs


class DatabaseProxy(object):
    """Client-side handle for one Azure Cosmos DB SQL API database.

    Do not construct this class yourself; obtain an instance through the
    :func:`CosmosClient.get_database_client` method.

    A database holds one or more containers, and each container can hold
    items, stored procedures, triggers, and user-defined functions.

    A database can also have users attached to it. Each user is configured
    with a set of permissions for accessing certain containers, stored
    procedures, triggers, user-defined functions, or items.

    :ivar id: The ID (name) of the database.

    An Azure Cosmos DB SQL API database has the following system-generated
    properties. These properties are read-only:

    * `_rid`: The resource ID.
    * `_ts`: When the resource was last updated. The value is a timestamp.
    * `_self`: The unique addressable URI for the resource.
    * `_etag`: The resource etag required for optimistic concurrency control.
    * `_colls`: The addressable path of the collections resource.
    * `_users`: The addressable path of the users resource.
    """

    def __init__(self, client_connection, id, properties=None):  # pylint: disable=redefined-builtin
        # type: (CosmosClientConnection, str, Dict[str, Any]) -> None
        """
        :param CosmosClientConnection client_connection: Client from which this database was retrieved.
        :param str id: ID (name) of the database.
        :param properties: Already-known database properties, if any. When omitted,
            they are fetched with :meth:`read` the first time they are needed.
        """
        self.id = id
        self.client_connection = client_connection
        self._properties = properties
        # Name-based link used to address this database in requests.
        self.database_link = u"dbs/{}".format(id)

    def __repr__(self):
        # type: () -> str
        text = "<DatabaseProxy [{}]>".format(self.database_link)
        # Cap the representation at 1024 characters.
        return text[:1024]

    @staticmethod
    def _get_container_id(container_or_id):
        # type: (Union[str, ContainerProxy, Dict[str, Any]]) -> str
        """Extract a container ID from a string, a proxy-like object, or a properties dict."""
        if isinstance(container_or_id, six.string_types):
            return container_or_id
        try:
            # Anything exposing an `id` attribute is treated as a proxy.
            return cast("ContainerProxy", container_or_id).id
        except AttributeError:
            pass
        # Otherwise fall back to treating it as a properties mapping.
        container_properties = cast("Dict[str, str]", container_or_id)
        return container_properties["id"]

    def _get_container_link(self, container_or_id):
        # type: (Union[str, ContainerProxy, Dict[str, Any]]) -> str
        """Build the name-based link of a container that lives in this database."""
        container_id = self._get_container_id(container_or_id)
        return u"{}/colls/{}".format(self.database_link, container_id)

    def _get_user_link(self, user_or_id):
        # type: (Union[UserProxy, str, Dict[str, Any]]) -> str
        """Build (or read off the proxy) the link of a user in this database."""
        link_template = u"{}/users/{}"
        if isinstance(user_or_id, six.string_types):
            return link_template.format(self.database_link, user_or_id)
        try:
            # Proxy-like objects already carry their own link.
            return cast("UserProxy", user_or_id).user_link
        except AttributeError:
            pass
        user_properties = cast("Dict[str, str]", user_or_id)
        return link_template.format(self.database_link, user_properties["id"])

    def _get_properties(self):
        # type: () -> Dict[str, Any]
        """Return the cached database properties, reading them first if nothing is cached."""
        if self._properties is not None:
            return self._properties
        self._properties = self.read()
        return self._properties
[docs] @distributed_trace def read(self, populate_query_metrics=None, **kwargs): # type: (Optional[bool], Any) -> Dict[str, Any] """Read the database properties. :param bool populate_query_metrics: Enable returning query metrics in response headers. :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. :rtype: Dict[Str, Any] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved. """ # TODO this helper function should be extracted from CosmosClient from .cosmos_client import CosmosClient database_link = CosmosClient._get_database_link(self) request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if populate_query_metrics is not None: request_options["populateQueryMetrics"] = populate_query_metrics self._properties = self.client_connection.ReadDatabase( database_link, options=request_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, self._properties) return cast('Dict[str, Any]', self._properties)
[docs] @distributed_trace def create_container( self, id, # type: str # pylint: disable=redefined-builtin partition_key, # type: Any indexing_policy=None, # type: Optional[Dict[str, Any]] default_ttl=None, # type: Optional[int] populate_query_metrics=None, # type: Optional[bool] offer_throughput=None, # type: Optional[int] unique_key_policy=None, # type: Optional[Dict[str, Any]] conflict_resolution_policy=None, # type: Optional[Dict[str, Any]] **kwargs # type: Any ): # type: (...) -> ContainerProxy """Create a new container with the given ID (name). If a container with the given ID already exists, a CosmosResourceExistsError is raised. :param id: ID (name) of container to create. :param partition_key: The partition key to use for the container. :param indexing_policy: The indexing policy to apply to the container. :param default_ttl: Default time to live (TTL) for items in the container. If unspecified, items do not expire. :param populate_query_metrics: Enable returning query metrics in response headers. :param offer_throughput: The provisioned throughput for this offer. :param unique_key_policy: The unique key policy to apply to the container. :param conflict_resolution_policy: The conflict resolution policy to apply to the container. :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: A `ContainerProxy` instance representing the new container. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :rtype: ~azure.cosmos.ContainerProxy .. 
admonition:: Example: .. literalinclude:: ../samples/examples.py :start-after: [START create_container] :end-before: [END create_container] :language: python :dedent: 0 :caption: Create a container with default settings: :name: create_container .. literalinclude:: ../samples/examples.py :start-after: [START create_container_with_settings] :end-before: [END create_container_with_settings] :language: python :dedent: 0 :caption: Create a container with specific settings; in this case, a custom partition key: :name: create_container_with_settings """ definition = dict(id=id) # type: Dict[str, Any] if partition_key is not None: definition["partitionKey"] = partition_key if indexing_policy is not None: definition["indexingPolicy"] = indexing_policy if default_ttl is not None: definition["defaultTtl"] = default_ttl if unique_key_policy is not None: definition["uniqueKeyPolicy"] = unique_key_policy if conflict_resolution_policy is not None: definition["conflictResolutionPolicy"] = conflict_resolution_policy request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if populate_query_metrics is not None: request_options["populateQueryMetrics"] = populate_query_metrics if offer_throughput is not None: request_options["offerThroughput"] = offer_throughput data = self.client_connection.CreateContainer( database_link=self.database_link, collection=definition, options=request_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, data) return ContainerProxy(self.client_connection, self.database_link, data["id"], properties=data)
[docs] @distributed_trace def create_container_if_not_exists( self, id, # type: str # pylint: disable=redefined-builtin partition_key, # type: Any indexing_policy=None, # type: Optional[Dict[str, Any]] default_ttl=None, # type: Optional[int] populate_query_metrics=None, # type: Optional[bool] offer_throughput=None, # type: Optional[int] unique_key_policy=None, # type: Optional[Dict[str, Any]] conflict_resolution_policy=None, # type: Optional[Dict[str, Any]] **kwargs # type: Any ): # type: (...) -> ContainerProxy """Create a container if it does not exist already. If the container already exists, the existing settings are returned. Note: it does not check or update the existing container settings or offer throughput if they differ from what was passed into the method. :param id: ID (name) of container to read or create. :param partition_key: The partition key to use for the container. :param indexing_policy: The indexing policy to apply to the container. :param default_ttl: Default time to live (TTL) for items in the container. If unspecified, items do not expire. :param populate_query_metrics: Enable returning query metrics in response headers. :param offer_throughput: The provisioned throughput for this offer. :param unique_key_policy: The unique key policy to apply to the container. :param conflict_resolution_policy: The conflict resolution policy to apply to the container. :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: A `ContainerProxy` instance representing the container. 
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container read or creation failed. :rtype: ~azure.cosmos.ContainerProxy """ try: container_proxy = self.get_container_client(id) container_proxy.read( populate_query_metrics=populate_query_metrics, **kwargs ) return container_proxy except CosmosResourceNotFoundError: return self.create_container( id=id, partition_key=partition_key, indexing_policy=indexing_policy, default_ttl=default_ttl, populate_query_metrics=populate_query_metrics, offer_throughput=offer_throughput, unique_key_policy=unique_key_policy, conflict_resolution_policy=conflict_resolution_policy )
[docs] @distributed_trace def delete_container( self, container, # type: Union[str, ContainerProxy, Dict[str, Any]] populate_query_metrics=None, # type: Optional[bool] **kwargs # type: Any ): # type: (...) -> None """Delete a container. :param container: The ID (name) of the container to delete. You can either pass in the ID of the container to delete, a :class:`ContainerProxy` instance or a dict representing the properties of the container. :param populate_query_metrics: Enable returning query metrics in response headers. :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted. :rtype: None """ request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if populate_query_metrics is not None: request_options["populateQueryMetrics"] = populate_query_metrics collection_link = self._get_container_link(container) result = self.client_connection.DeleteContainer(collection_link, options=request_options, **kwargs) if response_hook: response_hook(self.client_connection.last_response_headers, result)
[docs] def get_container_client(self, container): # type: (Union[str, ContainerProxy, Dict[str, Any]]) -> ContainerProxy """Get a `ContainerProxy` for a container with specified ID (name). :param container: The ID (name) of the container, a :class:`ContainerProxy` instance, or a dict representing the properties of the container to be retrieved. :rtype: ~azure.cosmos.ContainerProxy .. admonition:: Example: .. literalinclude:: ../samples/examples.py :start-after: [START get_container] :end-before: [END get_container] :language: python :dedent: 0 :caption: Get an existing container, handling a failure if encountered: :name: get_container """ if isinstance(container, ContainerProxy): id_value = container.id elif isinstance(container, Mapping): id_value = container["id"] else: id_value = container return ContainerProxy(self.client_connection, self.database_link, id_value)
[docs] @distributed_trace def list_containers(self, max_item_count=None, populate_query_metrics=None, **kwargs): # type: (Optional[int], Optional[bool], Any) -> Iterable[Dict[str, Any]] """List the containers in the database. :param max_item_count: Max number of items to be returned in the enumeration operation. :param populate_query_metrics: Enable returning query metrics in response headers. :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: An Iterable of container properties (dicts). :rtype: Iterable[dict[str, Any]] .. admonition:: Example: .. literalinclude:: ../samples/examples.py :start-after: [START list_containers] :end-before: [END list_containers] :language: python :dedent: 0 :caption: List all containers in the database: :name: list_containers """ feed_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count if populate_query_metrics is not None: feed_options["populateQueryMetrics"] = populate_query_metrics result = self.client_connection.ReadContainers( database_link=self.database_link, options=feed_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, result) return result
[docs] @distributed_trace def query_containers( self, query=None, # type: Optional[str] parameters=None, # type: Optional[List[str]] max_item_count=None, # type: Optional[int] populate_query_metrics=None, # type: Optional[bool] **kwargs # type: Any ): # type: (...) -> Iterable[Dict[str, Any]] """List the properties for containers in the current database. :param query: The Azure Cosmos DB SQL query to execute. :param parameters: Optional array of parameters to the query. Ignored if no query is provided. :param max_item_count: Max number of items to be returned in the enumeration operation. :param populate_query_metrics: Enable returning query metrics in response headers. :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: An Iterable of container properties (dicts). :rtype: Iterable[dict[str, Any]] """ feed_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count if populate_query_metrics is not None: feed_options["populateQueryMetrics"] = populate_query_metrics result = self.client_connection.QueryContainers( database_link=self.database_link, query=query if parameters is None else dict(query=query, parameters=parameters), options=feed_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, result) return result
[docs] @distributed_trace def replace_container( self, container, # type: Union[str, ContainerProxy, Dict[str, Any]] partition_key, # type: Any indexing_policy=None, # type: Optional[Dict[str, Any]] default_ttl=None, # type: Optional[int] conflict_resolution_policy=None, # type: Optional[Dict[str, Any]] populate_query_metrics=None, # type: Optional[bool] **kwargs # type: Any ): # type: (...) -> ContainerProxy """Reset the properties of the container. Property changes are persisted immediately. Any properties not specified will be reset to their default values. :param container: The ID (name), dict representing the properties or :class:`ContainerProxy` instance of the container to be replaced. :param partition_key: The partition key to use for the container. :param indexing_policy: The indexing policy to apply to the container. :param default_ttl: Default time to live (TTL) for items in the container. If unspecified, items do not expire. :param conflict_resolution_policy: The conflict resolution policy to apply to the container. :param populate_query_metrics: Enable returning query metrics in response headers. :keyword str session_token: Token for use with Session consistency. :keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced. This includes if the container with given id does not exist. :returns: A `ContainerProxy` instance representing the container after replace completed. :rtype: ~azure.cosmos.ContainerProxy .. admonition:: Example: .. 
literalinclude:: ../samples/examples.py :start-after: [START reset_container_properties] :end-before: [END reset_container_properties] :language: python :dedent: 0 :caption: Reset the TTL property on a container, and display the updated properties: :name: reset_container_properties """ request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if populate_query_metrics is not None: request_options["populateQueryMetrics"] = populate_query_metrics container_id = self._get_container_id(container) container_link = self._get_container_link(container_id) parameters = { key: value for key, value in { "id": container_id, "partitionKey": partition_key, "indexingPolicy": indexing_policy, "defaultTtl": default_ttl, "conflictResolutionPolicy": conflict_resolution_policy, }.items() if value is not None } container_properties = self.client_connection.ReplaceContainer( container_link, collection=parameters, options=request_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, container_properties) return ContainerProxy( self.client_connection, self.database_link, container_properties["id"], properties=container_properties )
[docs] @distributed_trace def list_users(self, max_item_count=None, **kwargs): # type: (Optional[int], Any) -> Iterable[Dict[str, Any]] """List all the users in the container. :param max_item_count: Max number of users to be returned in the enumeration operation. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: An Iterable of user properties (dicts). :rtype: Iterable[dict[str, Any]] """ feed_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count result = self.client_connection.ReadUsers( database_link=self.database_link, options=feed_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, result) return result
[docs] @distributed_trace def query_users(self, query, parameters=None, max_item_count=None, **kwargs): # type: (str, Optional[List[str]], Optional[int], Any) -> Iterable[Dict[str, Any]] """Return all users matching the given `query`. :param query: The Azure Cosmos DB SQL query to execute. :param parameters: Optional array of parameters to the query. Ignored if no query is provided. :param max_item_count: Max number of users to be returned in the enumeration operation. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: An Iterable of user properties (dicts). :rtype: Iterable[str, Any] """ feed_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count result = self.client_connection.QueryUsers( database_link=self.database_link, query=query if parameters is None else dict(query=query, parameters=parameters), options=feed_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, result) return result
[docs] def get_user_client(self, user): # type: (Union[str, UserProxy, Dict[str, Any]]) -> UserProxy """Get a `UserProxy` for a user with specified ID. :param user: The ID (name), dict representing the properties or :class:`UserProxy` instance of the user to be retrieved. :returns: A `UserProxy` instance representing the retrieved user. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given user couldn't be retrieved. :rtype: ~azure.cosmos.UserProxy """ if isinstance(user, UserProxy): id_value = user.id elif isinstance(user, Mapping): id_value = user["id"] else: id_value = user return UserProxy(client_connection=self.client_connection, id=id_value, database_link=self.database_link)
[docs] @distributed_trace def create_user(self, body, **kwargs): # type: (Dict[str, Any], Any) -> UserProxy """Create a new user in the container. To update or replace an existing user, use the :func:`ContainerProxy.upsert_user` method. :param body: A dict-like object with an `id` key and value representing the user to be created. The user ID must be unique within the database, and consist of no more than 255 characters. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: A `UserProxy` instance representing the new user. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given user couldn't be created. :rtype: ~azure.cosmos.UserProxy .. admonition:: Example: .. literalinclude:: ../samples/examples.py :start-after: [START create_user] :end-before: [END create_user] :language: python :dedent: 0 :caption: Create a database user: :name: create_user """ request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) user = self.client_connection.CreateUser( database_link=self.database_link, user=body, options=request_options, **kwargs) if response_hook: response_hook(self.client_connection.last_response_headers, user) return UserProxy( client_connection=self.client_connection, id=user["id"], database_link=self.database_link, properties=user )
[docs] @distributed_trace def upsert_user(self, body, **kwargs): # type: (Dict[str, Any], Any) -> UserProxy """Insert or update the specified user. If the user already exists in the container, it is replaced. If the user does not already exist, it is inserted. :param body: A dict-like object representing the user to update or insert. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: A `UserProxy` instance representing the upserted user. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given user could not be upserted. :rtype: ~azure.cosmos.UserProxy """ request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) user = self.client_connection.UpsertUser( database_link=self.database_link, user=body, options=request_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, user) return UserProxy( client_connection=self.client_connection, id=user["id"], database_link=self.database_link, properties=user )
[docs] @distributed_trace def replace_user( self, user, # type: Union[str, UserProxy, Dict[str, Any]] body, # type: Dict[str, Any] **kwargs # type: Any ): # type: (...) -> UserProxy """Replaces the specified user if it exists in the container. :param user: The ID (name), dict representing the properties or :class:`UserProxy` instance of the user to be replaced. :param body: A dict-like object representing the user to replace. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: A `UserProxy` instance representing the user after replace went through. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace failed or the user with given ID does not exist. :rtype: ~azure.cosmos.UserProxy """ request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) replaced_user = self.client_connection.ReplaceUser( user_link=self._get_user_link(user), user=body, options=request_options, **kwargs ) # type: Dict[str, str] if response_hook: response_hook(self.client_connection.last_response_headers, replaced_user) return UserProxy( client_connection=self.client_connection, id=replaced_user["id"], database_link=self.database_link, properties=replaced_user )
[docs] @distributed_trace def delete_user(self, user, **kwargs): # type: (Union[str, UserProxy, Dict[str, Any]], Any) -> None """Delete the specified user from the container. :param user: The ID (name), dict representing the properties or :class:`UserProxy` instance of the user to be deleted. :keyword Callable response_hook: A callable invoked with the response metadata. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The user wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The user does not exist in the container. :rtype: None """ request_options = build_options(kwargs) response_hook = kwargs.pop('response_hook', None) result = self.client_connection.DeleteUser( user_link=self._get_user_link(user), options=request_options, **kwargs ) if response_hook: response_hook(self.client_connection.last_response_headers, result)
[docs] @distributed_trace def read_offer(self, **kwargs): # type: (Any) -> Offer """Read the Offer object for this database. :keyword Callable response_hook: A callable invoked with the response metadata. :returns: Offer for the database. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If no offer exists for the database or if the offer could not be retrieved. :rtype: ~azure.cosmos.Offer """ response_hook = kwargs.pop('response_hook', None) properties = self._get_properties() link = properties["_self"] query_spec = { "query": "SELECT * FROM root r WHERE r.resource=@link", "parameters": [{"name": "@link", "value": link}], } offers = list(self.client_connection.QueryOffers(query_spec, **kwargs)) if not offers: raise CosmosResourceNotFoundError( status_code=StatusCodes.NOT_FOUND, message="Could not find Offer for database " + self.database_link) if response_hook: response_hook(self.client_connection.last_response_headers, offers) return Offer(offer_throughput=offers[0]["content"]["offerThroughput"], properties=offers[0])
[docs] @distributed_trace def replace_throughput(self, throughput, **kwargs): # type: (Optional[int], Any) -> Offer """Replace the database-level throughput. :param throughput: The throughput to be set (an integer). :keyword Callable response_hook: A callable invoked with the response metadata. :returns: Offer for the database, updated with new throughput. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If no offer exists for the database or if the offer could not be updated. :rtype: ~azure.cosmos.Offer """ response_hook = kwargs.pop('response_hook', None) properties = self._get_properties() link = properties["_self"] query_spec = { "query": "SELECT * FROM root r WHERE r.resource=@link", "parameters": [{"name": "@link", "value": link}], } offers = list(self.client_connection.QueryOffers(query_spec)) if not offers: raise CosmosResourceNotFoundError( status_code=StatusCodes.NOT_FOUND, message="Could not find Offer for collection " + self.database_link) new_offer = offers[0].copy() new_offer["content"]["offerThroughput"] = throughput data = self.client_connection.ReplaceOffer(offer_link=offers[0]["_self"], offer=offers[0], **kwargs) if response_hook: response_hook(self.client_connection.last_response_headers, data) return Offer(offer_throughput=data["content"]["offerThroughput"], properties=data)