Source code for azure.eventhub.exceptions

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Union, List, Optional


[docs] class EventHubError(Exception): """Represents an error occurred in the client. :ivar message: The error message. :vartype message: str :ivar error: The error condition, if available. :vartype error: str :ivar details: The error details, if included in the service response. :vartype details: list[str] """ def __init__(self, message: str, details: Optional[List[str]] = None) -> None: self.error: Optional[str] = None self.message: str = message self.details: Union[List[str]] if details and isinstance(details, Exception): # TODO: issue #34266 try: condition = details.condition.value.decode("UTF-8") # type: ignore[attr-defined] except AttributeError: try: condition = details.condition.decode("UTF-8") # type: ignore[attr-defined] except AttributeError: condition = None if condition: _, _, self.error = condition.partition(":") self.message += "\nError: {}".format(self.error) try: self._parse_error(details.description) # type: ignore[attr-defined] for detail in self.details: self.message += "\n{}".format(detail) except: # pylint: disable=bare-except self.message += "\n{}".format(details) super(EventHubError, self).__init__(self.message) def _parse_error(self, error_list: Union[str, bytes]) -> None: details = [] self.message = error_list if isinstance(error_list, str) else error_list.decode("UTF-8") details_index = self.message.find(" Reference:") if details_index >= 0: details_msg = self.message[details_index + 1 :] self.message = self.message[0:details_index] tracking_index = details_msg.index(", TrackingId:") system_index = details_msg.index(", SystemTracker:") timestamp_index = details_msg.index(", Timestamp:") details.append(details_msg[:tracking_index]) details.append(details_msg[tracking_index + 2 : system_index]) details.append(details_msg[system_index + 2 : timestamp_index]) details.append(details_msg[timestamp_index + 2 :]) self.details = details
[docs] class ClientClosedError(EventHubError): """The Client has been closed and is unable to process further events."""
[docs] class ConnectionLostError(EventHubError): """Connection to the Event Hub is lost. In most cases the client will automatically retry on this error."""
[docs] class ConnectError(EventHubError): """Failed to connect to the Event Hubs service."""
[docs] class AuthenticationError(ConnectError): """Failed to connect to the Event Hubs service because of an authentication issue."""
[docs] class EventDataError(EventHubError): """Client prevented problematic event data from being sent."""
[docs] class EventDataSendError(EventHubError): """Service returned an error while an event data is being sent."""
[docs] class OperationTimeoutError(EventHubError): """Operation timed out."""
[docs] class OwnershipLostError(Exception): """Raised when `update_checkpoint` detects the ownership to a partition has been lost."""