Source code for azure.cosmos._retry_utility

# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal methods for executing functions in the Azure Cosmos database service.
"""
import json
import time
from typing import Optional

from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
from azure.core.pipeline import PipelineRequest
from azure.core.pipeline.policies import RetryPolicy
from azure.core.pipeline.transport._base import HttpRequest

from . import exceptions
from . import _endpoint_discovery_retry_policy
from . import _resource_throttle_retry_policy
from . import _default_retry_policy
from . import _session_retry_policy
from . import _gone_retry_policy
from . import _timeout_failover_retry_policy
from . import _container_recreate_retry_policy
from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes


# pylint: disable=protected-access, too-many-lines, too-many-statements, too-many-branches


def Execute(client, global_endpoint_manager, function, *args, **kwargs):
    """Executes the function with passed parameters applying all retry policies

    :param object client:
        Document client instance
    :param object global_endpoint_manager:
        Instance of _GlobalEndpointManager class
    :param function function:
        Function to be called wrapped with retries
    :param list args: Positional arguments forwarded to the function.
    :returns: the result of running the passed in function as a (result, headers) tuple
    :rtype: tuple of (dict, dict)
    """
    # instantiate all retry policies here to be applied for each request execution
    endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy(
        client.connection_policy, global_endpoint_manager, *args
    )

    resourceThrottle_retry_policy = _resource_throttle_retry_policy.ResourceThrottleRetryPolicy(
        client.connection_policy.RetryOptions.MaxRetryAttemptCount,
        client.connection_policy.RetryOptions.FixedRetryIntervalInMilliseconds,
        client.connection_policy.RetryOptions.MaxWaitTimeInSeconds,
    )
    defaultRetry_policy = _default_retry_policy.DefaultRetryPolicy(*args)

    sessionRetry_policy = _session_retry_policy._SessionRetryPolicy(
        client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args
    )

    partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args)

    timeout_failover_retry_policy = _timeout_failover_retry_policy._TimeoutFailoverRetryPolicy(
        client.connection_policy, global_endpoint_manager, *args
    )
    # HttpRequest that the Container Recreate Retry Policy may need to modify
    request: Optional[HttpRequest] = None
    if args and len(args) > 3:
        # Reference HttpRequest instance in args
        request = args[3]
        container_recreate_retry_policy = _container_recreate_retry_policy.ContainerRecreateRetryPolicy(
            client, client._container_properties_cache, request, *args)
    else:
        container_recreate_retry_policy = _container_recreate_retry_policy.ContainerRecreateRetryPolicy(
            client, client._container_properties_cache, None, *args)

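    # Run the operation in a loop: on CosmosHttpResponseError the matching retry policy
    # decides whether to retry, how long to back off, and how to adjust the request.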
    while True:
        client_timeout = kwargs.get('timeout')
        start_time = time.time()
        try:
            if args:
                result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs)
            else:
                result = ExecuteFunction(function, *args, **kwargs)
            if not client.last_response_headers:
                client.last_response_headers = {}

            # setting the throttle related response headers before returning the result
            client.last_response_headers[
                HttpHeaders.ThrottleRetryCount
            ] = resourceThrottle_retry_policy.current_retry_attempt_count
            client.last_response_headers[
                HttpHeaders.ThrottleRetryWaitTimeInMs
            ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
            # TODO: It is better to raise Exceptions manually in the method related to the request,
            #  a rework of retry would be needed to be able to retry exceptions raised that way.
            #  for now raising a manual exception here should allow it to be retried.
            # If the container does not have throughput, the query results contain an empty 'Offers' list.
            # We manually raise a 404 here so that it can be handled by the retry utilities.
            if result and isinstance(result[0], dict) and 'Offers' in result[0] and \
                    not result[0]['Offers'] and request is not None and request.method == 'POST':
                # Grab the link used for getting throughput properties to add to message.
                link = json.loads(request.body)["parameters"][0]["value"]
                raise exceptions.CosmosResourceNotFoundError(
                    status_code=StatusCodes.NOT_FOUND,
                    message="Could not find ThroughputProperties for container " + link,
                    sub_status_code=SubStatusCodes.THROUGHPUT_OFFER_NOT_FOUND)
            return result
        except exceptions.CosmosHttpResponseError as e:
            retry_policy = defaultRetry_policy
            # Re-assign retry policy based on error code
            if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in\
                    [SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
                retry_policy = endpointDiscovery_retry_policy
            elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
                retry_policy = resourceThrottle_retry_policy
            elif (
                e.status_code == StatusCodes.NOT_FOUND
                and e.sub_status
                and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
            ):
                retry_policy = sessionRetry_policy
            elif exceptions._partition_range_is_gone(e):
                retry_policy = partition_key_range_gone_retry_policy
            elif exceptions._container_recreate_exception(e):
                retry_policy = container_recreate_retry_policy
                # Before retrying with the container recreate policy, we need to refresh the cached
                # container properties and pass the new RID in the headers.
                client._refresh_container_properties_cache(retry_policy.container_link)
                if e.sub_status != SubStatusCodes.COLLECTION_RID_MISMATCH and retry_policy.check_if_rid_different(
                        retry_policy.container_link, client._container_properties_cache, retry_policy.container_rid):
                    retry_policy.refresh_container_properties_cache = False
                else:
                    cached_container = client._container_properties_cache[retry_policy.container_link]
                    # If the partition key value was previously extracted from the document definition,
                    # re-extract it using the updated partition key definition.
                    if retry_policy.should_extract_partition_key(cached_container):
                        new_partition_key = retry_policy._extract_partition_key(
                            client, container_cache=cached_container, body=request.body
                        )
                        request.headers[HttpHeaders.PartitionKey] = new_partition_key
                    # If reading throughput, we have to replace the container link taken from the stale
                    # cache with the one from the refreshed cache.
                    if retry_policy.should_update_throughput_link(request.body, cached_container):
                        new_body = retry_policy._update_throughput_link(request.body)
                        request.body = new_body

                    retry_policy.container_rid = cached_container["_rid"]
                    request.headers[retry_policy._intended_headers] = retry_policy.container_rid
            elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
                retry_policy = timeout_failover_retry_policy

            # If none of the retry policies applies or no retry is needed, set the
            # throttle-related response headers and re-raise the exception. args[0]
            # is the request; it needs to be modified for the write forbidden exception.
            if not retry_policy.ShouldRetry(e):
                if not client.last_response_headers:
                    client.last_response_headers = {}
                client.last_response_headers[
                    HttpHeaders.ThrottleRetryCount
                ] = resourceThrottle_retry_policy.current_retry_attempt_count
                client.last_response_headers[
                    HttpHeaders.ThrottleRetryWaitTimeInMs
                ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
                if args and args[0].should_clear_session_token_on_session_read_failure:
                    client.session.clear_session_token(client.last_response_headers)
                raise

            # Wait for retry_after_in_milliseconds time before the next retry
            time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0)
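            # Deduct the time spent on this attempt (including the back-off sleep) from the
            # caller's remaining timeout and fail if the timeout budget is exhausted.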
            if client_timeout:
                kwargs['timeout'] = client_timeout - (time.time() - start_time)
                if kwargs['timeout'] <= 0:
                    raise exceptions.CosmosClientTimeoutError()


def ExecuteFunction(function, *args, **kwargs):
    """Stub method so that it can be used for mocking purposes as well.
    :param Callable function: the function to execute.
    :param list args: the explicit arguments for the function.
    :returns: the result of executing the function with the passed in arguments
    :rtype: tuple(dict, dict)
    """
    return function(*args, **kwargs)
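
# Illustrative sketch (not part of the library): because Execute looks up ExecuteFunction
# through this module's globals, tests can monkey-patch it to observe or fault-inject
# request execution. The names below (counting_execute, calls) are hypothetical.
#
#     import azure.cosmos._retry_utility as _retry_utility
#
#     original = _retry_utility.ExecuteFunction
#
#     def counting_execute(function, *args, **kwargs):
#         counting_execute.calls += 1           # hypothetical per-test counter
#         return original(function, *args, **kwargs)
#     counting_execute.calls = 0
#
#     _retry_utility.ExecuteFunction = counting_execute
#     try:
#         ...  # run the client operation under test
#     finally:
#         _retry_utility.ExecuteFunction = original   # always restore the real function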


def _configure_timeout(request: PipelineRequest, absolute: Optional[int], per_request: int) -> None:
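    """Set the effective connection timeout on the pipeline request.

    Uses the smaller of the remaining absolute (client) timeout and the per-request
    (socket) timeout, and raises CosmosClientTimeoutError if the absolute timeout
    has already run out.
    """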
    if absolute is not None:
        if absolute <= 0:
            raise exceptions.CosmosClientTimeoutError()
        if per_request:
            # Both socket timeout and client timeout have been provided - use the shortest value.
            request.context.options['connection_timeout'] = min(per_request, absolute)
        else:
            # Only client timeout provided.
            request.context.options['connection_timeout'] = absolute
    elif per_request:
        # Only socket timeout provided.
        request.context.options['connection_timeout'] = per_request


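# Connection-level retry policy plugged into the azure-core pipeline. It retries
# transport-level failures (socket timeouts, broken connections) and enforces the
# client-side absolute timeout across retry attempts.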
class ConnectionRetryPolicy(RetryPolicy):

    def __init__(self, **kwargs):
        clean_kwargs = {k: v for k, v in kwargs.items() if v is not None}
        super(ConnectionRetryPolicy, self).__init__(**clean_kwargs)

    def send(self, request):
        """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary.
        Also enforces an absolute client-side timeout that spans multiple retry attempts.

        :param request: The PipelineRequest object
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: Returns the PipelineResponse or raises error if maximum retries exceeded.
        :rtype: ~azure.core.pipeline.PipelineResponse
        :raises ~azure.core.exceptions.AzureError: Maximum retries exceeded.
        :raises ~azure.cosmos.exceptions.CosmosClientTimeoutError: Specified timeout exceeded.
        :raises ~azure.core.exceptions.ClientAuthenticationError: Authentication failed.
        """
        absolute_timeout = request.context.options.pop('timeout', None)
        per_request_timeout = request.context.options.pop('connection_timeout', 0)

        retry_error = None
        retry_active = True
        response = None
        retry_settings = self.configure_retries(request.context.options)
        while retry_active:
            start_time = time.time()
            try:
                _configure_timeout(request, absolute_timeout, per_request_timeout)

                response = self.next.send(request)
                if self.is_retry(retry_settings, response):
                    retry_active = self.increment(retry_settings, response=response)
                    if retry_active:
                        self.sleep(retry_settings, request.context.transport, response=response)
                        continue
                break
            except ClientAuthenticationError:  # pylint:disable=try-except-raise
                # the authentication policy failed such that the client's request can't
                # succeed--we'll never have a response to it, so propagate the exception
                raise
            except exceptions.CosmosClientTimeoutError as timeout_error:
                timeout_error.inner_exception = retry_error
                timeout_error.response = response
                timeout_error.history = retry_settings['history']
                raise
            except ServiceRequestError as err:
                # the request ran into a socket timeout or failed to establish a new connection
                # since request wasn't sent, we retry up to however many connection retries are configured (default 3)
                if retry_settings['connect'] > 0:
                    retry_active = self.increment(retry_settings, response=request, error=err)
                    if retry_active:
                        self.sleep(retry_settings, request.context.transport)
                        continue
                raise err
            except AzureError as err:
                retry_error = err
                if self._is_method_retryable(retry_settings, request.http_request):
                    retry_active = self.increment(retry_settings, response=request, error=err)
                    if retry_active:
                        self.sleep(retry_settings, request.context.transport)
                        continue
                raise err
            finally:
                end_time = time.time()
                if absolute_timeout:
                    absolute_timeout -= (end_time - start_time)

        self.update_context(response.context, retry_settings)
        return response