# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import logging
import six
from ._constants import NO_RETRY_ERRORS
from ._pyamqp import error as errors
_LOGGER = logging.getLogger(__name__)
[docs]class EventHubError(Exception):
"""Represents an error occurred in the client.
:ivar message: The error message.
:vartype message: str
:ivar error: The error condition, if available.
:vartype error: str
:ivar details: The error details, if included in the
service response.
:vartype details: dict[str, str]
"""
def __init__(self, message, details=None):
self.error = None
self.message = message
self.details = details
if details and isinstance(details, Exception):
try:
condition = details.condition.value.decode("UTF-8")
except AttributeError:
try:
condition = details.condition.decode("UTF-8")
except AttributeError:
condition = None
if condition:
_, _, self.error = condition.partition(":")
self.message += "\nError: {}".format(self.error)
try:
self._parse_error(details.description)
for detail in self.details:
self.message += "\n{}".format(detail)
except: # pylint: disable=bare-except
self.message += "\n{}".format(details)
super(EventHubError, self).__init__(self.message)
def _parse_error(self, error_list):
details = []
self.message = (
error_list
if isinstance(error_list, six.text_type)
else error_list.decode("UTF-8")
)
details_index = self.message.find(" Reference:")
if details_index >= 0:
details_msg = self.message[details_index + 1 :]
self.message = self.message[0:details_index]
tracking_index = details_msg.index(", TrackingId:")
system_index = details_msg.index(", SystemTracker:")
timestamp_index = details_msg.index(", Timestamp:")
details.append(details_msg[:tracking_index])
details.append(details_msg[tracking_index + 2 : system_index])
details.append(details_msg[system_index + 2 : timestamp_index])
details.append(details_msg[timestamp_index + 2 :])
self.details = details
[docs]class ClientClosedError(EventHubError):
"""The Client has been closed and is unable to process further events."""
[docs]class ConnectionLostError(EventHubError):
"""Connection to the Event Hub is lost.
In most cases the client will automatically retry on this error."""
[docs]class ConnectError(EventHubError):
"""Failed to connect to the Event Hubs service."""
[docs]class AuthenticationError(ConnectError):
"""Failed to connect to the Event Hubs service because of an authentication issue."""
[docs]class EventDataError(EventHubError):
"""Client prevented problematic event data from being sent."""
[docs]class EventDataSendError(EventHubError):
"""Service returned an error while an event data is being sent."""
[docs]class OperationTimeoutError(EventHubError):
"""Operation timed out."""
[docs]class OwnershipLostError(Exception):
"""Raised when `update_checkpoint` detects the ownership to a partition has been lost."""
def _create_eventhub_exception(exception):
if isinstance(exception, errors.AuthenticationException):
error = AuthenticationError(str(exception), exception)
elif isinstance(exception, errors.AMQPLinkError):
error = ConnectError(str(exception), exception)
# TODO: do we need MessageHanlderError in amqp any more
# if connection/session/link error are enough?
# elif isinstance(exception, errors.MessageHandlerError):
# error = ConnectionLostError(str(exception), exception)
elif isinstance(exception, errors.AMQPConnectionError):
error = ConnectError(str(exception), exception)
elif isinstance(exception, TimeoutError):
error = ConnectionLostError(str(exception), exception)
else:
error = EventHubError(str(exception), exception)
return error
def _handle_exception(
exception, closable
): # pylint:disable=too-many-branches, too-many-statements
try: # closable is a producer/consumer object
name = closable._name # pylint: disable=protected-access
except AttributeError: # closable is an client object
name = closable._container_id # pylint: disable=protected-access
if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise
_LOGGER.info("%r stops due to keyboard interrupt", name)
closable._close_connection() # pylint:disable=protected-access
raise exception
elif isinstance(exception, EventHubError):
closable._close_handler() # pylint:disable=protected-access
raise exception
# TODO: The following errors seem to be useless in EH
# elif isinstance(
# exception,
# (
# errors.MessageAccepted,
# errors.MessageAlreadySettled,
# errors.MessageModified,
# errors.MessageRejected,
# errors.MessageReleased,
# errors.MessageContentTooLarge,
# ),
# ):
# _LOGGER.info("%r Event data error (%r)", name, exception)
# error = EventDataError(str(exception), exception)
# raise error
elif isinstance(exception, errors.MessageException):
_LOGGER.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
else:
if isinstance(exception, errors.AuthenticationException):
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.AMQPLinkError):
if hasattr(closable, "_close_handler"):
closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.AMQPConnectionError):
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
# TODO: add MessageHandlerError in amqp?
# elif isinstance(exception, errors.MessageHandlerError):
# if hasattr(closable, "_close_handler"):
# closable._close_handler() # pylint:disable=protected-access
else: # errors.AMQPConnectionError, compat.TimeoutException
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
return _create_eventhub_exception(exception)