Source code for azure.cosmos.aio.user

# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs

"""Create, read, update and delete users in the Azure Cosmos DB SQL API service.
"""

from typing import Any, List, Dict, Union, cast, Optional
from azure.core.async_paging import AsyncItemPaged

from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing.decorator import distributed_trace

from ._cosmos_client_connection_async import CosmosClientConnection
from .._base import build_options
from ..permission import Permission as _permission


class UserProxy:
    """An interface to interact with a specific user.

    This class should not be instantiated directly. Instead, use the
    :func:`DatabaseProxy.get_user_client` method.

    :ivar client_connection: The connection used to issue every request.
    :ivar str id: The ID (name) of the user.
    :ivar str user_link: The link of the user, built as ``<database_link>/users/<id>``.
    """

    def __init__(self, client_connection, id, database_link, properties=None):  # pylint: disable=redefined-builtin
        # type: (CosmosClientConnection, str, str, Optional[Dict[str, Any]]) -> None
        self.client_connection = client_connection
        self.id = id
        self.user_link = "{}/users/{}".format(database_link, id)
        # Cached user properties; filled in by read() when not supplied here.
        self._properties = properties

    def __repr__(self):
        # type: () -> str
        # Truncated so that a very long link cannot produce an unbounded repr.
        return "<UserProxy [{}]>".format(self.user_link)[:1024]

    def _get_permission_link(self, permission_or_id):
        # type: (Union[_permission, str, Dict[str, Any]]) -> str
        """Resolve a permission reference to its link.

        :param permission_or_id: The ID (name) of the permission, an object exposing a
            ``permission_link`` attribute, or a dict holding the permission's ``"id"``.
        :returns: The link of the permission.
        :rtype: str
        """
        if isinstance(permission_or_id, str):
            return "{}/permissions/{}".format(self.user_link, permission_or_id)
        try:
            return cast("_permission", permission_or_id).permission_link
        except AttributeError:
            # Not a Permission-like object; fall through and treat it as a properties dict.
            pass
        return "{}/permissions/{}".format(self.user_link, cast("Dict[str, str]", permission_or_id)["id"])

    def _permission_from_response(self, permission_properties):
        # type: (Dict[str, Any]) -> _permission
        """Wrap a permission properties dict returned by the service in a ``Permission``.

        :param permission_properties: The permission dict; must contain the ``"id"``,
            ``"permissionMode"`` and ``"resource"`` keys.
        :returns: The permission object bound to this user's link.
        :rtype: ~azure.cosmos.permission.Permission
        """
        return _permission(
            id=permission_properties["id"],
            user_link=self.user_link,
            permission_mode=permission_properties["permissionMode"],
            resource_link=permission_properties["resource"],
            properties=permission_properties,
        )

    async def _get_properties(self):
        # type: () -> Dict[str, Any]
        """Return the cached user properties, reading them first if they are not cached yet."""
        if self._properties is None:
            self._properties = await self.read()
        return self._properties

    @distributed_trace_async
    async def read(self, **kwargs):
        # type: (Any) -> Dict[str, Any]
        """Read user properties.

        The retrieved properties are also cached on this instance.

        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: A dictionary of the retrieved user properties.
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given user couldn't be retrieved.
        :rtype: dict[str, Any]
        """
        request_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)

        self._properties = await self.client_connection.ReadUser(
            user_link=self.user_link, options=request_options, **kwargs
        )

        if response_hook:
            response_hook(self.client_connection.last_response_headers, self._properties)

        return cast('Dict[str, Any]', self._properties)

    @distributed_trace
    def list_permissions(self, max_item_count=None, **kwargs):
        # type: (Optional[int], Any) -> AsyncItemPaged[Dict[str, Any]]
        """List all permissions for the user.

        :param max_item_count: Max number of permissions to be returned in the enumeration operation.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: An AsyncItemPaged of permissions (dicts).
        :rtype: AsyncItemPaged[dict[str, Any]]
        """
        feed_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)
        if max_item_count is not None:
            feed_options["maxItemCount"] = max_item_count

        result = self.client_connection.ReadPermissions(user_link=self.user_link, options=feed_options, **kwargs)

        # NOTE(review): the pager is returned without being iterated here, so the hook
        # presumably fires before any page has been fetched and sees whatever headers the
        # connection currently holds -- confirm whether that is intended.
        if response_hook:
            response_hook(self.client_connection.last_response_headers, result)
        return result

    @distributed_trace
    def query_permissions(
        self,
        query,
        parameters=None,
        max_item_count=None,
        **kwargs
    ):
        # type: (str, Optional[List[str]], Optional[int], Any) -> AsyncItemPaged[Dict[str, Any]]
        """Return all permissions matching the given `query`.

        :param query: The Azure Cosmos DB SQL query to execute.
        :param parameters: Optional array of parameters to the query. When given, the query
            is sent together with the parameters as a single query spec.
        :param max_item_count: Max number of permissions to be returned in the enumeration operation.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: An AsyncItemPaged of permissions (dicts).
        :rtype: AsyncItemPaged[dict[str, Any]]
        """
        feed_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)
        if max_item_count is not None:
            feed_options["maxItemCount"] = max_item_count

        # A bare query string is passed through as-is; parameters require the dict form.
        query_spec = query if parameters is None else {"query": query, "parameters": parameters}
        result = self.client_connection.QueryPermissions(
            user_link=self.user_link,
            query=query_spec,
            options=feed_options,
            **kwargs
        )

        # NOTE(review): the pager is returned without being iterated here, so the hook
        # presumably fires before any page has been fetched -- confirm whether that is intended.
        if response_hook:
            response_hook(self.client_connection.last_response_headers, result)
        return result

    @distributed_trace_async
    async def get_permission(self, permission, **kwargs):
        # type: (Union[str, Dict[str, Any], _permission], Any) -> _permission
        """Get the permission identified by `permission`.

        :param permission: The ID (name), dict representing the properties or :class:`Permission`
            instance of the permission to be retrieved.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: The retrieved permission object.
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given permission couldn't be retrieved.
        :rtype: ~azure.cosmos.permission.Permission
        """
        request_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)

        permission_resp = await self.client_connection.ReadPermission(
            permission_link=self._get_permission_link(permission), options=request_options, **kwargs
        )  # type: Dict[str, str]

        if response_hook:
            response_hook(self.client_connection.last_response_headers, permission_resp)

        return self._permission_from_response(permission_resp)

    @distributed_trace_async
    async def create_permission(self, body, **kwargs):
        # type: (Dict[str, Any], Any) -> _permission
        """Create a permission for the user.

        To update or replace an existing permission, use the :func:`UserProxy.upsert_permission` method.

        :param body: A dict-like object representing the permission to create.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: The new permission object.
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given permission couldn't be created.
        :rtype: ~azure.cosmos.permission.Permission
        """
        request_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)

        permission = await self.client_connection.CreatePermission(
            user_link=self.user_link, permission=body, options=request_options, **kwargs
        )

        if response_hook:
            response_hook(self.client_connection.last_response_headers, permission)

        return self._permission_from_response(permission)

    @distributed_trace_async
    async def upsert_permission(self, body, **kwargs):
        # type: (Dict[str, Any], Any) -> _permission
        """Insert or update the specified permission.

        If the permission already exists for the user, it is replaced. If
        the permission does not exist, it is inserted.

        :param body: A dict-like object representing the permission to update or insert.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: The upserted permission object.
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given permission could not be upserted.
        :rtype: ~azure.cosmos.permission.Permission
        """
        request_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)

        permission = await self.client_connection.UpsertPermission(
            user_link=self.user_link, permission=body, options=request_options, **kwargs
        )

        if response_hook:
            response_hook(self.client_connection.last_response_headers, permission)

        return self._permission_from_response(permission)

    @distributed_trace_async
    async def replace_permission(self, permission, body, **kwargs):
        # type: (Union[str, Dict[str, Any], _permission], Dict[str, Any], Any) -> _permission
        """Replaces the specified permission if it exists for the user.

        If the permission does not already exist, an exception is raised.

        :param permission: The ID (name), dict representing the properties or :class:`Permission`
            instance of the permission to be replaced.
        :param body: A dict-like object representing the permission to replace.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :returns: The permission object after the replace went through.
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace failed or the permission
            with given id does not exist.
        :rtype: ~azure.cosmos.permission.Permission
        """
        request_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)

        permission_resp = await self.client_connection.ReplacePermission(
            permission_link=self._get_permission_link(permission), permission=body, options=request_options, **kwargs
        )  # type: Dict[str, str]

        if response_hook:
            response_hook(self.client_connection.last_response_headers, permission_resp)

        return self._permission_from_response(permission_resp)

    @distributed_trace_async
    async def delete_permission(self, permission, **kwargs):
        # type: (Union[str, Dict[str, Any], _permission], Any) -> None
        """Delete the specified permission from the user.

        If the permission does not already exist, an exception is raised.

        :param permission: The ID (name), dict representing the properties or :class:`Permission`
            instance of the permission to be deleted.
        :keyword Callable response_hook: A callable invoked with the response metadata.
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The permission wasn't deleted successfully.
        :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The permission does not exist for the user.
        :rtype: None
        """
        request_options = build_options(kwargs)
        response_hook = kwargs.pop('response_hook', None)

        result = await self.client_connection.DeletePermission(
            permission_link=self._get_permission_link(permission), options=request_options, **kwargs
        )

        if response_hook:
            response_hook(self.client_connection.last_response_headers, result)