# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Interact with databases in the Azure Cosmos DB SQL API service.
"""
from typing import Any, Dict, List, Mapping, Optional, Union
import warnings
from azure.core import MatchConditions
from azure.core.async_paging import AsyncItemPaged
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing.decorator import distributed_trace
from ._cosmos_client_connection_async import CosmosClientConnection
from .._base import build_options as _build_options, _set_throughput_options, _deserialize_throughput, \
_replace_throughput
from ._container import ContainerProxy
from ..offer import ThroughputProperties
from ..http_constants import StatusCodes
from ..exceptions import CosmosResourceNotFoundError
from ._user import UserProxy
from ..documents import IndexingMode
from ..partition_key import PartitionKey
__all__ = ("DatabaseProxy",)
# pylint: disable=protected-access
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
def _get_database_link(database_or_id: Union[str, 'DatabaseProxy', Mapping[str, Any]]) -> str:
if isinstance(database_or_id, str):
return "dbs/{}".format(database_or_id)
if isinstance(database_or_id, DatabaseProxy):
return database_or_id.database_link
database_id = database_or_id["id"]
return "dbs/{}".format(database_id)
[docs]
class DatabaseProxy(object):
"""An interface to interact with a specific database.
This class should not be instantiated directly. Instead use the
:func:`~azure.cosmos.aio.CosmosClient.get_database_client` method to get an existing
database, or the :func:`~azure.cosmos.aio.CosmosClient.create_database` method to create
a new database.
A database contains one or more containers, each of which can contain items,
stored procedures, triggers, and user-defined functions.
A database can also have associated users, each of which is configured with
a set of permissions for accessing certain containers, stored procedures,
triggers, user-defined functions, or items.
:ivar id: The ID (name) of the database.
An Azure Cosmos DB SQL API database has the following system-generated
properties. These properties are read-only:
* `_rid`: The resource ID.
* `_ts`: When the resource was last updated. The value is a timestamp.
* `_self`: The unique addressable URI for the resource.
* `_etag`: The resource etag required for optimistic concurrency control.
* `_colls`: The addressable path of the collections resource.
* `_users`: The addressable path of the users resource.
"""
def __init__(
self,
client_connection: CosmosClientConnection,
id: str,
properties: Optional[Dict[str, Any]] = None
) -> None:
"""
:param client_connection: Client from which this database was retrieved.
:type client_connection: ~azure.cosmos.aio.CosmosClientConnection
:param str id: ID (name) of the database.
"""
self.client_connection = client_connection
self.id = id
self.database_link = "dbs/{}".format(self.id)
self._properties = properties
def __repr__(self) -> str:
return "<DatabaseProxy [{}]>".format(self.database_link)[:1024]
def _get_container_id(self, container_or_id: Union[str, ContainerProxy, Mapping[str, Any]]) -> str:
if isinstance(container_or_id, str):
return container_or_id
if isinstance(container_or_id, ContainerProxy):
return container_or_id.id
return str(container_or_id["id"])
def _get_container_link(self, container_or_id: Union[str, ContainerProxy, Mapping[str, Any]]) -> str:
return "{}/colls/{}".format(self.database_link, self._get_container_id(container_or_id))
def _get_user_link(self, user_or_id: Union[UserProxy, str, Mapping[str, Any]]) -> str:
if isinstance(user_or_id, str):
return "{}/users/{}".format(self.database_link, user_or_id)
if isinstance(user_or_id, UserProxy):
return user_or_id.user_link
return "{}/users/{}".format(self.database_link, user_or_id["id"])
async def _get_properties(self) -> Dict[str, Any]:
if self._properties is None:
self._properties = await self.read()
return self._properties
[docs]
@distributed_trace_async
async def read(
self,
*,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> Dict[str, Any]:
"""Read the database properties.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved.
:returns: A dict representing the database properties
:rtype: Dict[str, Any]
"""
database_link = _get_database_link(self)
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
request_options = _build_options(kwargs)
self._properties = await self.client_connection.ReadDatabase(
database_link, options=request_options, **kwargs
)
return self._properties
[docs]
@distributed_trace_async
async def create_container(
self,
id: str,
partition_key: PartitionKey,
*,
indexing_policy: Optional[Dict[str, str]] = None,
default_ttl: Optional[int] = None,
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
unique_key_policy: Optional[Dict[str, str]] = None,
conflict_resolution_policy: Optional[Dict[str, str]] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
analytical_storage_ttl: Optional[int] = None,
vector_embedding_policy: Optional[Dict[str, Any]] = None,
full_text_policy: Optional[Dict[str, Any]] = None,
**kwargs: Any
) -> ContainerProxy:
"""Create a new container with the given ID (name).
If a container with the given ID already exists, a CosmosResourceExistsError is raised.
:param str id: ID (name) of container to create.
:param partition_key: The partition key to use for the container.
:type partition_key: ~azure.cosmos.PartitionKey
:keyword dict[str, str] indexing_policy: The indexing policy to apply to the container.
:keyword int default_ttl: Default time to live (TTL) for items in the container.
If unspecified, items do not expire.
:keyword offer_throughput: The provisioned throughput for this offer.
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword dict[str, str] unique_key_policy: The unique key policy to apply to the container.
:keyword dict[str, str] conflict_resolution_policy: The conflict resolution policy to apply to the container.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword List[Dict[str, str]] computed_properties: **provisional** Sets The computed properties for this
container in the Azure Cosmos DB Service. For more Information on how to use computed properties visit
`here: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/computed-properties?tabs=dotnet`
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword int analytical_storage_ttl: Analytical store time to live (TTL) for items in the container. A value of
None leaves analytical storage off and a value of -1 turns analytical storage on with no TTL. Please
note that analytical storage can only be enabled on Synapse Link enabled accounts.
:keyword Dict[str, Any] vector_embedding_policy: The vector embedding policy for the container. Each vector
embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and
is generated for a particular distance function.
:keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container.
Used to denote the default language to be used for all full text indexes, or to individually
assign a language to each full text index path.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed.
:returns: A `ContainerProxy` instance representing the new container.
:rtype: ~azure.cosmos.aio.ContainerProxy
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START create_container]
:end-before: [END create_container]
:language: python
:dedent: 0
:caption: Create a container with default settings:
:name: create_container
.. literalinclude:: ../samples/examples_async.py
:start-after: [START create_container_with_settings]
:end-before: [END create_container_with_settings]
:language: python
:dedent: 0
:caption: Create a container with specific settings; in this case, a custom partition key:
:name: create_container_with_settings
"""
definition: Dict[str, Any] = {"id": id}
if partition_key is not None:
definition["partitionKey"] = partition_key
if indexing_policy is not None:
if indexing_policy.get("indexingMode") is IndexingMode.Lazy:
warnings.warn(
"Lazy indexing mode has been deprecated. Mode will be set to consistent indexing by the backend.",
DeprecationWarning
)
definition["indexingPolicy"] = indexing_policy
if default_ttl is not None:
definition["defaultTtl"] = default_ttl
if unique_key_policy is not None:
definition["uniqueKeyPolicy"] = unique_key_policy
if conflict_resolution_policy is not None:
definition["conflictResolutionPolicy"] = conflict_resolution_policy
if analytical_storage_ttl is not None:
definition["analyticalStorageTtl"] = analytical_storage_ttl
computed_properties = kwargs.pop('computed_properties', None)
if computed_properties is not None:
definition["computedProperties"] = computed_properties
if vector_embedding_policy is not None:
definition["vectorEmbeddingPolicy"] = vector_embedding_policy
if full_text_policy is not None:
definition["fullTextPolicy"] = full_text_policy
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
request_options = _build_options(kwargs)
_set_throughput_options(offer=offer_throughput, request_options=request_options)
data = await self.client_connection.CreateContainer(
database_link=self.database_link, collection=definition, options=request_options, **kwargs
)
return ContainerProxy(self.client_connection, self.database_link, data["id"], properties=data)
[docs]
@distributed_trace_async
async def create_container_if_not_exists(
self,
id: str,
partition_key: PartitionKey,
*,
indexing_policy: Optional[Dict[str, str]] = None,
default_ttl: Optional[int] = None,
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
unique_key_policy: Optional[Dict[str, str]] = None,
conflict_resolution_policy: Optional[Dict[str, str]] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
analytical_storage_ttl: Optional[int] = None,
vector_embedding_policy: Optional[Dict[str, Any]] = None,
full_text_policy: Optional[Dict[str, Any]] = None,
**kwargs: Any
) -> ContainerProxy:
"""Create a container if it does not exist already.
If the container already exists, the existing settings are returned.
Note: it does not check or update the existing container settings or offer throughput
if they differ from what was passed into the method.
:param str id: ID (name) of container to create.
:param partition_key: The partition key to use for the container.
:type partition_key: ~azure.cosmos.PartitionKey
:keyword dict[str, str] indexing_policy: The indexing policy to apply to the container.
:keyword int default_ttl: Default time to live (TTL) for items in the container.
If unspecified, items do not expire.
:keyword offer_throughput: The provisioned throughput for this offer.
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword dict[str, str] unique_key_policy: The unique key policy to apply to the container.
:keyword dict[str, str] conflict_resolution_policy: The conflict resolution policy to apply to the container.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword List[Dict[str, str]] computed_properties: **provisional** Sets The computed properties for this
container in the Azure Cosmos DB Service. For more Information on how to use computed properties visit
`here: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/computed-properties?tabs=dotnet`
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword int analytical_storage_ttl: Analytical store time to live (TTL) for items in the container. A value of
None leaves analytical storage off and a value of -1 turns analytical storage on with no TTL. Please
note that analytical storage can only be enabled on Synapse Link enabled accounts.
:keyword Dict[str, Any] vector_embedding_policy: **provisional** The vector embedding policy for the container.
Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying
data type, and is generated for a particular distance function.
:keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container.
Used to denote the default language to be used for all full text indexes, or to individually
assign a language to each full text index path.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed.
:returns: A `ContainerProxy` instance representing the new container.
:rtype: ~azure.cosmos.aio.ContainerProxy
"""
computed_properties = kwargs.pop("computed_properties", None)
try:
container_proxy = self.get_container_client(id)
await container_proxy.read(
session_token=session_token,
initial_headers=initial_headers,
**kwargs
)
return container_proxy
except CosmosResourceNotFoundError:
return await self.create_container(
id=id,
partition_key=partition_key,
indexing_policy=indexing_policy,
default_ttl=default_ttl,
offer_throughput=offer_throughput,
unique_key_policy=unique_key_policy,
conflict_resolution_policy=conflict_resolution_policy,
analytical_storage_ttl=analytical_storage_ttl,
computed_properties=computed_properties,
etag=etag,
match_condition=match_condition,
session_token=session_token,
initial_headers=initial_headers,
vector_embedding_policy=vector_embedding_policy,
full_text_policy=full_text_policy,
**kwargs
)
[docs]
def get_container_client(self, container: Union[str, ContainerProxy, Dict[str, Any]]) -> ContainerProxy:
"""Get a `ContainerProxy` for a container with specified ID (name).
:param container: The ID (name), dict representing the properties, or :class:`ContainerProxy`
instance of the container to get.
:type container: Union[str, Dict[str, Any], ~azure.cosmos.aio.ContainerProxy]
:returns: A `ContainerProxy` instance representing the container.
:rtype: ~azure.cosmos.aio.ContainerProxy
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START get_container]
:end-before: [END get_container]
:language: python
:dedent: 0
:caption: Get an existing container, handling a failure if encountered:
:name: get_container
"""
if isinstance(container, str):
id_value = container
elif isinstance(container, ContainerProxy):
id_value = container.id
else:
id_value = str(container['id'])
return ContainerProxy(self.client_connection, self.database_link, id_value)
[docs]
@distributed_trace
def list_containers(
self,
*,
session_token: Optional[str] = None,
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
**kwargs
) -> AsyncItemPaged[Dict[str, Any]]:
"""List the containers in the database.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of container properties (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START list_containers]
:end-before: [END list_containers]
:language: python
:dedent: 0
:caption: List all containers in the database:
:name: list_containers
"""
response_hook = kwargs.pop('response_hook', None)
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
result = self.client_connection.ReadContainers(
database_link=self.database_link, options=feed_options, **kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace
def query_containers(
self,
query: str,
*,
parameters: Optional[List[Dict[str, Any]]] = None,
session_token: Optional[str] = None,
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List the properties for containers in the current database.
:param str query: The Azure Cosmos DB SQL query to execute.
:keyword parameters: Optional array of parameters to the query.
Each parameter is a dict() with 'name' and 'value' keys.
:paramtype parameters: Optional[List[Dict[str, Any]]]
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of container properties (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
response_hook = kwargs.pop('response_hook', None)
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
result = self.client_connection.QueryContainers(
database_link=self.database_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
**kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace_async
async def replace_container(
self,
container: Union[str, ContainerProxy, Mapping[str, Any]],
partition_key: PartitionKey,
*,
indexing_policy: Optional[Dict[str, str]] = None,
default_ttl: Optional[int] = None,
conflict_resolution_policy: Optional[Dict[str, str]] = None,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
analytical_storage_ttl: Optional[int] = None,
full_text_policy: Optional[Dict[str, Any]] = None,
**kwargs: Any
) -> ContainerProxy:
"""Reset the properties of the container.
Property changes are persisted immediately. Any properties not specified
will be reset to their default values.
:param container: The ID (name), dict representing the properties or
:class:`ContainerProxy` instance of the container to be replaced.
:type container: Union[str, Dict[str, Any], ~azure.cosmos.aio.ContainerProxy]
:param partition_key: The partition key to use for the container.
:type partition_key: ~azure.cosmos.PartitionKey
:keyword dict[str, str] indexing_policy: The indexing policy to apply to the container.
:keyword int default_ttl: Default time to live (TTL) for items in the container.
If unspecified, items do not expire.
:keyword dict[str, str] conflict_resolution_policy: The conflict resolution policy to apply to the container.
:keyword str session_token: Token for use with Session consistency.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword int analytical_storage_ttl: Analytical store time to live (TTL) for items in the container. A value of
None leaves analytical storage off and a value of -1 turns analytical storage on with no TTL. Please
note that analytical storage can only be enabled on Synapse Link enabled accounts.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container.
Used to denote the default language to be used for all full text indexes, or to individually
assign a language to each full text index path.
:returns: A `ContainerProxy` instance representing the container after replace completed.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced.
This includes if the container with given id does not exist.
:rtype: ~azure.cosmos.aio.ContainerProxy
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START reset_container_properties]
:end-before: [END reset_container_properties]
:language: python
:dedent: 0
:caption: Reset the TTL property on a container, and display the updated properties:
:name: reset_container_properties
"""
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
request_options = _build_options(kwargs)
container_id = self._get_container_id(container)
container_link = self._get_container_link(container_id)
parameters = {
key: value
for key, value in {
"id": container_id,
"partitionKey": partition_key,
"indexingPolicy": indexing_policy,
"defaultTtl": default_ttl,
"conflictResolutionPolicy": conflict_resolution_policy,
"analyticalStorageTtl": analytical_storage_ttl,
"fullTextPolicy": full_text_policy
}.items()
if value is not None
}
container_properties = await self.client_connection.ReplaceContainer(
container_link, collection=parameters, options=request_options, **kwargs
)
return ContainerProxy(
self.client_connection, self.database_link, container_properties["id"], properties=container_properties
)
[docs]
@distributed_trace_async
async def delete_container(
self,
container: Union[str, ContainerProxy, Mapping[str, Any]],
*,
session_token: Optional[str] = None,
initial_headers: Optional[Dict[str, str]] = None,
etag: Optional[str] = None,
match_condition: Optional[MatchConditions] = None,
**kwargs: Any
) -> None:
"""Delete a container.
:param container: The ID (name) of the container to delete. You can either
pass in the ID of the container to delete, a :class:`ContainerProxy` instance or
a dict representing the properties of the container.
:type container: str or Dict[str, Any] or ~azure.cosmos.aio.ContainerProxy
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword str etag: An ETag value, or the wildcard character (*). Used to check if the resource
has changed, and act according to the condition specified by the `match_condition` parameter.
:keyword match_condition: The match condition to use upon the etag.
:paramtype match_condition: ~azure.core.MatchConditions
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], None], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted.
:rtype: None
"""
if session_token is not None:
kwargs['session_token'] = session_token
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if etag is not None:
kwargs['etag'] = etag
if match_condition is not None:
kwargs['match_condition'] = match_condition
request_options = _build_options(kwargs)
collection_link = self._get_container_link(container)
await self.client_connection.DeleteContainer(collection_link, options=request_options, **kwargs)
[docs]
@distributed_trace_async
async def create_user(
self,
body: Dict[str, Any],
**kwargs: Any
) -> UserProxy: # body should just be id?
"""Create a new user in the container.
To update or replace an existing user, use the
:func:`ContainerProxy.upsert_user` method.
:param Dict[str, Any] body: A dict object with an `id` key and value representing the user to be created.
The user ID must be unique within the database, and consist of no more than 255 characters.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given user couldn't be created.
:returns: A `UserProxy` instance representing the new user.
:rtype: ~azure.cosmos.aio.UserProxy
.. admonition:: Example:
.. literalinclude:: ../samples/examples_async.py
:start-after: [START create_user]
:end-before: [END create_user]
:language: python
:dedent: 0
:caption: Create a database user:
:name: create_user
"""
request_options = _build_options(kwargs)
user = await self.client_connection.CreateUser(
database_link=self.database_link, user=body, options=request_options, **kwargs)
return UserProxy(
client_connection=self.client_connection, id=user["id"], database_link=self.database_link, properties=user
)
[docs]
def get_user_client(
self,
user: Union[str, UserProxy, Mapping[str, Any]]
) -> UserProxy:
"""Get a `UserProxy` for a user with specified ID.
:param user: The ID (name), dict representing the properties, or :class:`UserProxy`
instance of the user to get.
:type user: Union[str, Dict[str, Any], ~azure.cosmos.aio.UserProxy]
:returns: A `UserProxy` instance representing the retrieved user.
:rtype: ~azure.cosmos.aio.UserProxy
"""
if isinstance(user, str):
id_value = user
elif isinstance(user, UserProxy):
id_value = user.id
else:
id_value = str(user['id'])
return UserProxy(client_connection=self.client_connection, id=id_value, database_link=self.database_link)
[docs]
@distributed_trace
def list_users(
self,
*,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List all the users in the container.
:keyword int max_item_count: Max number of users to be returned in the enumeration operation.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of user properties (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
feed_options = _build_options(kwargs)
response_hook = kwargs.pop('response_hook', None)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
result = self.client_connection.ReadUsers(
database_link=self.database_link, options=feed_options, **kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace
def query_users(
self,
query: str,
*,
parameters: Optional[List[Dict[str, Any]]] = None,
max_item_count: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Return all users matching the given `query`.
:param str query: The Azure Cosmos DB SQL query to execute.
:keyword parameters: Optional array of parameters to the query.
Each parameter is a dict() with 'name' and 'value' keys.
Ignored if no query is provided.
:paramtype parameters: Optional[List[Dict[str, Any]]]
:keyword int max_item_count: Max number of users to be returned in the enumeration operation.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of user properties (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
feed_options = _build_options(kwargs)
response_hook = kwargs.pop('response_hook', None)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
result = self.client_connection.QueryUsers(
database_link=self.database_link,
query=query if parameters is None else {"query": query, "parameters": parameters},
options=feed_options,
**kwargs
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)
return result
[docs]
@distributed_trace_async
async def upsert_user(
self,
body: Dict[str, Any],
**kwargs: Any
) -> UserProxy:
"""Insert or update the specified user.
If the user already exists in the container, it is replaced. If the user
does not already exist, it is inserted.
:param Dict[str, Any] body: A dict-like object representing the user to update or insert.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given user could not be upserted.
:returns: A `UserProxy` instance representing the upserted user.
:rtype: ~azure.cosmos.aio.UserProxy
"""
request_options = _build_options(kwargs)
user = await self.client_connection.UpsertUser(
database_link=self.database_link, user=body, options=request_options, **kwargs
)
return UserProxy(
client_connection=self.client_connection, id=user["id"], database_link=self.database_link, properties=user
)
[docs]
@distributed_trace_async
async def replace_user(
self,
user: Union[str, UserProxy, Mapping[str, Any]],
body: Dict[str, Any],
**kwargs: Any
) -> UserProxy:
"""Replaces the specified user if it exists in the container.
:param user: The ID (name), dict representing the properties or :class:`UserProxy`
instance of the user to be replaced.
:type user: Union[str, Dict[str, Any], ~azure.cosmos.aio.UserProxy]
:param Dict[str, Any] body: A dict object representing the user to replace.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError:
If the replace operation failed or the user with given ID does not exist.
:returns: A `UserProxy` instance representing the user after replace went through.
:rtype: ~azure.cosmos.aio.UserProxy
"""
request_options = _build_options(kwargs)
replaced_user = await self.client_connection.ReplaceUser(
user_link=self._get_user_link(user), user=body, options=request_options, **kwargs)
return UserProxy(
client_connection=self.client_connection,
id=replaced_user["id"],
database_link=self.database_link,
properties=replaced_user
)
[docs]
@distributed_trace_async
async def delete_user(
self,
user: Union[str, UserProxy, Mapping[str, Any]],
**kwargs: Any
) -> None:
"""Delete the specified user from the container.
:param user: The ID (name), dict representing the properties or :class:`UserProxy`
instance of the user to be deleted.
:type user: Union[str, Dict[str, Any], ~azure.cosmos.aio.UserProxy]
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], None], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The user wasn't deleted successfully.
:raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The user does not exist in the container.
:rtype: None
"""
request_options = _build_options(kwargs)
await self.client_connection.DeleteUser(
user_link=self._get_user_link(user), options=request_options, **kwargs
)
[docs]
@distributed_trace_async
async def get_throughput(self, **kwargs: Any) -> ThroughputProperties:
"""Get the ThroughputProperties object for this database.
If no ThroughputProperties already exists for the database, an exception is raised.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], List[Dict[str, Any]]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: No throughput properties exist for the database
or the throughput properties could not be retrieved.
:returns: ThroughputProperties for the database.
:rtype: ~azure.cosmos.offer.ThroughputProperties
"""
response_hook = kwargs.pop('response_hook', None)
properties = await self._get_properties()
link = properties["_self"]
query_spec = {
"query": "SELECT * FROM root r WHERE r.resource=@link",
"parameters": [{"name": "@link", "value": link}],
}
throughput_properties = [throughput async for throughput in
self.client_connection.QueryOffers(query_spec, **kwargs)]
if len(throughput_properties) == 0:
raise CosmosResourceNotFoundError(
status_code=StatusCodes.NOT_FOUND,
message="Could not find ThroughputProperties for database " + self.database_link)
if response_hook:
response_hook(self.client_connection.last_response_headers, throughput_properties)
return _deserialize_throughput(throughput=throughput_properties)
[docs]
@distributed_trace_async
async def replace_throughput(
self,
throughput: Union[int, ThroughputProperties],
**kwargs: Any
) -> ThroughputProperties:
"""Replace the database-level throughput.
If no ThroughputProperties already exist for the database, an exception is raised.
:param throughput: The throughput to be set.
:type throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: No throughput properties exist for the database
or the throughput properties could not be updated.
:returns: ThroughputProperties for the database, updated with new throughput.
:rtype: ~azure.cosmos.offer.ThroughputProperties
"""
properties = await self._get_properties()
link = properties["_self"]
query_spec = {
"query": "SELECT * FROM root r WHERE r.resource=@link",
"parameters": [{"name": "@link", "value": link}],
}
throughput_properties = [throughput async for throughput in
self.client_connection.QueryOffers(query_spec, **kwargs)]
if len(throughput_properties) == 0:
raise CosmosResourceNotFoundError(
status_code=StatusCodes.NOT_FOUND,
message="Could not find Offer for database " + self.database_link)
new_offer = throughput_properties[0].copy()
_replace_throughput(throughput=throughput, new_throughput_properties=new_offer)
data = await self.client_connection.ReplaceOffer(offer_link=throughput_properties[0]["_self"],
offer=throughput_properties[0], **kwargs)
return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data)