Source code for azure.eventhub.producer

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import uuid
import logging
import time
from typing import Iterable, Union, Type

from uamqp import types, constants, errors  # type: ignore
from uamqp import SendClient  # type: ignore

from azure.core.tracing import SpanKind, AbstractSpan  # type: ignore
from azure.core.settings import settings  # type: ignore

from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ._consumer_producer_mixin import ConsumerProducerMixin


log = logging.getLogger(__name__)


def _error(outcome, condition):
    if outcome != constants.MessageSendResult.Ok:
        raise condition


def _set_partition_key(event_datas, partition_key):
    ed_iter = iter(event_datas)
    for ed in ed_iter:
        ed._set_partition_key(partition_key)  # pylint:disable=protected-access
        yield ed


def _set_trace_message(event_datas, parent_span=None):
    ed_iter = iter(event_datas)
    for ed in ed_iter:
        ed._trace_message(parent_span)  # pylint:disable=protected-access
        yield ed


[docs]class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instance-attributes """ A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition. Please use the method `create_producer` on `EventHubClient` for creating `EventHubProducer`. """ _timeout_symbol = b'com.microsoft:timeout' def __init__(self, client, target, **kwargs): """ Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. :param target: The URI of the EventHub to send to. :type target: str :param partition: The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin. :type partition: str :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: float :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings. :type keep_alive: float :param auto_reconnect: Whether to automatically reconnect the producer if a retryable error occurs. Default value is `True`. :type auto_reconnect: bool """ partition = kwargs.get("partition", None) send_timeout = kwargs.get("send_timeout", 60) keep_alive = kwargs.get("keep_alive", None) auto_reconnect = kwargs.get("auto_reconnect", True) super(EventHubProducer, self).__init__() self._max_message_size_on_link = None self._client = client self._target = target self._partition = partition self._timeout = send_timeout self._error = None self._keep_alive = keep_alive self._auto_reconnect = auto_reconnect self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint: disable=protected-access self._reconnect_backoff = 1 self._name = "EHProducer-{}".format(uuid.uuid4()) self._unsent_events = None if partition: self._target += "/Partitions/" + partition self._name += "-partition{}".format(partition) self._handler = None self._outcome = None self._condition = None self._link_properties = {types.AMQPSymbol(self._timeout_symbol): types.AMQPLong(int(self._timeout * 1000))} def _create_handler(self): self._handler = SendClient( self._target, auth=self._client._create_auth(), # pylint:disable=protected-access debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, keep_alive_interval=self._keep_alive, client_name=self._name, link_properties=self._link_properties, properties=self._client._create_properties(self._client._config.user_agent)) # pylint: disable=protected-access def _open_with_retry(self): return self._do_retryable_operation(self._open, operation_need_param=False) def _send_event_data(self, timeout_time=None, last_exception=None): if self._unsent_events: self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: error = last_exception else: error = OperationTimeoutError("send operation timed out") log.info("%r send operation timed out. (%r)", self._name, error) raise error self._handler._msg_timeout = remaining_time * 1000 # pylint: disable=protected-access self._handler.queue_message(*self._unsent_events) self._handler.wait() self._unsent_events = self._handler.pending_messages if self._outcome != constants.MessageSendResult.Ok: if self._outcome == constants.MessageSendResult.Timeout: self._condition = OperationTimeoutError("send operation timed out") _error(self._outcome, self._condition) def _send_event_data_with_retry(self, timeout=None): return self._do_retryable_operation(self._send_event_data, timeout=timeout) def _on_outcome(self, outcome, condition): """ Called when the outcome is received for a delivery. :param outcome: The outcome of the message delivery - success or failure. :type outcome: ~uamqp.constants.MessageSendResult :param condition: Detail information of the outcome. """ self._outcome = outcome self._condition = condition
[docs] def create_batch(self, max_size=None, partition_key=None): # type:(int, str) -> EventDataBatch """ Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side. :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. :type max_size: int :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. :type partition_key: str :return: an EventDataBatch instance :rtype: ~azure.eventhub.EventDataBatch Example: .. literalinclude:: ../examples/test_examples_eventhub.py :start-after: [START eventhub_client_sync_create_batch] :end-before: [END eventhub_client_sync_create_batch] :language: python :dedent: 4 :caption: Create EventDataBatch object within limited size """ if not self._max_message_size_on_link: self._open_with_retry() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size, self._max_message_size_on_link)) return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)
[docs] def send(self, event_data, partition_key=None, timeout=None): # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None """ Sends an event data and blocks until acknowledgement is received or operation times out. :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :param timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. :type timeout: float :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError :return: None :rtype: None Example: .. literalinclude:: ../examples/test_examples_eventhub.py :start-after: [START eventhub_client_sync_send] :end-before: [END eventhub_client_sync_send] :language: python :dedent: 4 :caption: Sends an event data and blocks until acknowledgement is received or operation times out. """ # Tracing code span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] child = None if span_impl_type is not None: child = span_impl_type(name="Azure.EventHubs.send") child.kind = SpanKind.CLIENT # Should be PRODUCER self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data wrapper_event_data._trace_message(child) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access raise EventDataError('The partition_key does not match the one of the EventDataBatch') wrapper_event_data = event_data # type:ignore else: if partition_key: event_data = _set_partition_key(event_data, partition_key) event_data = _set_trace_message(event_data, child) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] if span_impl_type is not None and child is not None: with child: self._client._add_span_request_attributes(child) # pylint: disable=protected-access self._send_event_data_with_retry(timeout=timeout) else: self._send_event_data_with_retry(timeout=timeout)
[docs] def close(self): # pylint:disable=useless-super-delegation # type:() -> None """ Close down the handler. If the handler has already closed, this will be a no op. Example: .. literalinclude:: ../examples/test_examples_eventhub.py :start-after: [START eventhub_client_sender_close] :end-before: [END eventhub_client_sender_close] :language: python :dedent: 4 :caption: Close down the handler. """ super(EventHubProducer, self).close()