# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import logging
import time
from typing import Any, Union, List, Optional, Dict, Callable, cast
from typing_extensions import TYPE_CHECKING, Literal, Awaitable, overload
from ..exceptions import ConnectError, EventHubError
from ..amqp import AmqpAnnotatedMessage
from ._client_base_async import ClientBaseAsync
from ._producer_async import EventHubProducer
from ._buffered_producer import BufferedProducerDispatcher
from .._utils import set_event_partition_key
from .._constants import ALL_PARTITIONS, TransportType
from .._tracing import TraceAttributes
from .._common import EventDataBatch, EventData
if TYPE_CHECKING:
from ssl import SSLContext
from ._client_base_async import CredentialTypes
SendEventTypes = List[Union[EventData, AmqpAnnotatedMessage]]
_LOGGER = logging.getLogger(__name__)
[docs]
class EventHubProducerClient(ClientBaseAsync): # pylint: disable=client-accepts-api-version-keyword
# pylint: disable=too-many-instance-attributes
# pylint: disable=client-method-missing-tracing-decorator-async
"""
The EventHubProducerClient class defines a high level interface for
sending events to the Azure Event Hubs service.
:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
This is likely to be similar to <yournamespace>.servicebus.windows.net
:param str eventhub_name: The path of the specific Event Hub to connect the client to.
:param credential: The credential object used for authentication which
implements a particular interface for getting tokens. It accepts
:class:`EventHubSharedKeyCredential<azure.eventhub.aio.EventHubSharedKeyCredential>`, or credential objects
generated by the azure-identity library and objects that implement the `get_token(self, *scopes)` method.
:type credential: ~azure.core.credentials_async.AsyncTokenCredential or ~azure.core.credentials.AzureSasCredential
or ~azure.core.credentials.AzureNamedKeyCredential
:keyword bool buffered_mode: If True, the producer client will collect events in a buffer, efficiently batch,
then publish. Default is False.
:keyword on_success: The callback to be called once a batch has been successfully published.
The callback takes two parameters:
- `events`: The list of events that have been successfully published
- `partition_id`: The partition id that the events in the list have been published to.
The callback function should be defined like: `on_success(events, partition_id)`.
Required when `buffered_mode` is True while optional if `buffered_mode` is False.
:paramtype on_success: Optional[Callable[[SendEventTypes, Optional[str]], Awaitable[None]]]
:keyword on_error: The callback to be called once a batch has failed to be published.
Required when in `buffered_mode` is True while optional if `buffered_mode` is False.
The callback function should be defined like: `on_error(events, partition_id, error)`, where:
- `events`: The list of events that failed to be published,
- `partition_id`: The partition id that the events in the list have been tried to be published to and
- `error`: The exception related to the sending failure.
If `buffered_mode` is False, `on_error` callback is optional and errors will be handled as follows:
- If an `on_error` callback is passed during the producer client instantiation,
then error information will be passed to the `on_error` callback, which will then be called.
- If an `on_error` callback is not passed in during client instantiation,
then the error will be raised by default.
If `buffered_mode` is True, `on_error` callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the `on_error` callback will be called.
:paramtype on_error: Optional[Callable[[SendEventTypes, Optional[str], Exception], Awaitable[None]]]
:keyword int max_buffer_length: Buffered mode only.
The total number of events per partition that can be buffered before a flush will be triggered.
The default value is 1500 in buffered mode.
:keyword Optional[float] max_wait_time: Buffered mode only.
The amount of time to wait for a batch to be built with events in the buffer before publishing.
The default value is 1 in buffered mode.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
value is 3.
:paramtype retry_total: int
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
In fixed mode, retry policy will always sleep for {backoff factor}.
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
seconds. If the backoff_factor is 0.1, then the retry will sleep
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
:keyword float retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
:keyword retry_mode: The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential',
where default is 'exponential'.
:paramtype retry_mode: str
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp` in which case port 5671 is used.
If the port 5671 is unavailable/blocked in the network environment, `TransportType.AmqpOverWebsocket` could
be used instead which uses port 443 for communication.
:paramtype transport_type: ~azure.eventhub.TransportType
:keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
Additionally the following keys may also be present: `'username', 'password'`.
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
:paramtype connection_verify: Optional[str]
:keyword ssl_context: The SSLContext object to use in the underlying Pure Python AMQP transport. If specified,
connection_verify will be ignored.
:paramtype ssl_context: ssl.SSLContext or None
:keyword uamqp_transport: Whether to use the `uamqp` library as the underlying transport. The default value is
False and the Pure Python AMQP library will be used as the underlying transport.
:paramtype uamqp_transport: bool
:keyword float socket_timeout: The time in seconds that the underlying socket on the connection should
wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp
and 1 for TransportType.AmqpOverWebsocket. If EventHubsConnectionError errors are occurring due to write
timing out, a larger than default value may need to be passed in. This is for advanced usage scenarios
and ordinarily the default value should be sufficient.
.. admonition:: Example:
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_producer_client_async]
:end-before: [END create_eventhub_producer_client_async]
:language: python
:dedent: 4
:caption: Create a new instance of the EventHubProducerClient.
"""
    # --- Overload: non-buffered (default) mode; buffered-only kwargs are not accepted. ---
    @overload
    def __init__(
        self,
        fully_qualified_namespace: str,
        eventhub_name: str,
        credential: "CredentialTypes",
        *,
        buffered_mode: Literal[False] = False,
        **kwargs: Any,
    ) -> None: ...
    # --- Overload: buffered mode; on_error/on_success callbacks are required. ---
    @overload
    def __init__(
        self,
        fully_qualified_namespace: str,
        eventhub_name: str,
        credential: "CredentialTypes",
        *,
        buffered_mode: Literal[True],
        on_error: Callable[[SendEventTypes, Optional[str], Exception], Awaitable[None]],
        on_success: Callable[[SendEventTypes, Optional[str]], Awaitable[None]],
        max_buffer_length: int = 1500,
        max_wait_time: float = 1,
        **kwargs: Any,
    ) -> None: ...
    def __init__(
        self,
        fully_qualified_namespace: str,
        eventhub_name: str,
        credential: "CredentialTypes",
        *,
        buffered_mode: bool = False,
        on_error: Optional[Callable[[SendEventTypes, Optional[str], Exception], Awaitable[None]]] = None,
        on_success: Optional[Callable[[SendEventTypes, Optional[str]], Awaitable[None]]] = None,
        max_buffer_length: Optional[int] = None,
        max_wait_time: Optional[float] = None,
        **kwargs: Any,
    ) -> None:
        super(EventHubProducerClient, self).__init__(
            fully_qualified_namespace=fully_qualified_namespace,
            eventhub_name=eventhub_name,
            credential=credential,
            network_tracing=kwargs.pop("logging_enable", False),
            **kwargs,
        )
        self._auth_uri = f"sb://{self._address.hostname}{self._address.path}"
        # NOTE(review): keep_alive is read with get() rather than pop() after kwargs was
        # already forwarded to the base class -- presumably the base class tolerates it;
        # confirm before changing to pop().
        self._keep_alive = kwargs.get("keep_alive", None)
        # The producer for service-assigned (round-robin) partitioning is created eagerly;
        # per-partition producers are added lazily (see _start_producer).
        self._producers: Dict[str, Optional[EventHubProducer]] = {ALL_PARTITIONS: self._create_producer()}
        self._lock = asyncio.Lock(**self._internal_kwargs)  # sync the creation of self._producers
        # 0 means "not negotiated yet"; populated on first link open (see _get_max_message_size).
        self._max_message_size_on_link = 0
        self._partition_ids = None  # Optional[List[str]]
        self._buffered_mode = buffered_mode
        self._on_success = on_success
        self._on_error = on_error
        self._buffered_producer_dispatcher = None
        self._max_buffer_length = max_buffer_length
        self._max_wait_time = max_wait_time
        if self._buffered_mode:
            # In buffered mode the public send APIs are swapped for the buffering variants
            # on this instance only; the class-level methods remain untouched.
            setattr(self, "send_batch", self._buffered_send_batch)
            setattr(self, "send_event", self._buffered_send_event)
            if not self._on_error:
                raise TypeError(
                    "EventHubProducerClient in buffered mode missing 1 required keyword argument: 'on_error'"
                )
            if not self._on_success:
                raise TypeError(
                    "EventHubProducerClient in buffered mode missing 1 required keyword argument: 'on_success'"
                )
            # Apply buffered-mode defaults, then validate the effective values.
            if self._max_wait_time is None:
                self._max_wait_time = 1
            if self._max_wait_time <= 0:
                raise ValueError("'max_wait_time' must be a float greater than 0 in buffered mode")
            if self._max_buffer_length is None:
                self._max_buffer_length = 1500
            if self._max_buffer_length <= 0:
                raise ValueError("'max_buffer_length' must be an integer greater than 0 in buffered mode")
    async def __aenter__(self) -> "EventHubProducerClient":
        # Support ``async with`` usage; nothing extra is opened on entry.
        return self
    async def __aexit__(self, *args: Any) -> None:
        # Delegate all cleanup to close() when the context exits.
        await self.close()
    async def _buffered_send(self, events, **kwargs):
        """Enqueue events into the buffered-mode dispatcher, creating it lazily.

        ``self._buffered_producer_dispatcher`` starts out as ``None``, so the first
        call raises ``AttributeError`` (EAFP) and falls into the setup path below.
        """
        try:
            await self._buffered_producer_dispatcher.enqueue_events(events, **kwargs)
        except AttributeError:
            # First send: discover partitions and the link's max message size,
            # then build the dispatcher and retry the enqueue once.
            # NOTE(review): an AttributeError raised from *inside* enqueue_events would
            # also land here and rebuild the dispatcher -- confirm that is acceptable.
            await self._get_partitions()
            await self._get_max_message_size()
            self._buffered_producer_dispatcher = BufferedProducerDispatcher(
                self._partition_ids,
                self._on_success,
                self._on_error,
                self._create_producer,
                self.eventhub_name,
                self._max_message_size_on_link,
                max_wait_time=self._max_wait_time,
                max_buffer_length=self._max_buffer_length,
                amqp_transport=self._amqp_transport,
            )
            await self._buffered_producer_dispatcher.enqueue_events(events, **kwargs)
