Source code for azure.servicebus.base_handler

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import time
import logging
try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

from uamqp import AMQPClient
from uamqp import authentication
from uamqp import constants, errors
from uamqp.message import Message, MessageProperties

from azure.servicebus.common.constants import ASSOCIATEDLINKPROPERTYNAME
from azure.servicebus.common.utils import create_properties
from azure.servicebus.common.errors import (
    _ServiceBusErrorPolicy,
    InvalidHandlerState,
    ServiceBusError,
    ServiceBusConnectionError,
    ServiceBusAuthorizationError)


_log = logging.getLogger(__name__)


[docs]class BaseHandler(object): # pylint: disable=too-many-instance-attributes def __init__(self, endpoint, auth_config, connection=None, encoding='UTF-8', debug=False, **kwargs): self.running = False self.error = None self.endpoint = endpoint self.entity = urlparse(endpoint).path.strip('/') self.mgmt_target = self.entity + "/$management" self.debug = debug self.encoding = encoding self.auth_config = auth_config self.connection = connection self.auto_reconnect = kwargs.pop('auto_reconnect', True) self.properties = create_properties() self.error_policy = kwargs.pop('error_policy', None) self.handler_kwargs = kwargs if not self.error_policy: max_retries = kwargs.pop('max_message_retries', 3) is_session = hasattr(self, 'session_id') self.error_policy = _ServiceBusErrorPolicy(max_retries=max_retries, is_session=is_session) self._handler = None self._build_handler() def __enter__(self): """Open the handler in a context manager.""" self.open() return self def __exit__(self, *args): """Close the handler when exiting a context manager.""" self.close() def _build_handler(self): auth = None if self.connection else authentication.SASTokenAuth.from_shared_access_key(**self.auth_config) self._handler = AMQPClient( self.endpoint, auth=auth, debug=self.debug, properties=self.properties, error_policy=self.error_policy, encoding=self.encoding, **self.handler_kwargs) def _mgmt_request_response(self, operation, message, callback, keep_alive_associated_link=True, **kwargs): if not self.running: raise InvalidHandlerState("Client connection is closed.") application_properties = {} # Some mgmt calls do not support an associated link name. Most do, however, so on by default. if keep_alive_associated_link: try: application_properties = {ASSOCIATEDLINKPROPERTYNAME:self._handler.message_handler.name} except AttributeError: pass mgmt_msg = Message( body=message, properties=MessageProperties( reply_to=self.mgmt_target, encoding=self.encoding, **kwargs), application_properties=application_properties) try: return self._handler.mgmt_request( mgmt_msg, operation, op_type=b"entity-mgmt", node=self.mgmt_target.encode(self.encoding), timeout=5000, callback=callback) except Exception as exp: # pylint: disable=broad-except raise ServiceBusError("Management request failed: {}".format(exp), exp) def _handle_exception(self, exception): if isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): if exception.action and exception.action.retry and self.auto_reconnect: _log.info("Handler detached. Attempting reconnect.") self.reconnect() elif exception.condition == constants.ErrorCodes.UnauthorizedAccess: _log.info("Handler detached. Shutting down.") error = ServiceBusAuthorizationError(str(exception), exception) self.close(exception=error) raise error else: _log.info("Handler detached. Shutting down.") error = ServiceBusConnectionError(str(exception), exception) self.close(exception=error) raise error elif isinstance(exception, errors.MessageHandlerError): if self.auto_reconnect: _log.info("Handler error. Attempting reconnect.") self.reconnect() else: _log.info("Handler error. Shutting down.") error = ServiceBusConnectionError(str(exception), exception) self.close(exception=error) raise error elif isinstance(exception, errors.AMQPConnectionError): message = "Failed to open handler: {}".format(exception) raise ServiceBusConnectionError(message, exception) else: _log.info("Unexpected error occurred (%r). Shutting down.", exception) error = ServiceBusError("Handler failed: {}".format(exception)) self.close(exception=error) raise error
[docs] def reconnect(self): """Reconnect the handler. If the handler was disconnected from the service with a retryable error - attempt to reconnect. This method will be called automatically for most retryable errors. """ self._handler.close() self.running = False self._build_handler() self.open()
[docs] def open(self): """Open handler connection and authenticate session. If the handler is already open, this operation will do nothing. A handler opened with this method must be explicitly closed. It is recommended to open a handler within a context manager as opposed to calling the method directly. .. note:: This operation is not thread-safe. """ if self.running: return self.running = True try: self._handler.open(connection=self.connection) while not self._handler.client_ready(): time.sleep(0.05) except Exception as e: # pylint: disable=broad-except try: self._handle_exception(e) except: self.running = False raise
[docs] def close(self, exception=None): """Close down the handler connection. If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shutdown due to error. It is recommended to open a handler within a context manager as opposed to calling the method directly. .. note:: This operation is not thread-safe. :param exception: An optional exception if the handler is closing due to an error. :type exception: Exception """ self.running = False if self.error: return if isinstance(exception, ServiceBusError): self.error = exception elif exception: self.error = ServiceBusError(str(exception)) else: self.error = ServiceBusError("This message handler is now closed.") self._handler.close()