Source code for azure.eventhub.exceptions

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import logging
import six

from uamqp import errors, compat  # type: ignore

from ._constants import NO_RETRY_ERRORS

_LOGGER = logging.getLogger(__name__)


def _error_handler(error):
    """
    Called internally when an event has failed to send so we
    can parse the error to determine whether we should attempt
    to retry sending the event again.
    Returns the action to take according to error type.

    :param error: The error received in the send attempt.
    :type error: Exception
    :rtype: ~uamqp.errors.ErrorAction
    """
    if error.condition == b'com.microsoft:server-busy':
        return errors.ErrorAction(retry=True, backoff=4)
    if error.condition == b'com.microsoft:timeout':
        return errors.ErrorAction(retry=True, backoff=2)
    if error.condition == b'com.microsoft:operation-cancelled':
        return errors.ErrorAction(retry=True)
    if error.condition == b"com.microsoft:container-close":
        return errors.ErrorAction(retry=True, backoff=4)
    if error.condition in NO_RETRY_ERRORS:
        return errors.ErrorAction(retry=False)
    return errors.ErrorAction(retry=True)


[docs]class EventHubError(Exception): """Represents an error occurred in the client. :ivar message: The error message. :vartype message: str :ivar error: The error condition, if available. :vartype error: str :ivar details: The error details, if included in the service response. :vartype details: dict[str, str] """ def __init__(self, message, details=None): self.error = None self.message = message self.details = details if details and isinstance(details, Exception): try: condition = details.condition.value.decode('UTF-8') except AttributeError: try: condition = details.condition.decode('UTF-8') except AttributeError: condition = None if condition: _, _, self.error = condition.partition(':') self.message += "\nError: {}".format(self.error) try: self._parse_error(details.description) for detail in self.details: self.message += "\n{}".format(detail) except: # pylint: disable=bare-except self.message += "\n{}".format(details) super(EventHubError, self).__init__(self.message) def _parse_error(self, error_list): details = [] self.message = error_list if isinstance(error_list, six.text_type) else error_list.decode('UTF-8') details_index = self.message.find(" Reference:") if details_index >= 0: details_msg = self.message[details_index + 1:] self.message = self.message[0:details_index] tracking_index = details_msg.index(", TrackingId:") system_index = details_msg.index(", SystemTracker:") timestamp_index = details_msg.index(", Timestamp:") details.append(details_msg[:tracking_index]) details.append(details_msg[tracking_index + 2: system_index]) details.append(details_msg[system_index + 2: timestamp_index]) details.append(details_msg[timestamp_index + 2:]) self.details = details
[docs]class ClientClosedError(EventHubError): """The Client has been closed and is unable to process further events."""
[docs]class ConnectionLostError(EventHubError): """Connection to the Event Hub is lost. In most cases the client will automatically retry on this error."""
[docs]class ConnectError(EventHubError): """Failed to connect to the Event Hubs service."""
[docs]class AuthenticationError(ConnectError): """Failed to connect to the Event Hubs service because of an authentication issue."""
[docs]class EventDataError(EventHubError): """Client prevented problematic event data from being sent."""
[docs]class EventDataSendError(EventHubError): """Service returned an error while an event data is being sent."""
[docs]class OperationTimeoutError(EventHubError): """Operation timed out."""
[docs]class OwnershipLostError(Exception): """Raised when `update_checkpoint` detects the ownership to a partition has been lost."""
def _create_eventhub_exception(exception): if isinstance(exception, errors.AuthenticationException): error = AuthenticationError(str(exception), exception) elif isinstance(exception, errors.VendorLinkDetach): error = ConnectError(str(exception), exception) elif isinstance(exception, errors.LinkDetach): error = ConnectionLostError(str(exception), exception) elif isinstance(exception, errors.ConnectionClose): error = ConnectionLostError(str(exception), exception) elif isinstance(exception, errors.MessageHandlerError): error = ConnectionLostError(str(exception), exception) elif isinstance(exception, errors.AMQPConnectionError): error_type = AuthenticationError if str(exception).startswith("Unable to open authentication session") \ else ConnectError error = error_type(str(exception), exception) elif isinstance(exception, compat.TimeoutException): error = ConnectionLostError(str(exception), exception) else: error = EventHubError(str(exception), exception) return error def _handle_exception(exception, closable): # pylint:disable=too-many-branches, too-many-statements try: # closable is a producer/consumer object name = closable._name # pylint: disable=protected-access except AttributeError: # closable is an client object name = closable._container_id # pylint: disable=protected-access if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise _LOGGER.info("%r stops due to keyboard interrupt", name) closable.close() raise exception elif isinstance(exception, EventHubError): closable.close() raise exception elif isinstance(exception, ( errors.MessageAccepted, errors.MessageAlreadySettled, errors.MessageModified, errors.MessageRejected, errors.MessageReleased, errors.MessageContentTooLarge) ): _LOGGER.info("%r Event data error (%r)", name, exception) error = EventDataError(str(exception), exception) raise error elif isinstance(exception, errors.MessageException): _LOGGER.info("%r Event data send error (%r)", name, exception) error = EventDataSendError(str(exception), exception) raise error else: if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access elif isinstance(exception, errors.LinkDetach): if hasattr(closable, "_close_handler"): closable._close_handler() # pylint:disable=protected-access elif isinstance(exception, errors.ConnectionClose): if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access elif isinstance(exception, errors.MessageHandlerError): if hasattr(closable, "_close_handler"): closable._close_handler() # pylint:disable=protected-access elif isinstance(exception, errors.AMQPConnectionError): if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access elif isinstance(exception, compat.TimeoutException): pass # Timeout doesn't need to recreate link or connection to retry else: if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access return _create_eventhub_exception(exception)