# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from typing import Optional
from uamqp import errors, constants
from azure.core.exceptions import AzureError
from ._common.constants import SESSION_LOCK_LOST, SESSION_LOCK_TIMEOUT
# AMQP error conditions that are never retried: `_error_handler` answers them
# with ErrorAction(retry=False) and `_create_servicebus_exception` marks the
# resulting error as one that must be raised.
_NO_RETRY_ERRORS = (
    constants.ErrorCodes.DecodeError,
    constants.ErrorCodes.LinkMessageSizeExceeded,
    constants.ErrorCodes.NotFound,
    constants.ErrorCodes.NotImplemented,
    constants.ErrorCodes.LinkRedirect,
    constants.ErrorCodes.NotAllowed,
    constants.ErrorCodes.UnauthorizedAccess,
    constants.ErrorCodes.LinkStolen,
    constants.ErrorCodes.ResourceLimitExceeded,
    constants.ErrorCodes.ConnectionRedirect,
    constants.ErrorCodes.PreconditionFailed,
    constants.ErrorCodes.InvalidField,
    constants.ErrorCodes.ResourceDeleted,
    constants.ErrorCodes.IllegalState,
    constants.ErrorCodes.FrameSizeTooSmall,
    constants.ErrorCodes.ConnectionFramingError,
    constants.ErrorCodes.SessionUnattachedHandle,
    constants.ErrorCodes.SessionHandleInUse,
    constants.ErrorCodes.SessionErrantLink,
    constants.ErrorCodes.SessionWindowViolation,
    b"com.microsoft:argument-out-of-range",
    b"com.microsoft:entity-disabled",
    b"com.microsoft:auth-failed",
    b"com.microsoft:precondition-failed",
    b"com.microsoft:argument-error",
)

# Link-detach conditions that `_handle_amqp_connection_error` converts into
# session-specific errors (SessionLockExpired / NoActiveSession).
_AMQP_SESSION_ERROR_CONDITIONS = (SESSION_LOCK_LOST, SESSION_LOCK_TIMEOUT)

# uamqp exception types dispatched to `_handle_amqp_connection_error`.
_AMQP_CONNECTION_ERRORS = (
    errors.LinkDetach,
    errors.ConnectionClose,
    errors.MessageHandlerError,
    errors.AMQPConnectionError,
)

# uamqp exception types dispatched to `_handle_amqp_message_error`.
_AMQP_MESSAGE_ERRORS = (
    errors.MessageAlreadySettled,
    errors.MessageContentTooLarge,
    errors.MessageException,
)
def _error_handler(error):
    """Map the AMQP condition of a failed operation to a retry decision.

    Used as the ``on_error`` callback of ``_ServiceBusErrorPolicy``. A small
    set of service conditions is retried (some with an explicit backoff),
    conditions listed in ``_NO_RETRY_ERRORS`` are not retried, and anything
    else is retried.

    :param error: The error received in the send attempt.
    :type error: Exception
    :rtype: ~uamqp.errors.ErrorAction
    """
    # Ordered (condition, extra ErrorAction kwargs) pairs; checked before the
    # no-retry list, in the same order as the original if-chain.
    retry_rules = (
        (b'com.microsoft:server-busy', {"backoff": 4}),
        (b'com.microsoft:timeout', {"backoff": 2}),
        (b'com.microsoft:operation-cancelled', {}),
        (b"com.microsoft:container-close", {"backoff": 4}),
    )
    for known_condition, extra in retry_rules:
        if error.condition == known_condition:
            return errors.ErrorAction(retry=True, **extra)

    should_retry = error.condition not in _NO_RETRY_ERRORS
    return errors.ErrorAction(retry=should_retry)
def _handle_amqp_connection_error(logger, exception, handler):
    """Translate a uamqp connection-level exception into a Service Bus error.

    Handles every exception type listed in ``_AMQP_CONNECTION_ERRORS``.

    :param logger: Logger used to record the failure at INFO level.
    :param exception: The uamqp exception being translated.
    :param handler: The handler on which the failure occurred; only its
     ``_session_id`` attribute (if present) is read.
    :returns: A tuple ``(error, error_need_close_handler, error_need_raise)``.
    :rtype: tuple
    """
    link_detached = isinstance(exception, errors.LinkDetach)

    if link_detached and exception.condition in _AMQP_SESSION_ERROR_CONDITIONS:
        # Session lock lost / no active session: no retry, close the handler
        # and raise the error.
        session_error = None
        if exception.condition == SESSION_LOCK_LOST:
            session_id = getattr(handler, "_session_id", None)
            session_error = SessionLockExpired(
                "Connection detached - lock on Session {} lost.".format(session_id)
            )
        elif exception.condition == SESSION_LOCK_TIMEOUT:
            session_error = NoActiveSession("Queue has no active session to receive from.")
        return session_error, True, True

    if link_detached or isinstance(exception, errors.ConnectionClose):
        # Any other link detach or connection close: close the handler but do
        # not force a raise, so the caller may retry.
        logger.info("Handler detached due to exception: (%r).", exception)
        text = str(exception)
        if exception.condition == constants.ErrorCodes.UnauthorizedAccess:
            return ServiceBusAuthorizationError(text, error=exception), True, False
        if exception.condition == constants.ErrorCodes.NotAllowed and 'requires sessions' in text:
            text = text + '\n\nsession_id must be set when getting a receiver for sessionful entity.'
        return ServiceBusConnectionError(text, error=exception), True, False

    if isinstance(exception, errors.MessageHandlerError):
        logger.info("Handler error: (%r).", exception)
        return ServiceBusConnectionError(str(exception), error=exception), True, False

    # General uamqp.errors.AMQPConnectionError: raise without closing the handler.
    logger.info("Failed to open handler: (%r).", exception)
    open_failure = ServiceBusConnectionError(
        "Failed to open handler: {}.".format(exception), error=exception
    )
    return open_failure, False, True
