Source code for azure.cosmos.aio._scripts

# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Create, read, update and delete and execute scripts in the Azure Cosmos DB SQL API service.
"""
# pylint: disable=protected-access
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs

from typing import Any, Dict, List, Mapping, Union, Optional, Type, Sequence, TYPE_CHECKING

from azure.core.async_paging import AsyncItemPaged
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.tracing.decorator import distributed_trace

from ._cosmos_client_connection_async import CosmosClientConnection as _CosmosClientConnection
from .._base import build_options as _build_options
from ..scripts import ScriptType
from ..partition_key import NonePartitionKeyValue, _return_undefined_or_empty_partition_key

if TYPE_CHECKING:
    from ._container import ContainerProxy


PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]]  # pylint: disable=line-too-long


[docs] class ScriptsProxy: """An interface to interact with stored procedures. This class should not be instantiated directly. Instead, use the :func:`ContainerProxy.scripts` attribute. """ def __init__( self, container: "ContainerProxy", client_connection: _CosmosClientConnection, container_link: str ) -> None: self.client_connection = client_connection self.container_link = container_link self.container_proxy = container def _get_resource_link(self, script_or_id: Union[Mapping[str, Any], str], typ: str) -> str: if isinstance(script_or_id, str): return "{}/{}/{}".format(self.container_link, typ, script_or_id) return script_or_id["_self"]
[docs] @distributed_trace def list_stored_procedures( self, *, max_item_count: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List all stored procedures in the container. :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :returns: An AsyncItemPaged of stored procedures (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count return self.client_connection.ReadStoredProcedures( collection_link=self.container_link, options=feed_options, **kwargs )
[docs] @distributed_trace def query_stored_procedures( self, query: str, *, parameters: Optional[List[Dict[str, Any]]] = None, max_item_count: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Return all stored procedures matching the given `query`. :param str query: The Azure Cosmos DB SQL query to execute. :keyword parameters: Optional array of parameters to the query. Ignored if no query is provided. :paramtype parameters: List[Dict[str, Any]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :returns: An AsyncItemPaged of stored procedures (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count return self.client_connection.QueryStoredProcedures( collection_link=self.container_link, query=query if parameters is None else {"query": query, "parameters": parameters}, options=feed_options, **kwargs )
[docs] @distributed_trace_async async def get_stored_procedure(self, sproc: Union[str, Mapping[str, Any]], **kwargs: Any) -> Dict[str, Any]: """Get the stored procedure identified by `sproc`. :param sproc: The ID (name) or dict representing the stored procedure to retrieve. :type sproc: Union[str, Dict[str, Any]] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given stored procedure couldn't be retrieved. :returns: A dict representing the retrieved stored procedure. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.ReadStoredProcedure( sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure), options=request_options, **kwargs )
[docs] @distributed_trace_async async def create_stored_procedure(self, body: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]: """Create a new stored procedure in the container. To replace an existing stored procedure, use the :func:`Container.scripts.replace_stored_procedure` method. :param Dict[str, Any] body: A dict representing the stored procedure to create. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given stored procedure couldn't be created. :returns: A dict representing the new stored procedure. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.CreateStoredProcedure( collection_link=self.container_link, sproc=body, options=request_options, **kwargs )
[docs] @distributed_trace_async async def replace_stored_procedure( self, sproc: Union[str, Mapping[str, Any]], body: Dict[str, Any], **kwargs: Any ) -> Dict[str, Any]: """Replace a specified stored procedure in the container. If the stored procedure does not already exist in the container, an exception is raised. :param sproc: The ID (name) or dict representing stored procedure to be replaced. :type sproc: Union[str, Dict[str, Any]] :param Dict[str, Any] body: A dict representing the stored procedure to replace. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace operation failed or the stored procedure with given id does not exist. :returns: A dict representing the stored procedure after replace went through. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.ReplaceStoredProcedure( sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure), sproc=body, options=request_options, **kwargs )
[docs] @distributed_trace_async async def delete_stored_procedure(self, sproc: Union[str, Mapping[str, Any]], **kwargs: Any) -> None: """Delete a specified stored procedure from the container. If the stored procedure does not already exist in the container, an exception is raised. :param sproc: The ID (name) or dict representing stored procedure to be deleted. :type sproc: Union[str, Dict[str, Any]] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The stored procedure wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The stored procedure does not exist in the container. :rtype: None """ request_options = _build_options(kwargs) await self.client_connection.DeleteStoredProcedure( sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure), options=request_options, **kwargs )
[docs] @distributed_trace_async async def execute_stored_procedure( self, sproc: Union[str, Dict[str, Any]], *, partition_key: Optional[PartitionKeyType] = None, parameters: Optional[List[Dict[str, Any]]] = None, enable_script_logging: Optional[bool] = None, **kwargs: Any ) -> Dict[str, Any]: """Execute a specified stored procedure. If the stored procedure does not already exist in the container, an exception is raised. :param sproc: The ID (name) or dict representing the stored procedure to be executed. :type sproc: Union[str, Dict[str, Any]] :keyword partition_key: Specifies the partition key to indicate which partition the stored procedure should execute on. :paramtype partition_key: Union[str, bool, int, float, List[Union[str, bool, int, float]]] :keyword parameters: List of parameters to be passed to the stored procedure to be executed. :paramtype parameters: List[Dict[str, Any]] :keyword bool enable_script_logging: Enables or disables script logging for the current request. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the stored procedure execution failed or if the stored procedure with given id does not exists in the container. :returns: Result of the executed stored procedure for the given parameters. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) if partition_key is not None: request_options["partitionKey"] = ( _return_undefined_or_empty_partition_key( await self.container_proxy.is_system_key) if partition_key == NonePartitionKeyValue else partition_key ) if enable_script_logging is not None: request_options["enableScriptLogging"] = enable_script_logging return await self.client_connection.ExecuteStoredProcedure( sproc_link=self._get_resource_link(sproc, ScriptType.StoredProcedure), params=parameters, options=request_options, **kwargs )
[docs] @distributed_trace def list_triggers( self, *, max_item_count: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List all triggers in the container. :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :returns: An AsyncItemPaged of triggers (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count return self.client_connection.ReadTriggers( collection_link=self.container_link, options=feed_options, **kwargs )
[docs] @distributed_trace def query_triggers( self, query: str, *, parameters: Optional[List[Dict[str, Any]]] = None, max_item_count: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Return all triggers matching the given `query`. :param str query: The Azure Cosmos DB SQL query to execute. :keyword parameters: Optional array of parameters to the query. Ignored if no query is provided. :paramtype parameters: List[Dict[str, Any]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :returns: An AsyncItemPaged of triggers (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count return self.client_connection.QueryTriggers( collection_link=self.container_link, query=query if parameters is None else {"query": query, "parameters": parameters}, options=feed_options, **kwargs )
[docs] @distributed_trace_async async def get_trigger(self, trigger: Union[str, Mapping[str, Any]], **kwargs: Any) -> Dict[str, Any]: """Get a trigger identified by `id`. :param trigger: The ID (name) or dict representing trigger to retrieve. :type trigger: Union[str, Dict[str, Any]] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given trigger couldn't be retrieved. :returns: A dict representing the retrieved trigger. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.ReadTrigger( trigger_link=self._get_resource_link(trigger, ScriptType.Trigger), options=request_options, **kwargs )
[docs] @distributed_trace_async async def create_trigger(self, body: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]: """Create a trigger in the container. To replace an existing trigger, use the :func:`ContainerProxy.scripts.replace_trigger` method. :param Dict[str, Any] body: A dict-like object representing the trigger to create. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given trigger couldn't be created. :returns: A dict representing the new trigger. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.CreateTrigger( collection_link=self.container_link, trigger=body, options=request_options, **kwargs )
[docs] @distributed_trace_async async def replace_trigger( self, trigger: Union[str, Mapping[str, Any]], body: Dict[str, Any], **kwargs: Any ) -> Dict[str, Any]: """Replace a specified trigger in the container. If the trigger does not already exist in the container, an exception is raised. :param trigger: The ID (name) or dict representing trigger to be replaced. :type trigger: Union[str, Dict[str, Any]] :param Dict[str, Any] body: A dict-like object representing the trigger to replace. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace operation failed or the trigger with given id does not exist. :returns: A dict representing the trigger after replace went through. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.ReplaceTrigger( trigger_link=self._get_resource_link(trigger, ScriptType.Trigger), trigger=body, options=request_options, **kwargs )
[docs] @distributed_trace_async async def delete_trigger(self, trigger: Union[str, Mapping[str, Any]], **kwargs: Any) -> None: """Delete a specified trigger from the container. If the trigger does not already exist in the container, an exception is raised. :param trigger: The ID (name) or dict representing trigger to be deleted. :type trigger: Union[str, Dict[str, Any]] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The trigger wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The trigger does not exist in the container. :rtype: None """ request_options = _build_options(kwargs) await self.client_connection.DeleteTrigger( trigger_link=self._get_resource_link(trigger, ScriptType.Trigger), options=request_options, **kwargs )
[docs] @distributed_trace def list_user_defined_functions( self, *, max_item_count: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List all the user-defined functions in the container. :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :returns: An AsyncItemPaged of user-defined functions (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count return self.client_connection.ReadUserDefinedFunctions( collection_link=self.container_link, options=feed_options, **kwargs )
[docs] @distributed_trace def query_user_defined_functions( self, query: str, *, parameters: Optional[List[Dict[str, Any]]] = None, max_item_count: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Return user-defined functions matching a given `query`. :param str query: The Azure Cosmos DB SQL query to execute. :keyword parameters: Optional array of parameters to the query. Ignored if no query is provided. :paramtype parameters: List[Dict[str, Any]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :returns: An AsyncItemPaged of user-defined functions (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count return self.client_connection.QueryUserDefinedFunctions( collection_link=self.container_link, query=query if parameters is None else {"query": query, "parameters": parameters}, options=feed_options, **kwargs )
[docs] @distributed_trace_async async def get_user_defined_function(self, udf: Union[str, Mapping[str, Any]], **kwargs: Any) -> Dict[str, Any]: """Get a user-defined function identified by `id`. :param udf: The ID (name) or dict representing udf to retrieve. :type udf: Union[str, Dict[str, Any]] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the user-defined function couldn't be retrieved. :returns: A dict representing the retrieved user-defined function. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.ReadUserDefinedFunction( udf_link=self._get_resource_link(udf, ScriptType.UserDefinedFunction), options=request_options, **kwargs )
[docs] @distributed_trace_async async def create_user_defined_function(self, body: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]: """Create a user-defined function in the container. To replace an existing user-defined function, use the :func:`ContainerProxy.scripts.replace_user_defined_function` method. :param Dict[str, Any] body: A dict-like object representing the user-defined function to create. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the user-defined function couldn't be created. :returns: A dict representing the new user-defined function. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.CreateUserDefinedFunction( collection_link=self.container_link, udf=body, options=request_options, **kwargs )
[docs] @distributed_trace_async async def replace_user_defined_function( self, udf: Union[str, Mapping[str, Any]], body: Dict[str, Any], **kwargs: Any ) -> Dict[str, Any]: """Replace a specified user-defined function in the container. If the user-defined function does not already exist in the container, an exception is raised. :param udf: The ID (name) or dict representing user-defined function to be replaced. :type udf: Union[str, Dict[str, Any]] :param Dict[str, Any] body: A dict-like object representing the udf to replace. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the replace operation failed or the user-defined function with the given id does not exist. :returns: A dict representing the user-defined function after replace went through. :rtype: Dict[str, Any] """ request_options = _build_options(kwargs) return await self.client_connection.ReplaceUserDefinedFunction( udf_link=self._get_resource_link(udf, ScriptType.UserDefinedFunction), udf=body, options=request_options, **kwargs )
[docs] @distributed_trace_async async def delete_user_defined_function(self, udf: Union[str, Mapping[str, Any]], **kwargs: Any) -> None: """Delete a specified user-defined function from the container. If the user-defined function does not already exist in the container, an exception is raised. :param udf: The ID (name) or dict representing udf to be deleted. :type udf: Union[str, Dict[str, Any]] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The udf wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The UDF does not exist in the container. :rtype: None """ request_options = _build_options(kwargs) await self.client_connection.DeleteUserDefinedFunction( udf_link=self._get_resource_link(udf, ScriptType.UserDefinedFunction), options=request_options, **kwargs )