Source code for azure.eventhub._producer_client

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import logging
import threading

from typing import Any, Union, TYPE_CHECKING, Iterable, Dict
from uamqp import constants  # type:ignore

from .exceptions import ConnectError, EventHubError
from ._client_base import ClientBase
from ._producer import EventHubProducer
from ._constants import ALL_PARTITIONS
from ._common import EventData, EventDataBatch

if TYPE_CHECKING:
    from azure.core.credentials import TokenCredential  # type: ignore

_LOGGER = logging.getLogger(__name__)


[docs]class EventHubProducerClient(ClientBase): """The EventHubProducerClient class defines a high level interface for sending events to the Azure Event Hubs service. :param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <yournamespace>.servicebus.windows.net :param str eventhub_name: The path of the specific Event Hub to connect the client to. :param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which implements a particular interface for getting tokens. It accepts :class:`EventHubSharedKeyCredential<azure.eventhub.EventHubSharedKeyCredential>`, or credential objects generated by the azure-identity library and objects that implement the `get_token(self, *scopes)` method. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_eventhub_producer_client_sync] :end-before: [END create_eventhub_producer_client_sync] :language: python :dedent: 4 :caption: Create a new instance of the EventHubProducerClient. """ def __init__(self, fully_qualified_namespace, # type: str eventhub_name, # type: str credential, # type: TokenCredential **kwargs # type: Any ): # type:(...) -> None super(EventHubProducerClient, self).__init__( fully_qualified_namespace=fully_qualified_namespace, eventhub_name=eventhub_name, credential=credential, network_tracing=kwargs.get("logging_enable"), **kwargs ) self._producers = {ALL_PARTITIONS: self._create_producer()} # type: Dict[str, EventHubProducer] self._max_message_size_on_link = 0 self._partition_ids = None self._lock = threading.Lock() def _get_partitions(self): if not self._partition_ids: self._partition_ids = self.get_partition_ids() for p_id in self._partition_ids: self._producers[p_id] = None def _get_max_mesage_size(self): # pylint: disable=protected-access with self._lock: if not self._max_message_size_on_link: self._producers[ALL_PARTITIONS]._open_with_retry() self._max_message_size_on_link = \ self._producers[ALL_PARTITIONS]._handler.message_handler._link.peer_max_message_size \ or constants.MAX_MESSAGE_LENGTH_BYTES def _start_producer(self, partition_id, send_timeout): with self._lock: self._get_partitions() if partition_id not in self._partition_ids and partition_id != ALL_PARTITIONS: raise ConnectError("Invalid partition {} for the event hub {}".format(partition_id, self.eventhub_name)) if not self._producers[partition_id] or self._producers[partition_id].closed: self._producers[partition_id] = self._create_producer( partition_id=partition_id, send_timeout=send_timeout ) def _create_producer(self, partition_id=None, send_timeout=None): target = "amqps://{}{}".format(self._address.hostname, self._address.path) send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( self, target, partition=partition_id, send_timeout=send_timeout, idle_timeout=self._idle_timeout ) return handler
[docs] @classmethod def from_connection_string(cls, conn_str, **kwargs): # type: (str, Any) -> EventHubProducerClient """Create an EventHubProducerClient from a connection string. :param str conn_str: The connection string of an Event Hub. :keyword str eventhub_name: The path of the specific Event Hub to connect the client to. :keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`. :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType :rtype: ~azure.eventhub.EventHubProducerClient .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_eventhub_producer_client_from_conn_str_sync] :end-before: [END create_eventhub_producer_client_from_conn_str_sync] :language: python :dedent: 4 :caption: Create a new instance of the EventHubProducerClient from connection string. """ constructor_args = cls._from_connection_string(conn_str, **kwargs) return cls(**constructor_args)
[docs] def send_batch(self, event_data_batch, **kwargs): # type: (EventDataBatch, Any) -> None """Sends event data and blocks until acknowledgement is received or operation times out. :param event_data_batch: The EventDataBatch object to be sent. :type event_data_batch: ~azure.eventhub.EventDataBatch :keyword float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. :rtype: None :raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>` :class:`ConnectError<azure.eventhub.exceptions.ConnectError>` :class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>` :class:`EventDataError<azure.eventhub.exceptions.EventDataError>` :class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>` :class:`EventHubError<azure.eventhub.exceptions.EventHubError>` .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_producer_client_send_sync] :end-before: [END eventhub_producer_client_send_sync] :language: python :dedent: 4 :caption: Sends event data """ partition_id = event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access send_timeout = kwargs.pop("timeout", None) try: self._producers[partition_id].send(event_data_batch, timeout=send_timeout) except (KeyError, AttributeError, EventHubError): self._start_producer(partition_id, send_timeout) self._producers[partition_id].send(event_data_batch, timeout=send_timeout)
[docs] def create_batch(self, **kwargs): # type:(Any) -> EventDataBatch """Create an EventDataBatch object with the max size of all content being constrained by max_size_in_bytes. The max_size should be no greater than the max allowed message size defined by the service. :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin. :keyword str partition_key: With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. :keyword int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. :rtype: ~azure.eventhub.EventDataBatch .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_producer_client_create_batch_sync] :end-before: [END eventhub_producer_client_create_batch_sync] :language: python :dedent: 4 :caption: Create EventDataBatch object within limited size """ if not self._max_message_size_on_link: self._get_max_mesage_size() max_size_in_bytes = kwargs.get("max_size_in_bytes", None) partition_id = kwargs.get("partition_id", None) partition_key = kwargs.get("partition_key", None) if max_size_in_bytes and max_size_in_bytes > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size_in_bytes, self._max_message_size_on_link)) event_data_batch = EventDataBatch( max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), partition_id=partition_id, partition_key=partition_key ) return event_data_batch
[docs] def close(self): # type: () -> None """Close the Producer client underlying AMQP connection and links. :rtype: None .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_producer_client_close_sync] :end-before: [END eventhub_producer_client_close_sync] :language: python :dedent: 4 :caption: Close down the client. """ with self._lock: for producer in self._producers.values(): if producer: producer.close() self._producers = {} self._conn_manager.close_connection()