Source code for azure.eventhub.aio._producer_client_async

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import logging

from typing import Any, Union, TYPE_CHECKING, List, Optional, Dict, cast
from uamqp import constants  # type: ignore

from ..exceptions import ConnectError, EventHubError
from ._client_base_async import ClientBaseAsync
from ._producer_async import EventHubProducer
from .._constants import ALL_PARTITIONS
from .._common import EventData, EventDataBatch

if TYPE_CHECKING:
    from uamqp.constants import TransportType
    from azure.core.credentials import TokenCredential  # type: ignore

_LOGGER = logging.getLogger(__name__)


[docs]class EventHubProducerClient(ClientBaseAsync): """ The EventHubProducerClient class defines a high level interface for sending events to the Azure Event Hubs service. :param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <yournamespace>.servicebus.windows.net :param str eventhub_name: The path of the specific Event Hub to connect the client to. :param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which implements a particular interface for getting tokens. It accepts :class:`EventHubSharedKeyCredential<azure.eventhub.aio.EventHubSharedKeyCredential>`, or credential objects generated by the azure-identity library and objects that implement the `get_token(self, *scopes)` method. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START create_eventhub_producer_client_async] :end-before: [END create_eventhub_producer_client_async] :language: python :dedent: 4 :caption: Create a new instance of the EventHubProducerClient. """ def __init__(self, fully_qualified_namespace: str, eventhub_name: str, credential: 'TokenCredential', **kwargs) -> None: super(EventHubProducerClient, self).__init__( fully_qualified_namespace=fully_qualified_namespace, eventhub_name=eventhub_name, credential=credential, network_tracing=kwargs.pop("logging_enable", False), **kwargs ) self._producers = {ALL_PARTITIONS: self._create_producer()} # type: Dict[str, Optional[EventHubProducer]] self._lock = asyncio.Lock() # sync the creation of self._producers self._max_message_size_on_link = 0 self._partition_ids = None # Optional[List[str]] async def _get_partitions(self) -> None: if not self._partition_ids: self._partition_ids = await self.get_partition_ids() # type: ignore for p_id in cast(List[str], self._partition_ids): self._producers[p_id] = None async def _get_max_mesage_size(self) -> None: # pylint: disable=protected-access,line-too-long async with self._lock: if not self._max_message_size_on_link: await cast(EventHubProducer, self._producers[ALL_PARTITIONS])._open_with_retry() self._max_message_size_on_link = \ cast(EventHubProducer, self._producers[ALL_PARTITIONS])._handler.message_handler._link.peer_max_message_size \ or constants.MAX_MESSAGE_LENGTH_BYTES async def _start_producer(self, partition_id: str, send_timeout: Optional[Union[int, float]]) -> None: async with self._lock: await self._get_partitions() if partition_id not in cast(List[str], self._partition_ids) and partition_id != ALL_PARTITIONS: raise ConnectError("Invalid partition {} for the event hub {}".format(partition_id, self.eventhub_name)) if not self._producers[partition_id] or cast(EventHubProducer, self._producers[partition_id]).closed: self._producers[partition_id] = self._create_producer( partition_id=partition_id, send_timeout=send_timeout ) def _create_producer( self, *, partition_id: Optional[str] = None, send_timeout: Optional[Union[int, float]] = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> EventHubProducer: target = "amqps://{}{}".format(self._address.hostname, self._address.path) send_timeout = self._config.send_timeout if send_timeout is None else send_timeout handler = EventHubProducer( self, target, partition=partition_id, send_timeout=send_timeout, idle_timeout=self._idle_timeout, loop=loop ) return handler
[docs] @classmethod def from_connection_string( cls, conn_str: str, *, eventhub_name: Optional[str] = None, logging_enable: bool = False, http_proxy: Optional[Dict[str, Union[str, int]]] = None, auth_timeout: float = 60, user_agent: Optional[str] = None, retry_total: int = 3, transport_type: Optional['TransportType'] = None, **kwargs: Any ) -> 'EventHubProducerClient': """Create an EventHubProducerClient from a connection string. :param str conn_str: The connection string of an Event Hub. :keyword str eventhub_name: The path of the specific Event Hub to connect the client to. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType :rtype: ~azure.eventhub.aio.EventHubProducerClient .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START create_eventhub_producer_client_from_conn_str_async] :end-before: [END create_eventhub_producer_client_from_conn_str_async] :language: python :dedent: 4 :caption: Create a new instance of the EventHubProducerClient from connection string. """ constructor_args = cls._from_connection_string( conn_str, eventhub_name=eventhub_name, logging_enable=logging_enable, http_proxy=http_proxy, auth_timeout=auth_timeout, user_agent=user_agent, retry_total=retry_total, transport_type=transport_type, **kwargs ) return cls(**constructor_args)
[docs] async def send_batch( self, event_data_batch: EventDataBatch, *, timeout: Optional[Union[int, float]] = None) -> None: """Sends event data and blocks until acknowledgement is received or operation times out. :param event_data_batch: The EventDataBatch object to be sent. :type event_data_batch: ~azure.eventhub.EventDataBatch :param float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. :rtype: None :raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>` :class:`ConnectError<azure.eventhub.exceptions.ConnectError>` :class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>` :class:`EventDataError<azure.eventhub.exceptions.EventDataError>` :class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>` :class:`EventHubError<azure.eventhub.exceptions.EventHubError>` .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_producer_client_send_async] :end-before: [END eventhub_producer_client_send_async] :language: python :dedent: 4 :caption: Asynchronously sends event data """ partition_id = event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access try: await cast(EventHubProducer, self._producers[partition_id]).send(event_data_batch, timeout=timeout) except (KeyError, AttributeError, EventHubError): await self._start_producer(partition_id, timeout) await cast(EventHubProducer, self._producers[partition_id]).send(event_data_batch, timeout=timeout)
[docs] async def create_batch( self, *, partition_id: Optional[str] = None, partition_key: Optional[str] = None, max_size_in_bytes: Optional[int] = None) -> EventDataBatch: """Create an EventDataBatch object with the max size of all content being constrained by max_size_in_bytes. The max_size should be no greater than the max allowed message size defined by the service. :param str partition_id: The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin. :param str partition_key: With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. :param int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. :rtype: ~azure.eventhub.EventDataBatch .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_producer_client_create_batch_async] :end-before: [END eventhub_producer_client_create_batch_async] :language: python :dedent: 4 :caption: Create EventDataBatch object within limited size """ if not self._max_message_size_on_link: await self._get_max_mesage_size() if max_size_in_bytes and max_size_in_bytes > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size_in_bytes, self._max_message_size_on_link)) event_data_batch = EventDataBatch( max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), partition_id=partition_id, partition_key=partition_key ) return event_data_batch
[docs] async def close(self) -> None: """Close the Producer client underlying AMQP connection and links. :rtype: None .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_producer_client_close_async] :end-before: [END eventhub_producer_client_close_async] :language: python :dedent: 4 :caption: Close down the handler. """ async with self._lock: for producer in self._producers.values(): if producer: await producer.close() await self._conn_manager.close_connection()