async def _batch_preparer(
self,
event_data_batch: Union[EventDataBatch, SendEventTypes],
*,
partition_id: Optional[str] = None,
partition_key: Optional[str] = None,
):
if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError(
"partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key"
)
to_send_batch = event_data_batch
else:
to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access
return (
to_send_batch,
to_send_batch._partition_id, # pylint:disable=protected-access
partition_key,
)
    async def _buffered_send_batch(  # pylint: disable=unused-argument
        self,
        event_data_batch: Union[EventDataBatch, SendEventTypes],
        *,
        timeout: Optional[float] = None,
        partition_id: Optional[str] = None,
        partition_key: Optional[str] = None,
        **kwargs: Any,
    ):
        """Buffered-mode replacement for send_batch: validate input, then enqueue.

        Installed over ``send_batch`` on the instance when ``buffered_mode`` is True.
        """
        # The preparer validates the routing arguments and yields the effective
        # partition id; the prepared batch itself is used for the emptiness check.
        batch, pid, pkey = await self._batch_preparer(
            event_data_batch, partition_id=partition_id, partition_key=partition_key
        )
        if len(batch) == 0:
            # Nothing to enqueue.
            return
        # Convert the relative timeout into an absolute deadline for the buffer.
        timeout_time = time.time() + timeout if timeout else None
        # NOTE(review): the ORIGINAL input (not the prepared batch) is enqueued, so a
        # plain list of events may be re-batched by the dispatcher while an
        # EventDataBatch is kept intact -- confirm this is the intended contract.
        await self._buffered_send(
            event_data_batch,
            partition_id=pid,
            partition_key=pkey,
            timeout_time=timeout_time,
        )