def _handle_amqp_message_error(logger, exception, **kwargs):
    """Translate a uamqp message-level exception into a Service Bus error.

    Handles every exception type listed in ``_AMQP_MESSAGE_ERRORS``.

    :param logger: Logger used to record the failure at INFO level.
    :param exception: The uamqp exception being translated.
    :keyword str settle_operation: Name of the settlement operation, used in
     the ``MessageAlreadySettled`` message; defaults to "Unknown operation".
    :returns: A tuple ``(error, error_need_close_handler, error_need_raise)``.
    :rtype: tuple
    """
    if isinstance(exception, errors.MessageAlreadySettled):
        # No retry: keep the handler open and raise.
        logger.info("Message already settled (%r)", exception)
        operation = kwargs.get("settle_operation", "Unknown operation")
        return MessageAlreadySettled(operation), False, True

    too_large = isinstance(exception, errors.MessageContentTooLarge) or (
        isinstance(exception, errors.MessageException)
        and exception.condition == constants.ErrorCodes.LinkMessageSizeExceeded
    )
    if too_large:
        # No retry: keep the handler open and raise.
        logger.info("Message content is too large (%r).", exception)
        oversize = MessageContentTooLarge("Message content is too large.", error=exception)
        return oversize, False, True

    # General uamqp.errors.MessageException: close the handler, do not force a raise.
    logger.info("Message send failed (%r)", exception)
    timed_out = exception.condition == constants.ErrorCodes.ClientError and 'timed out' in str(exception)
    if timed_out:
        failure = OperationTimeoutError("Send operation timed out", error=exception)
    else:
        failure = MessageSendFailed(error=exception)
    return failure, True, False
def _create_servicebus_exception(logger, exception, handler, **kwargs):  # pylint: disable=too-many-statements
    """Transform an AMQP (or other) exception into a Service Bus exception.

    :param logger: Logger used to record the failure at INFO level.
    :param exception: The exception to transform.
    :param handler: The handler on which the failure occurred.
    :keyword str settle_operation: Name of the settlement operation in
     progress, if any.
    :returns: A tuple ``(error, error_need_close_handler, error_need_raise)``.
    :rtype: tuple
    """
    close_handler, must_raise = True, False

    if isinstance(exception, _AMQP_CONNECTION_ERRORS):
        error, close_handler, must_raise = _handle_amqp_connection_error(logger, exception, handler)
    elif isinstance(exception, _AMQP_MESSAGE_ERRORS):
        error, close_handler, must_raise = _handle_amqp_message_error(logger, exception, **kwargs)
    elif isinstance(exception, errors.AuthenticationException):
        logger.info("Authentication failed due to exception: (%r).", exception)
        error = ServiceBusAuthenticationError(str(exception), error=exception)
    else:
        logger.info("Unexpected error occurred (%r). Shutting down.", exception)
        settle_operation = kwargs.get("settle_operation")
        if settle_operation:
            error = MessageSettleFailed(settle_operation, error=exception)
        elif isinstance(exception, ServiceBusError):
            # Already a Service Bus error: pass it through unchanged.
            error = exception
        else:
            error = ServiceBusError("Handler failed: {}.".format(exception), error=exception)

    # Whatever the branch above decided, a non-retryable condition on the
    # original exception always forces a raise. Exceptions without a
    # ``condition`` attribute are left as decided.
    if getattr(exception, "condition", None) in _NO_RETRY_ERRORS:
        must_raise = True

    return error, close_handler, must_raise
