azure.eventhub.aio package

Submodules

azure.eventhub.aio.client_async module

class azure.eventhub.aio.client_async.EventHubClient(host, event_hub_path, credential, **kwargs)[source]

The EventHubClient class defines a high level interface for asynchronously sending events to and receiving events from the Azure Event Hubs service.

Example

Create a new instance of the Event Hub client async.
from azure.eventhub.aio import EventHubClient
import os
connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
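
The connection string built in the example above is a semicolon-separated list of Key=Value pairs. As a rough illustration of that layout, the sketch below splits such a string back into its parts using only the standard library. Note that parse_connection_string and the sample values are hypothetical and written for this illustration; they are not part of azure.eventhub.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict (hypothetical helper)."""
    parts = {}
    for segment in conn_str.strip().rstrip(";").split(";"):
        if not segment:
            continue
        # Split on the first '=' only: base64-encoded keys may end with '='.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("Malformed segment: {!r}".format(segment))
        parts[key.strip()] = value.strip()
    return parts


# Made-up sample values, shaped like the format string used above.
example = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=policy;SharedAccessKey=abc123==;EntityPath=myhub"
)
parsed = parse_connection_string(example)
```

Splitting on the first "=" only is a deliberate choice here, so that a key value containing "=" characters stays intact.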

Constructs a new EventHubClient.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement a get_token(self, *scopes) method.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
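
The http_proxy description above names two required keys and two optional ones. A minimal sketch of checking a dictionary against that description before handing it to the client might look as follows. validate_http_proxy is a hypothetical helper, the proxy values are placeholders, and rejecting unknown keys is a choice made for this sketch rather than documented library behavior.

```python
def validate_http_proxy(proxy):
    """Check an http_proxy dict against the documented keys (hypothetical helper)."""
    required = {"proxy_hostname": str, "proxy_port": int}
    optional = {"username", "password"}
    for key, expected_type in required.items():
        if key not in proxy:
            raise ValueError("http_proxy is missing required key {!r}".format(key))
        if not isinstance(proxy[key], expected_type):
            raise TypeError(
                "http_proxy[{!r}] must be {}".format(key, expected_type.__name__)
            )
    # Assumption of this sketch: anything outside the documented keys is a mistake.
    unknown = set(proxy) - set(required) - optional
    if unknown:
        raise ValueError("Unexpected http_proxy keys: {}".format(sorted(unknown)))
    return proxy


# Placeholder values for illustration only.
http_proxy = validate_http_proxy({
    "proxy_hostname": "127.0.0.1",
    "proxy_port": 3128,
    "username": "proxy-user",
    "password": "proxy-password",
})
```

The validated dictionary could then be passed as the http_proxy keyword argument described above.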

async close()[source]
create_consumer(consumer_group: str, partition_id: str, event_position: azure.eventhub.common.EventPosition, **kwargs) → azure.eventhub.aio.consumer_async.EventHubConsumer[source]

Create an async consumer to the client for a particular consumer group and partition.

Parameters
  • consumer_group (str) – The name of the consumer group this consumer is associated with. Events are read in the context of this group. The default consumer_group for an event hub is “$Default”.

  • partition_id (str) – The identifier of the Event Hub partition from which events will be received.

  • event_position (EventPosition) – The position within the partition where the consumer should begin reading events.

  • owner_level (int) – The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

  • loop – An event loop. If not specified the default event loop will be used.

Return type

EventHubConsumer

Example

Add an async consumer to the client for a particular consumer group and partition.
client = EventHubClient.from_connection_string(connection_str)
# Create an async consumer.
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
# Create an exclusive async consumer.
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), owner_level=1)
create_producer(*, partition_id: str = None, send_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None) → azure.eventhub.aio.producer_async.EventHubProducer[source]

Create an async producer to send EventData object to an EventHub.

Parameters
  • partition_id (str) – Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via round-robin.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

  • loop – An event loop. If not specified the default event loop will be used.

Return type

EventHubProducer

Example

Add an async producer to the client to send EventData.
client = EventHubClient.from_connection_string(connection_str)
# Create an async producer.
producer = client.create_producer(partition_id="0")
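
When partition_id is omitted, the parameter description above says events are distributed to the available partitions via round-robin. The sketch below only illustrates what round-robin assignment means; the real distribution is not performed by client code like this, and assign_round_robin is a hypothetical name.

```python
from itertools import cycle


def assign_round_robin(events, partition_ids):
    """Pair each event with the next partition ID in turn (illustration only)."""
    next_partition = cycle(partition_ids)
    return [(event, next(next_partition)) for event in events]


assignments = assign_round_robin(["e1", "e2", "e3", "e4", "e5"], ["0", "1", "2"])
```

With three partitions, the fourth event wraps around to the first partition again.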
classmethod from_connection_string(conn_str, **kwargs)

Create an EventHubClient from an EventHub connection string.

Parameters
  • conn_str (str) – The connection string of an Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityPath is not included in the connection string.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Example

Create an EventHubClient from a connection string.
import os
from azure.eventhub.aio import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
async get_partition_ids()[source]

Asynchronously get the partition IDs of the Event Hub.

Return type

list[str]

Raises

~azure.eventhub.ConnectError

async get_partition_properties(partition)[source]

Asynchronously get the properties of the specified partition. Keys in the returned dictionary include:

  • ‘event_hub_path’

  • ‘id’

  • ‘beginning_sequence_number’

  • ‘last_enqueued_sequence_number’

  • ‘last_enqueued_offset’

  • ‘last_enqueued_time_utc’

  • ‘is_empty’

Parameters

partition (str) – The target partition id.

Return type

dict

Raises

~azure.eventhub.EventHubError
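
As a sketch of how the dictionary returned by get_partition_properties might be consumed, the helper below formats a one-line summary from the keys listed above. describe_partition is a hypothetical helper, and the sample dictionary is hand-written for this illustration (its value types are assumptions), not output from the service.

```python
def describe_partition(props):
    """Summarize a partition-properties dict using the documented keys."""
    if props["is_empty"]:
        status = "empty"
    else:
        status = "sequence numbers {}..{}".format(
            props["beginning_sequence_number"],
            props["last_enqueued_sequence_number"],
        )
    return "{}/{}: {}".format(props["event_hub_path"], props["id"], status)


# Hand-written sample; value types are assumed for the sake of the example.
sample = {
    "event_hub_path": "myhub",
    "id": "0",
    "beginning_sequence_number": 0,
    "last_enqueued_sequence_number": 41,
    "last_enqueued_offset": "4096",
    "last_enqueued_time_utc": None,
    "is_empty": False,
}
summary = describe_partition(sample)
```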

async get_properties()[source]

Asynchronously get the properties of the Event Hub. Keys in the returned dictionary include:

  • ‘path’

  • ‘created_at’

  • ‘partition_ids’

Return type

dict

Raises

~azure.eventhub.EventHubError

azure.eventhub.aio.consumer_async module

class azure.eventhub.aio.consumer_async.EventHubConsumer(client, source, **kwargs)[source]

A consumer responsible for reading EventData from a specific Event Hub partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred to as “Epoch Consumers.”

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as “Non-Epoch Consumers.”
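
The toy model below sketches how exclusive ownership of one partition within one consumer group could be arbitrated by owner_level. It assumes that a claim with a lower owner_level than the current owner is refused and that any other claim displaces the current owner. The text here only describes owner_level as a priority, so treat the exact rule, and the PartitionOwnership class itself, as assumptions made for illustration.

```python
class PartitionOwnership:
    """Toy model of exclusive-consumer arbitration (not library code)."""

    def __init__(self):
        self.owner = None
        self.owner_level = None

    def try_claim(self, name, owner_level):
        """Return True if `name` becomes the exclusive reader."""
        # Assumed rule: a lower level than the current owner is refused.
        if self.owner_level is not None and owner_level < self.owner_level:
            return False
        self.owner, self.owner_level = name, owner_level
        return True


ownership = PartitionOwnership()
results = [
    ownership.try_claim("consumer-a", 1),  # first claim succeeds
    ownership.try_claim("consumer-b", 0),  # lower level is refused
    ownership.try_claim("consumer-c", 2),  # higher level takes over
]
```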

Please use the method create_consumer on EventHubClient for creating EventHubConsumer.

Instantiate an async consumer. EventHubConsumer should be instantiated by calling the create_consumer method in EventHubClient.

Parameters
  • client (EventHubClient) – The parent EventHubClient.

  • source (Source) – The source EventHub from which to receive events.

  • event_position (EventPosition) – The position from which to start receiving.

  • prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.

  • owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

  • loop – An event loop.

async close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
try:
    # Open and receive
    await consumer.receive(timeout=1)
finally:
    # Close down the receive handler.
    await consumer.close()
async receive(*, max_batch_size=None, timeout=None)[source]

Receive events asynchronously from the EventHub.

Parameters
  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as the service returns no new events. If combined with a timeout and no events are retrieved before the timeout expires, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum.

  • timeout (float) – The maximum wait time to build up the requested message count for the batch. If not specified, the default wait time specified when the consumer was created will be used.

Return type

list[EventData]

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError

Example

Receives events asynchronously
logger = logging.getLogger("azure.eventhub")
async with consumer:
    received = await consumer.receive(timeout=5)
    for event_data in received:
        logger.info("Message received:{}".format(event_data.body_as_str()))
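
To make the interplay of max_batch_size and timeout more concrete, here is a minimal sketch that uses an asyncio.Queue as a stand-in for the consumer's internal event queue. receive_batch is a hypothetical function, not the library's implementation, and it simplifies one point: it keeps waiting until the timeout elapses, whereas the real receive is documented to return as soon as the service has no new events.

```python
import asyncio


async def receive_batch(queue, max_batch_size, timeout):
    """Collect up to max_batch_size items, stopping when the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    batch = []
    while len(batch) < max_batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break
    return batch


async def demo():
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait("event-{}".format(i))
    first = await receive_batch(queue, max_batch_size=3, timeout=0.2)
    second = await receive_batch(queue, max_batch_size=3, timeout=0.2)
    third = await receive_batch(queue, max_batch_size=3, timeout=0.05)
    return first, second, third


first, second, third = asyncio.run(demo())
```

The first call fills the batch, the second returns a partial batch once the timeout passes, and the third returns an empty list because nothing arrived in time.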
property last_enqueued_event_properties

The latest enqueued event information. This property is updated each time an event is received, provided the consumer was created with track_last_enqueued_event_properties set to True. The dict includes the following information about the partition:

  • sequence_number

  • offset

  • enqueued_time

  • retrieval_time

Return type

dict or None

property queue_size

The current size of the unprocessed Event queue.

Return type

int

azure.eventhub.aio.error_async module

azure.eventhub.aio.producer_async module

class azure.eventhub.aio.producer_async.EventHubProducer(client, target, **kwargs)[source]

A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition.

Please use the method create_producer on EventHubClient for creating EventHubProducer.

Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the create_producer method in EventHubClient.

Parameters
  • client (EventHubClient) – The parent EventHubClient.

  • target (str) – The URI of the EventHub to send to.

  • partition (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

  • keep_alive (float) – The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings.

  • auto_reconnect (bool) – Whether to automatically reconnect the producer if a retryable error occurs. Default value is True.

  • loop – An event loop. If not specified the default event loop will be used.

async close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
producer = client.create_producer(partition_id="0")
try:
    await producer.send(EventData(b"A single event"))
finally:
    # Close down the send handler.
    await producer.close()
async create_batch(max_size=None, partition_key=None)[source]

Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side.

Parameters
  • max_size (int) – The maximum size of bytes data that an EventDataBatch object can hold.

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service.

Returns

an EventDataBatch instance

Return type

EventDataBatch

Example

Create EventDataBatch object within limited size
event_data_batch = await producer.create_batch(max_size=10000)
while True:
    try:
        event_data_batch.try_add(EventData('Message inside EventBatchData'))
    except ValueError:
        # The EventDataBatch object reaches its max_size.
        # You can send the full EventDataBatch object and create a new one here.
        break
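
The comment in the example above suggests sending the full batch and starting a new one. The sketch below shows that rollover pattern with a stand-in class, so it runs without a connection. SizeLimitedBatch and pack are hypothetical names, and the stand-in counts raw payload bytes, which is an assumption; the size accounting of a real EventDataBatch may differ.

```python
class SizeLimitedBatch:
    """Stand-in for EventDataBatch: rejects additions beyond max_size bytes."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.size = 0
        self.items = []

    def try_add(self, payload):
        if self.size + len(payload) > self.max_size:
            raise ValueError("batch is full")
        self.items.append(payload)
        self.size += len(payload)


def pack(payloads, max_size):
    """Group payloads into consecutive batches, none exceeding max_size bytes."""
    batches = []
    batch = SizeLimitedBatch(max_size)
    for payload in payloads:
        try:
            batch.try_add(payload)
        except ValueError:
            if not batch.items:
                raise  # a single payload larger than max_size can never fit
            # In real code this is where the full batch would be sent.
            batches.append(batch.items)
            batch = SizeLimitedBatch(max_size)
            batch.try_add(payload)
    if batch.items:
        batches.append(batch.items)
    return batches


batches = pack([b"aaaa", b"bbbb", b"cccc", b"dd"], max_size=10)
```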
async send(event_data: Union[azure.eventhub.common.EventData, azure.eventhub.common.EventDataBatch, Iterable[azure.eventhub.common.EventData]], *, partition_key: Union[str, bytes] = None, timeout: float = None)[source]

Sends event data and waits until an acknowledgement is received or the operation times out.

Parameters
  • event_data (EventData, EventDataBatch, Iterator, Generator, list) – The event data to be sent. It can be an EventData object, an EventDataBatch object, or an iterable of EventData objects.

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.

  • timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError

Returns

None

Return type

None

Example

Send event data and wait until an acknowledgement is received or the operation times out.
async with producer:
    event_data = EventData(b"A single event")
    await producer.send(event_data)
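
The partition_key parameter described above lets the service decide which partition the event data lands in. The sketch below illustrates only the general idea of mapping a key to a partition so that the same key always yields the same partition. It uses SHA-256 purely for illustration; it is not the algorithm the Event Hubs service uses, and pick_partition is a hypothetical name.

```python
import hashlib


def pick_partition(partition_key, partition_count):
    """Map a key to a partition index deterministically (illustration only)."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an integer and reduce it to an index.
    return int.from_bytes(digest[:8], "big") % partition_count


first_choice = pick_partition("device-42", 4)
second_choice = pick_partition("device-42", 4)
```

Because the mapping depends only on the key and the partition count, repeated calls with the same arguments agree.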

Module contents

class azure.eventhub.aio.EventHubClient(host, event_hub_path, credential, **kwargs)[source]

The EventHubClient class defines a high level interface for asynchronously sending events to and receiving events from the Azure Event Hubs service.

Example

Create a new instance of the Event Hub client async.
from azure.eventhub.aio import EventHubClient
import os
connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)

Constructs a new EventHubClient.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement a get_token(self, *scopes) method.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

async close()[source]
create_consumer(consumer_group: str, partition_id: str, event_position: azure.eventhub.common.EventPosition, **kwargs) → azure.eventhub.aio.consumer_async.EventHubConsumer[source]

Create an async consumer to the client for a particular consumer group and partition.

Parameters
  • consumer_group (str) – The name of the consumer group this consumer is associated with. Events are read in the context of this group. The default consumer_group for an event hub is “$Default”.

  • partition_id (str) – The identifier of the Event Hub partition from which events will be received.

  • event_position (EventPosition) – The position within the partition where the consumer should begin reading events.

  • owner_level (int) – The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

  • loop – An event loop. If not specified the default event loop will be used.

Return type

EventHubConsumer

Example

Add an async consumer to the client for a particular consumer group and partition.
client = EventHubClient.from_connection_string(connection_str)
# Create an async consumer.
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
# Create an exclusive async consumer.
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), owner_level=1)
create_producer(*, partition_id: str = None, send_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None) → azure.eventhub.aio.producer_async.EventHubProducer[source]

Create an async producer to send EventData object to an EventHub.

Parameters
  • partition_id (str) – Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via round-robin.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

  • loop – An event loop. If not specified the default event loop will be used.

Return type

EventHubProducer

Example

Add an async producer to the client to send EventData.
client = EventHubClient.from_connection_string(connection_str)
# Create an async producer.
producer = client.create_producer(partition_id="0")
classmethod from_connection_string(conn_str, **kwargs)

Create an EventHubClient from an EventHub connection string.

Parameters
  • conn_str (str) – The connection string of an Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityPath is not included in the connection string.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Example

Create an EventHubClient from a connection string.
import os
from azure.eventhub.aio import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
async get_partition_ids()[source]

Asynchronously get the partition IDs of the Event Hub.

Return type

list[str]

Raises

~azure.eventhub.ConnectError

async get_partition_properties(partition)[source]

Asynchronously get the properties of the specified partition. Keys in the returned dictionary include:

  • ‘event_hub_path’

  • ‘id’

  • ‘beginning_sequence_number’

  • ‘last_enqueued_sequence_number’

  • ‘last_enqueued_offset’

  • ‘last_enqueued_time_utc’

  • ‘is_empty’

Parameters

partition (str) – The target partition id.

Return type

dict

Raises

~azure.eventhub.EventHubError

async get_properties()[source]

Asynchronously get the properties of the Event Hub. Keys in the returned dictionary include:

  • ‘path’

  • ‘created_at’

  • ‘partition_ids’

Return type

dict

Raises

~azure.eventhub.EventHubError

class azure.eventhub.aio.EventHubConsumer(client, source, **kwargs)[source]

A consumer responsible for reading EventData from a specific Event Hub partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred to as “Epoch Consumers.”

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as “Non-Epoch Consumers.”

Please use the method create_consumer on EventHubClient for creating EventHubConsumer.

Instantiate an async consumer. EventHubConsumer should be instantiated by calling the create_consumer method in EventHubClient.

Parameters
  • client (EventHubClient) – The parent EventHubClient.

  • source (Source) – The source EventHub from which to receive events.

  • event_position (EventPosition) – The position from which to start receiving.

  • prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.

  • owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

  • loop – An event loop.

async close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
try:
    # Open and receive
    await consumer.receive(timeout=1)
finally:
    # Close down the receive handler.
    await consumer.close()
async receive(*, max_batch_size=None, timeout=None)[source]

Receive events asynchronously from the EventHub.

Parameters
  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as the service returns no new events. If combined with a timeout and no events are retrieved before the timeout expires, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum.

  • timeout (float) – The maximum wait time to build up the requested message count for the batch. If not specified, the default wait time specified when the consumer was created will be used.

Return type

list[EventData]

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError

Example

Receives events asynchronously
logger = logging.getLogger("azure.eventhub")
async with consumer:
    received = await consumer.receive(timeout=5)
    for event_data in received:
        logger.info("Message received:{}".format(event_data.body_as_str()))
property last_enqueued_event_properties

The latest enqueued event information. This property is updated each time an event is received, provided the consumer was created with track_last_enqueued_event_properties set to True. The dict includes the following information about the partition:

  • sequence_number

  • offset

  • enqueued_time

  • retrieval_time

Return type

dict or None

property queue_size

The current size of the unprocessed Event queue.

Return type

int

class azure.eventhub.aio.EventHubProducer(client, target, **kwargs)[source]

A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition.

Please use the method create_producer on EventHubClient for creating EventHubProducer.

Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the create_producer method in EventHubClient.

Parameters
  • client (EventHubClient) – The parent EventHubClient.

  • target (str) – The URI of the EventHub to send to.

  • partition (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

  • keep_alive (float) – The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings.

  • auto_reconnect (bool) – Whether to automatically reconnect the producer if a retryable error occurs. Default value is True.

  • loop – An event loop. If not specified the default event loop will be used.

async close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
producer = client.create_producer(partition_id="0")
try:
    await producer.send(EventData(b"A single event"))
finally:
    # Close down the send handler.
    await producer.close()
async create_batch(max_size=None, partition_key=None)[source]

Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side.

Parameters
  • max_size (int) – The maximum size of bytes data that an EventDataBatch object can hold.

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service.

Returns

an EventDataBatch instance

Return type

EventDataBatch

Example

Create EventDataBatch object within limited size
event_data_batch = await producer.create_batch(max_size=10000)
while True:
    try:
        event_data_batch.try_add(EventData('Message inside EventBatchData'))
    except ValueError:
        # The EventDataBatch object reaches its max_size.
        # You can send the full EventDataBatch object and create a new one here.
        break
async send(event_data: Union[azure.eventhub.common.EventData, azure.eventhub.common.EventDataBatch, Iterable[azure.eventhub.common.EventData]], *, partition_key: Union[str, bytes] = None, timeout: float = None)[source]

Sends event data and waits until an acknowledgement is received or the operation times out.

Parameters
  • event_data (EventData, EventDataBatch, Iterator, Generator, list) – The event data to be sent. It can be an EventData object, an EventDataBatch object, or an iterable of EventData objects.

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.

  • timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError

Returns

None

Return type

None

Example

Send event data and wait until an acknowledgement is received or the operation times out.
async with producer:
    event_data = EventData(b"A single event")
    await producer.send(event_data)