async def _buffered_send_event(
self, # pylint: disable=unused-argument
event: Union[EventData, AmqpAnnotatedMessage],
*,
timeout: Optional[float] = None,
partition_id: Optional[str] = None,
partition_key: Optional[str] = None,
**kwargs: Any, # pylint: disable=unused-argument
):
set_event_partition_key(event, partition_key, self._amqp_transport)
timeout_time = time.time() + timeout if timeout else None
await self._buffered_send(
event,
partition_id=partition_id,
partition_key=partition_key,
timeout_time=timeout_time,
)
async def _get_partitions(self) -> None:
if not self._partition_ids:
self._partition_ids = await self.get_partition_ids() # type: ignore
for p_id in cast(List[str], self._partition_ids):
self._producers[p_id] = None
async def _get_max_message_size(self) -> None:
# pylint: disable=protected-access,line-too-long
async with self._lock:
if not self._max_message_size_on_link:
await cast(EventHubProducer, self._producers[ALL_PARTITIONS])._open_with_retry()
self._max_message_size_on_link = (
self._amqp_transport.get_remote_max_message_size(
cast(EventHubProducer, self._producers[ALL_PARTITIONS])._handler # type: ignore
)
or self._amqp_transport.MAX_MESSAGE_LENGTH_BYTES
)
async def _start_producer(self, partition_id: str, send_timeout: Optional[Union[int, float]]) -> None:
async with self._lock:
await self._get_partitions()
if partition_id not in cast(List[str], self._partition_ids) and partition_id != ALL_PARTITIONS:
raise ConnectError("Invalid partition {} for the event hub {}".format(partition_id, self.eventhub_name))
if not self._producers[partition_id] or cast(EventHubProducer, self._producers[partition_id]).closed:
self._producers[partition_id] = self._create_producer(
partition_id=(None if partition_id == ALL_PARTITIONS else partition_id),
send_timeout=send_timeout,
)
def _create_producer(
self, *, partition_id: Optional[str] = None, send_timeout: Optional[Union[int, float]] = None
) -> EventHubProducer:
target = "amqps://{}{}".format(self._address.hostname, self._address.path)
send_timeout = self._config.send_timeout if send_timeout is None else send_timeout
handler = EventHubProducer( # type: ignore
self,
target,
partition=partition_id,
send_timeout=send_timeout,
idle_timeout=self._idle_timeout,
amqp_transport=self._amqp_transport,
keep_alive=self._keep_alive,
**self._internal_kwargs,
)
return handler
@classmethod
@overload
def from_connection_string(
cls, conn_str: str, *, eventhub_name: Optional[str] = None, buffered_mode: Literal[False] = False, **kwargs: Any
) -> "EventHubProducerClient": ...
@classmethod
@overload
def from_connection_string(
cls,
conn_str: str,
*,
eventhub_name: Optional[str] = None,
buffered_mode: Literal[True],
on_error: Callable[[SendEventTypes, Optional[str], Exception], Awaitable[None]],
on_success: Callable[[SendEventTypes, Optional[str]], Awaitable[None]],
max_buffer_length: int = 1500,
max_wait_time: float = 1,
**kwargs: Any,
) -> "EventHubProducerClient": ...
[docs]
@classmethod
def from_connection_string(
cls,
conn_str: str,
*,
eventhub_name: Optional[str] = None,
buffered_mode: bool = False,
on_success: Optional[Callable[[SendEventTypes, Optional[str]], Awaitable[None]]] = None,
on_error: Optional[Callable[[SendEventTypes, Optional[str], Exception], Awaitable[None]]] = None,
max_buffer_length: Optional[int] = None,
max_wait_time: Optional[float] = None,
logging_enable: bool = False,
http_proxy: Optional[Dict[str, Union[str, int]]] = None,
auth_timeout: float = 60,
user_agent: Optional[str] = None,
retry_total: int = 3,
retry_backoff_factor: Optional[float] = 0.8,
retry_backoff_max: Optional[float] = 120,
retry_mode: Literal["exponential", "fixed"] = "exponential",
idle_timeout: Optional[float] = None,
transport_type: TransportType = TransportType.Amqp,
custom_endpoint_address: Optional[str] = None,
connection_verify: Optional[str] = None,
ssl_context: Optional["SSLContext"] = None,
uamqp_transport: bool = False,
**kwargs: Any,
) -> "EventHubProducerClient":
"""Create an EventHubProducerClient from a connection string.
:param str conn_str: The connection string of an Event Hub.
:keyword str eventhub_name: The path of the specific Event Hub to connect the client to.
:keyword bool buffered_mode: If True, the producer client will collect events in a buffer, efficiently batch,
then publish. Default is False.
:keyword on_success: The callback to be called once a batch has been successfully published.
The callback takes two parameters:
- `events`: The list of events that have been successfully published
- `partition_id`: The partition id that the events in the list have been published to.
The callback function should be defined like: `on_success(events, partition_id)`.
It is required when `buffered_mode` is True while optional if `buffered_mode` is False.
:paramtype on_success: Optional[Callable[[SendEventTypes, Optional[str]], Awaitable[None]]]
:keyword on_error: The callback to be called once a batch has failed to be published.
The callback function should be defined like: `on_error(events, partition_id, error)`, where:
- `events`: The list of events that failed to be published,
- `partition_id`: The partition id that the events in the list have been tried to be published to and
- `error`: The exception related to the sending failure.
If `buffered_mode` is False, `on_error` callback is optional and errors will be handled as follows:
- If an `on_error` callback is passed during the producer client instantiation,
then error information will be passed to the `on_error` callback, which will then be called.
- If an `on_error` callback is not passed in during client instantiation,
then the error will be raised by default.
If `buffered_mode` is True, `on_error` callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the `on_error` callback will be called.
:paramtype on_error: Optional[Callable[[SendEventTypes, Optional[str], Exception], Awaitable[None]]]
:keyword int max_buffer_length: Buffered mode only.
The total number of events per partition that can be buffered before a flush will be triggered.
The default value is 1500 in buffered mode.
:keyword Optional[float] max_wait_time: Buffered mode only.
The amount of time to wait for a batch to be built with events in the buffer before publishing.
The default value is 1 in buffered mode.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
Additionally the following keys may also be present: `'username', 'password'`.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:paramtype retry_total: int
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
In fixed mode, retry policy will always sleep for {backoff factor}.
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
seconds. If the backoff_factor is 0.1, then the retry will sleep
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
:keyword float retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
:keyword retry_mode: The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential',
where default is 'exponential'.
:paramtype retry_mode: str
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp` in which case port 5671 is used.
If the port 5671 is unavailable/blocked in the network environment, `TransportType.AmqpOverWebsocket` could
be used instead which uses port 443 for communication.
:paramtype transport_type: ~azure.eventhub.TransportType
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
:paramtype connection_verify: Optional[str]
:keyword ssl_context: The SSLContext object to use in the underlying Pure Python AMQP transport. If specified,
connection_verify will be ignored.
:paramtype ssl_context: ssl.SSLContext or None
:keyword uamqp_transport: Whether to use the `uamqp` library as the underlying transport. The default value is
False and the Pure Python AMQP library will be used as the underlying transport.
:paramtype uamqp_transport: bool
:returns: An EventHubProducerClient instance.
:rtype: ~azure.eventhub.aio.EventHubProducerClient
.. admonition:: Example:
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_producer_client_from_conn_str_async]
:end-before: [END create_eventhub_producer_client_from_conn_str_async]
:language: python
:dedent: 4
:caption: Create a new instance of the EventHubProducerClient from connection string.
"""
constructor_args = cls._from_connection_string(
conn_str,
eventhub_name=eventhub_name,
buffered_mode=buffered_mode,
on_success=on_success,
on_error=on_error,
max_buffer_length=max_buffer_length,
max_wait_time=max_wait_time,
logging_enable=logging_enable,
http_proxy=http_proxy,
auth_timeout=auth_timeout,
user_agent=user_agent,
retry_total=retry_total,
retry_backoff_factor=retry_backoff_factor,
retry_backoff_max=retry_backoff_max,
retry_mode=retry_mode,
idle_timeout=idle_timeout,
transport_type=transport_type,
custom_endpoint_address=custom_endpoint_address,
connection_verify=connection_verify,
ssl_context=ssl_context,
uamqp_transport=uamqp_transport,
**kwargs,
)
return cls(**constructor_args)
[docs]
async def send_event(
self, # pylint: disable=unused-argument
event_data: Union[EventData, AmqpAnnotatedMessage],
*,
timeout: Optional[float] = None,
partition_id: Optional[str] = None,
partition_key: Optional[str] = None,
**kwargs: Any, # pylint: disable=unused-argument
) -> None:
"""
Sends an event data.
By default, the method will block until acknowledgement is received or operation times out.
If the `EventHubProducerClient` is configured to run in buffered mode, the method will enqueue the event
into local buffer and return. The producer will do automatic batching and sending in the background.
If `buffered_mode` is False, `on_error` callback is optional and errors will be handled as follows:
- If an `on_error` callback is passed during the producer client instantiation,
then error information will be passed to the `on_error` callback, which will then be called.
- If an `on_error` callback is not passed in during client instantiation,
then the error will be raised by default.
If `buffered_mode` is True, `on_error` callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the `on_error` callback will be called.
:param event_data: The `EventData` object to be sent.
:type event_data: Union[~azure.eventhub.EventData, ~azure.eventhub.amqp.AmqpAnnotatedMessage]
:keyword float timeout: The maximum wait time to send the event data in non-buffered mode or the
maximum wait time to enqueue the event data into the buffer in buffered mode.
In non-buffered mode, the default wait time specified when the producer
was created will be used. In buffered mode, the default wait time is None indicating that the event will be
scheduled to send immediately.
:keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_id.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_key.
If both partition_id and partition_key are provided, the partition_id will take precedence.
**WARNING: Setting partition_key of non-string value on the events to be sent is discouraged
as the partition_key will be ignored by the Event Hub service and events will be assigned
to all partitions using round-robin. Furthermore, there are SDKs for consuming events which expect
partition_key to only be string type, they might fail to parse the non-string value.**
:rtype: None
:raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>`
:class:`ConnectError<azure.eventhub.exceptions.ConnectError>`
:class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>`
:class:`EventDataError<azure.eventhub.exceptions.EventDataError>`
:class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>`
:class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
:raises OperationTimeoutError: If the value specified by the timeout parameter elapses before the event can be
sent in non-buffered mode or the events can not be enqueued into the buffered in buffered mode.
"""
input_pid = partition_id
pid = input_pid or ALL_PARTITIONS
try:
try:
await cast(EventHubProducer, self._producers[pid]).send(
event_data, partition_key=partition_key, timeout=timeout
)
except (KeyError, AttributeError, EventHubError):
await self._start_producer(pid, timeout)
await cast(EventHubProducer, self._producers[pid]).send(
event_data, partition_key=partition_key, timeout=timeout
)
if self._on_success:
await self._on_success([event_data], input_pid)
except Exception as exc: # pylint: disable=broad-except
if self._on_error:
await self._on_error([event_data], input_pid, exc)
else:
raise
[docs]
async def send_batch( # pylint: disable=unused-argument
self,
event_data_batch: Union[EventDataBatch, SendEventTypes],
*,
timeout: Optional[float] = None,
partition_id: Optional[str] = None,
partition_key: Optional[str] = None,
**kwargs: Any,
) -> None:
# pylint: disable=protected-access
"""
Sends a batch of event data.
By default, the method will block until acknowledgement is received or operation times out.
If the `EventHubProducerClient` is configured to run in buffered mode, the method will enqueue the events
into local buffer and return. The producer will do automatic sending in the background.
If `buffered_mode` is False, `on_error` callback is optional and errors will be handled as follows:
- If an `on_error` callback is passed during the producer client instantiation,
then error information will be passed to the `on_error` callback, which will then be called.
- If an `on_error` callback is not passed in during client instantiation,
then the error will be raised by default.
If `buffered_mode` is True, `on_error` callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the `on_error` callback will be called.
In buffered mode, sending a batch will remain intact and sent as a single unit.
The batch will not be rearranged. This may result in inefficiency of sending events.
If you're sending a finite list of `EventData` or `AmqpAnnotatedMessage` and you know it's within the
event hub frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch`
to create `EventDataBatch` and add either `EventData` or `AmqpAnnotatedMessage` into the batch one by one
until the size limit, and then call this method to send out the batch.
:param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent in a batch.
All `EventData` or `AmqpAnnotatedMessage` in the list or `EventDataBatch` will land on the same partition.
:type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[Union[~azure.eventhub.EventData,
~azure.eventhub.amqp.AmqpAnnotatedMessage]]
:keyword float timeout: The maximum wait time to send the event data in non-buffered mode or the
maximum wait time to enqueue the event data into the buffer in buffered mode.
In non-buffered mode, the default wait time specified when the producer
was created will be used. In buffered mode, the default wait time is None.
:keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_id.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_key.
If both partition_id and partition_key are provided, the partition_id will take precedence.
**WARNING: Setting partition_key of non-string value on the events to be sent is discouraged
as the partition_key will be ignored by the Event Hub service and events will be assigned
to all partitions using round-robin. Furthermore, there are SDKs for consuming events which expect
partition_key to only be string type, they might fail to parse the non-string value.**
:rtype: None
:raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>`
:class:`ConnectError<azure.eventhub.exceptions.ConnectError>`
:class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>`
:class:`EventDataError<azure.eventhub.exceptions.EventDataError>`
:class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>`
:class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
:class:`ValueError`
:class:`TypeError`
:raises OperationTimeoutError: If the value specified by the timeout parameter elapses before the event can be
sent in non-buffered mode or the events can be enqueued into the buffered in buffered mode.
.. admonition:: Example:
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START eventhub_producer_client_send_async]
:end-before: [END eventhub_producer_client_send_async]
:language: python
:dedent: 4
:caption: Asynchronously sends event data
"""
batch, pid, pkey = await self._batch_preparer(
event_data_batch, partition_id=partition_id, partition_key=partition_key
)
if len(batch) == 0:
return
partition_id = pid or ALL_PARTITIONS
try:
try:
await cast(EventHubProducer, self._producers[partition_id]).send(
batch, partition_key=pkey, timeout=timeout
)
if self._on_success:
await self._on_success(batch._internal_events, pid)
except (KeyError, AttributeError, EventHubError):
await self._start_producer(partition_id, timeout)
await cast(EventHubProducer, self._producers[partition_id]).send(
batch, partition_key=pkey, timeout=timeout
)
if self._on_success:
await self._on_success(batch._internal_events, pid)
except Exception as exc: # pylint: disable=broad-except
if self._on_error:
await self._on_error(batch._internal_events, pid, exc)
else:
raise
[docs]
async def create_batch(
    self,
    *,
    partition_id: Optional[str] = None,
    partition_key: Optional[str] = None,
    max_size_in_bytes: Optional[int] = None,
) -> EventDataBatch:
    """Create an EventDataBatch object whose total encoded size is capped at max_size_in_bytes.

    The max_size_in_bytes should be no greater than the max allowed message size
    defined by the service.

    :keyword str or None partition_id: The specific partition ID to send to.
     Default is None, in which case the service will assign to all partitions using round-robin.
    :keyword str or None partition_key: With the given partition_key, event data will be sent to
     a particular partition of the Event Hub decided by the service.
     If both partition_id and partition_key are provided, the partition_id will take precedence.
     **WARNING: Setting partition_key of non-string value on the events to be sent is discouraged
     as the partition_key will be ignored by the Event Hub service and events will be assigned
     to all partitions using round-robin. Furthermore, there are SDKs for consuming events which expect
     partition_key to only be string type, they might fail to parse the non-string value.**
    :keyword int or None max_size_in_bytes: The maximum size of bytes data that an EventDataBatch
     object can hold. By default, the value is determined by your Event Hubs tier.
    :return: An EventDataBatch object
    :rtype: ~azure.eventhub.EventDataBatch
    :raises ValueError: If max_size_in_bytes exceeds the maximum size allowed on the link.

    .. admonition:: Example:

        .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
            :start-after: [START eventhub_producer_client_create_batch_async]
            :end-before: [END eventhub_producer_client_create_batch_async]
            :language: python
            :dedent: 4
            :caption: Create EventDataBatch object within limited size
    """
    # Lazily query the service for the link's maximum message size on first use.
    if not self._max_message_size_on_link:
        await self._get_max_message_size()

    link_limit = self._max_message_size_on_link
    if max_size_in_bytes and max_size_in_bytes > link_limit:
        # Same message text as always: concatenated f-string pieces, byte-identical output.
        raise ValueError(
            f"Max message size: {max_size_in_bytes} is too large, "
            f"acceptable max batch size is: {link_limit} bytes."
        )

    effective_max = max_size_in_bytes or link_limit
    # Tracing attributes are best-effort: the address may not be resolved yet.
    host = self._address.hostname if self._address else None
    path = self._address.path if self._address else None
    return EventDataBatch(
        max_size_in_bytes=effective_max,
        partition_id=partition_id,
        partition_key=partition_key,
        amqp_transport=self._amqp_transport,
        tracing_attributes={
            TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: host,
            TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: path,
        },
    )
