Source code for azure.eventhub.client_abstract

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import logging
import sys
import platform
import uuid
import time
from abc import abstractmethod
from typing import Union, Any, TYPE_CHECKING

from uamqp import types  # type: ignore
from azure.eventhub import __version__
from azure.eventhub.configuration import _Configuration
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address

try:
    from urlparse import urlparse  # type: ignore
    from urllib import urlencode, quote_plus  # type: ignore
except ImportError:
    from urllib.parse import urlparse, urlencode, quote_plus

if TYPE_CHECKING:
    from azure.core.credentials import TokenCredential  # type: ignore

log = logging.getLogger(__name__)
MAX_USER_AGENT_LENGTH = 512


def _parse_conn_str(conn_str):
    endpoint = None
    shared_access_key_name = None
    shared_access_key = None
    entity_path = None
    for element in conn_str.split(';'):
        key, _, value = element.partition('=')
        if key.lower() == 'endpoint':
            endpoint = value.rstrip('/')
        elif key.lower() == 'hostname':
            endpoint = value.rstrip('/')
        elif key.lower() == 'sharedaccesskeyname':
            shared_access_key_name = value
        elif key.lower() == 'sharedaccesskey':
            shared_access_key = value
        elif key.lower() == 'entitypath':
            entity_path = value
    if not all([endpoint, shared_access_key_name, shared_access_key]):
        raise ValueError("Invalid connection string")
    return endpoint, shared_access_key_name, shared_access_key, entity_path


def _generate_sas_token(uri, policy, key, expiry=None):
    """Create a shared access signiture token as a string literal.
    :returns: SAS token as string literal.
    :rtype: str
    """
    from base64 import b64encode, b64decode
    from hashlib import sha256
    from hmac import HMAC
    if not expiry:
        expiry = time.time() + 3600  # Default to 1 hour.
    encoded_uri = quote_plus(uri)
    ttl = int(expiry)
    sign_key = '%s\n%d' % (encoded_uri, ttl)
    signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest())
    result = {
        'sr': uri,
        'sig': signature,
        'se': str(ttl)}
    if policy:
        result['skn'] = policy
    return 'SharedAccessSignature ' + urlencode(result)


def _build_uri(address, entity):
    parsed = urlparse(address)
    if parsed.path:
        return address
    if not entity:
        raise ValueError("No EventHub specified")
    address += "/" + str(entity)
    return address


[docs]class EventHubClientAbstract(object): # pylint:disable=too-many-instance-attributes """ The EventHubClientAbstract class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service. """ def __init__(self, host, event_hub_path, credential, **kwargs): # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None """ Constructs a new EventHubClient. :param host: The hostname of the Event Hub. :type host: str :param event_hub_path: The path of the specific Event Hub to connect the client to. :type event_hub_path: str :param network_tracing: Whether to output network trace logs to the logger. Default is `False`. :type network_tracing: bool :param credential: The credential object used for authentication which implements particular interface of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and objects that implement get_token(self, *scopes) method. :param http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'. :type http_proxy: dict[str, Any] :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :type auth_timeout: float :param user_agent: The user agent that needs to be appended to the built in user agent string. :type user_agent: str :param retry_total: The total number of attempts to redo the failed operation when an error happened. Default value is 3. :type retry_total: int :param transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp. :type transport_type: ~azure.eventhub.TransportType :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch. :type max_batch_size: int :param receive_timeout: The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds. :type receive_timeout: float :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: float """ self.eh_name = event_hub_path self._host = host self._container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self._address = _Address() self._address.hostname = host self._address.path = "/" + event_hub_path if event_hub_path else "" self._credential = credential self._keep_alive = kwargs.get("keep_alive", 30) self._auto_reconnect = kwargs.get("auto_reconnect", True) self._mgmt_target = "amqps://{}/{}".format(self._host, self.eh_name) self._auth_uri = "sb://{}{}".format(self._address.hostname, self._address.path) self._config = _Configuration(**kwargs) self._debug = self._config.network_tracing log.info("%r: Created the Event Hub client", self._container_id) @abstractmethod def _create_auth(self): pass def _create_properties(self, user_agent=None): # pylint: disable=no-self-use """ Format the properties with which to instantiate the connection. This acts like a user agent over HTTP. :rtype: dict """ properties = {} product = "azure-eventhub" properties[types.AMQPSymbol("product")] = product properties[types.AMQPSymbol("version")] = __version__ framework = "Python {}.{}.{}, {}".format( sys.version_info[0], sys.version_info[1], sys.version_info[2], platform.python_implementation() ) properties[types.AMQPSymbol("framework")] = framework platform_str = platform.platform() properties[types.AMQPSymbol("platform")] = platform_str final_user_agent = '{}/{} ({}, {})'.format(product, __version__, framework, platform_str) if user_agent: final_user_agent = '{}, {}'.format(final_user_agent, user_agent) if len(final_user_agent) > MAX_USER_AGENT_LENGTH: raise ValueError("The user-agent string cannot be more than {} in length." "Current user_agent string is: {} with length: {}".format( MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent))) properties[types.AMQPSymbol("user-agent")] = final_user_agent return properties def _add_span_request_attributes(self, span): span.add_attribute("component", "eventhubs") span.add_attribute("message_bus.destination", self._address.path) span.add_attribute("peer.address", self._address.hostname)
[docs] @classmethod def from_connection_string(cls, conn_str, **kwargs): """Create an EventHubClient from an EventHub connection string. :param conn_str: The connection string of an eventhub :type conn_str: str :param event_hub_path: The path of the specific Event Hub to connect the client to, if the EntityName is not included in the connection string. :type event_hub_path: str :param network_tracing: Whether to output network trace logs to the logger. Default is `False`. :type network_tracing: bool :param http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'. :type http_proxy: dict[str, Any] :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :type auth_timeout: float :param user_agent: The user agent that needs to be appended to the built in user agent string. :type user_agent: str :param retry_total: The total number of attempts to redo the failed operation when an error happened. Default value is 3. :type retry_total: int :param transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp. :type transport_type: ~azure.eventhub.TransportType :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch. :type max_batch_size: int :param receive_timeout: The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout. :type receive_timeout: float :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: float Example: .. literalinclude:: ../examples/test_examples_eventhub.py :start-after: [START create_eventhub_client_connstr] :end-before: [END create_eventhub_client_connstr] :language: python :dedent: 4 :caption: Create an EventHubClient from a connection string. """ event_hub_path = kwargs.pop("event_hub_path", None) address, policy, key, entity = _parse_conn_str(conn_str) entity = event_hub_path or entity left_slash_pos = address.find("//") if left_slash_pos != -1: host = address[left_slash_pos + 2:] else: host = address return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)