class _ServiceBusErrorPolicy(errors.ErrorPolicy):
    """uamqp error policy that never retries when bound to a session.

    For session handlers every hook returns ``ErrorAction(retry=False)``;
    otherwise the decision is delegated to ``uamqp.errors.ErrorPolicy``, with
    ``_error_handler`` registered as the ``on_error`` callback.

    :param int max_retries: Forwarded to the base policy.
    :param bool is_session: Whether the policy is used by a session handler.
    """

    def __init__(self, max_retries=3, is_session=False):
        # Set before the base initializer runs, as in the original ordering.
        self._is_session = is_session
        super(_ServiceBusErrorPolicy, self).__init__(max_retries=max_retries, on_error=_error_handler)

    def on_unrecognized_error(self, error):
        """Return the action for an unrecognized error."""
        if not self._is_session:
            return super(_ServiceBusErrorPolicy, self).on_unrecognized_error(error)
        return errors.ErrorAction(retry=False)

    def on_link_error(self, error):
        """Return the action for a link-level error."""
        if not self._is_session:
            return super(_ServiceBusErrorPolicy, self).on_link_error(error)
        return errors.ErrorAction(retry=False)

    def on_connection_error(self, error):
        """Return the action for a connection-level error."""
        if not self._is_session:
            return super(_ServiceBusErrorPolicy, self).on_connection_error(error)
        return errors.ErrorAction(retry=False)
class ServiceBusError(AzureError):
    """Base exception for all Service Bus errors which can be used for default error handling.

    :param str message: The message object stringified as 'message' attribute
    :keyword error: The original exception if any
    :paramtype error: Exception
    :ivar exc_type: The exc_type from sys.exc_info()
    :ivar exc_value: The exc_value from sys.exc_info()
    :ivar exc_traceback: The exc_traceback from sys.exc_info()
    :ivar exc_msg: A string formatting of message parameter, exc_type and exc_value
    :ivar str message: A stringified version of the message parameter
    """
class ServiceBusConnectionError(ServiceBusError):
    """An error occurred in the connection."""
class ServiceBusAuthorizationError(ServiceBusError):
    """An error occurred when authorizing the connection."""
class ServiceBusAuthenticationError(ServiceBusError):
    """An error occurred when authenticating the connection."""
class NoActiveSession(ServiceBusError):
    """No active Sessions are available to receive from."""
class OperationTimeoutError(ServiceBusError):
    """Operation timed out."""
class ServiceBusMessageError(ServiceBusError):
    """An error occurred when an operation on a message failed because the message is in an incorrect state."""
class MessageContentTooLarge(ServiceBusMessageError, ValueError):
    """Message content is larger than the service bus frame size."""
class MessageAlreadySettled(ServiceBusMessageError):
    """Failed to settle the message.

    An attempt was made to complete an operation on a message that has already
    been settled (completed, abandoned, dead-lettered or deferred).
    This error will also be raised if an attempt is made to settle a message
    received via ReceiveAndDelete mode.

    :param str action: The settlement operation, there are four types of settlement,
     `complete/abandon/defer/dead_letter`.
    """

    def __init__(self, action):
        # type: (str) -> None
        message = "Unable to {} message as it has already been settled".format(action)
        super(MessageAlreadySettled, self).__init__(message)
class MessageSettleFailed(ServiceBusMessageError):
    """Attempt to settle a message failed.

    :param str action: The settlement operation, there are four types of settlement,
     `complete/abandon/defer/dead_letter`.
    :param error: The original exception if any.
    :type error: Exception
    """

    def __init__(self, action, error):
        # type: (str, Exception) -> None
        message = "Failed to {} message. Error: {}".format(action, error)
        super(MessageSettleFailed, self).__init__(message, error=error)
class MessageSendFailed(ServiceBusMessageError):
    """A message failed to send to the Service Bus entity.

    :param error: The original exception that caused the send to fail.
    :type error: Exception
    :ivar condition: ``error.condition`` when the original exception has that
     attribute, otherwise None.
    :ivar description: ``error.description`` when the original exception has a
     ``condition`` and a ``description`` attribute, otherwise None.
    """

    def __init__(self, error):
        # type: (Exception) -> None
        message = "Message failed to send. Error: {}".format(error)
        self.condition = None
        self.description = None
        if hasattr(error, 'condition'):
            self.condition = error.condition  # type: ignore
            # An error may expose ``condition`` without ``description``; fall
            # back to None instead of failing while building this exception.
            self.description = getattr(error, 'description', None)
        super(MessageSendFailed, self).__init__(message, error=error)
class MessageLockExpired(ServiceBusMessageError):
    """The lock on the message has expired and it has been released back to the queue.

    It will need to be received again in order to settle it.

    :param str message: Optional error message; defaults to "Message lock expired".
    :param error: The original exception if any.
    :type error: Exception
    """

    def __init__(self, message=None, error=None):
        # type: (Optional[str], Optional[Exception]) -> None
        message = message or "Message lock expired"
        super(MessageLockExpired, self).__init__(message, error=error)
class SessionLockExpired(ServiceBusError):
    """The lock on the session has expired.

    All unsettled messages that have been received can no longer be settled.

    :param str message: Optional error message; defaults to "Session lock expired".
    :param error: The original exception if any.
    :type error: Exception
    """

    def __init__(self, message=None, error=None):
        # type: (Optional[str], Optional[Exception]) -> None
        message = message or "Session lock expired"
        super(SessionLockExpired, self).__init__(message, error=error)
class AutoLockRenewFailed(ServiceBusError):
    """An attempt to renew a lock on a message or session in the background has failed."""
class AutoLockRenewTimeout(ServiceBusError):
    """The time allocated to renew the message or session lock has elapsed."""