[docs]
async def get_eventhub_properties(self) -> Dict[str, Any]:
    """Get properties of the Event Hub.

    Keys in the returned dictionary include:

        - `eventhub_name` (str)
        - `created_at` (UTC datetime.datetime)
        - `partition_ids` (list[str])

    :return: A dictionary containing information about the Event Hub.
    :rtype: dict[str, any]
    :raises: :class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
    """
    # Delegate to the shared management-request helper on the client base class.
    properties = await super()._get_eventhub_properties_async()
    return properties
[docs]
async def get_partition_ids(self) -> List[str]:
    """Get partition IDs of the Event Hub.

    :return: A list of partition IDs.
    :rtype: list[str]
    :raises: :class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
    """
    # Delegate to the shared management-request helper on the client base class.
    ids = await super()._get_partition_ids_async()
    return ids
[docs]
async def get_partition_properties(self, partition_id: str) -> Dict[str, Any]:
    """Get properties of the specified partition.

    Keys in the properties dictionary include:

        - `eventhub_name` (str)
        - `id` (str)
        - `beginning_sequence_number` (int)
        - `last_enqueued_sequence_number` (int)
        - `last_enqueued_offset` (str)
        - `last_enqueued_time_utc` (UTC datetime.datetime)
        - `is_empty` (bool)

    :param partition_id: The target partition ID.
    :type partition_id: str
    :return: A dict of partition properties.
    :rtype: dict[str, any]
    :raises: :class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
    """
    # Delegate to the shared management-request helper on the client base class.
    properties = await super()._get_partition_properties_async(partition_id)
    return properties
[docs]
async def flush(self, *, timeout: Optional[float] = None, **kwargs: Any) -> None:  # pylint:disable=unused-argument
    """Buffered mode only.

    Flush events in the buffer to be sent immediately if the client is working in buffered mode.
    A no-op when the client is not in buffered mode or no dispatcher has been created yet.

    :keyword timeout: Timeout to flush the buffered events, default is None which means no timeout.
    :paramtype timeout: float or None
    :rtype: None
    :raises EventDataSendError: If the producer fails to flush the buffer within the given timeout
     in buffered mode.
    """
    async with self._lock:
        # Nothing to do unless buffered mode is on and a dispatcher exists.
        if not (self._buffered_mode and self._buffered_producer_dispatcher):
            return
        # Convert the relative timeout into an absolute wall-clock deadline.
        deadline = time.time() + timeout if timeout else None
        await self._buffered_producer_dispatcher.flush(timeout_time=deadline)
[docs]
async def close(
    self, *, flush: bool = True, timeout: Optional[float] = None, **kwargs: Any  # pylint:disable=unused-argument
) -> None:
    """Close the Producer client underlying AMQP connection and links.

    :keyword bool flush: Buffered mode only. If set to True, events in the buffer will be sent
     immediately. Default is True.
    :keyword timeout: Buffered mode only. Timeout to close the producer.
     Default is None which means no timeout.
    :paramtype timeout: float or None
    :rtype: None
    :raises EventHubError: If an error occurred when flushing the buffer if `flush` is set to True or closing the
     underlying AMQP connections in buffered mode.

    .. admonition:: Example:

        .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
            :start-after: [START eventhub_producer_client_close_async]
            :end-before: [END eventhub_producer_client_close_async]
            :language: python
            :dedent: 4
            :caption: Close down the handler.
    """
    async with self._lock:
        # In buffered mode, shut the dispatcher down first (optionally flushing),
        # surfacing any error it hits.
        if self._buffered_mode and self._buffered_producer_dispatcher:
            deadline = time.time() + timeout if timeout else None
            await self._buffered_producer_dispatcher.close(
                flush=flush, timeout_time=deadline, raise_error=True
            )
            self._buffered_producer_dispatcher = None

        # Close every per-partition producer, keeping the keys but clearing the entries.
        for partition, producer in self._producers.items():
            if producer is None:
                continue
            await producer.close()  # type: ignore
            self._producers[partition] = None

        await super()._close_async()
[docs]
def get_buffered_event_count(self, partition_id: str) -> Optional[int]:
    """The number of events that are buffered and waiting to be published for a given partition.

    Returns None in non-buffered mode. **NOTE: The event buffer is processed in a background
    coroutine, therefore the number of events in the buffer reported by this API should be
    considered only an approximation and is only recommend for use in debugging. For a
    partition ID that has no events buffered, 0 will be returned regardless of whether that
    partition ID actually exists within the Event Hub.**

    :param str partition_id: The target partition ID.
    :rtype: int or None
    """
    if not self._buffered_mode:
        return None
    dispatcher = cast(BufferedProducerDispatcher, self._buffered_producer_dispatcher)
    try:
        # The attribute lookup stays inside the try: a dispatcher that has not
        # been created yet (None) raises AttributeError and counts as 0 buffered.
        return dispatcher.get_buffered_event_count(partition_id)
    except AttributeError:
        return 0
@property
def total_buffered_event_count(self) -> Optional[int]:
    """The total number of events that are currently buffered and waiting to be published,
    across all partitions.

    Returns None in non-buffered mode. **NOTE: The event buffer
    is processed in a background coroutine, therefore the number of events in the buffer
    reported by this API should be considered only an approximation and is only recommend
    for use in debugging.**

    :rtype: int or None
    """
    if not self._buffered_mode:
        return None
    dispatcher = cast(BufferedProducerDispatcher, self._buffered_producer_dispatcher)
    try:
        # A dispatcher that has not been created yet (None) raises AttributeError
        # on the property access and counts as 0 buffered events.
        return dispatcher.total_buffered_event_count
    except AttributeError:
        return 0