azure.eventhub.aio package
-
class
azure.eventhub.aio.
EventHubConsumerClient
(host, event_hub_path, credential, **kwargs)
The EventHubConsumerClient class defines a high-level interface for receiving events from the Azure Event Hubs service.
The main goal of EventHubConsumerClient is to receive events from all partitions of an Event Hub with load balancing and checkpointing.
When multiple EventHubConsumerClient instances run in one process, in multiple processes, or on multiple machines, and they use the same repository as the load balancing and checkpointing store, they balance the partitions among themselves automatically. To enable load balancing and/or checkpointing, partition_manager must be set when creating the EventHubConsumerClient.
An EventHubConsumerClient can also receive from a specific partition when you call its receive() method and specify the partition_id. Load balancing does not work in single-partition mode, but checkpoints can still be saved if partition_manager is set.
- Parameters
host (str) – The hostname of the Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – A string to append to the built-in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
partition_manager (PartitionManager) – Stores the load balancing data and checkpoint data when receiving events. If it is None, this EventHubConsumerClient instance receives events without load balancing or checkpointing.
load_balancing_interval (float) – When load balancing is active, the interval in seconds between two consecutive load balancing runs. Default is 10.
Example:
    import os
    from azure.eventhub import EventHubSharedKeyCredential
    from azure.eventhub.aio import EventHubConsumerClient

    hostname = os.environ['EVENT_HUB_HOSTNAME']
    event_hub = os.environ['EVENT_HUB_NAME']
    shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
    shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

    consumer = EventHubConsumerClient(
        host=hostname,
        event_hub_path=event_hub,
        credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
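The http_proxy keyword argument described above is a plain dictionary. As a minimal sketch, the helper below assembles one with the two required keys and adds the optional keys only when values are supplied. Note that build_http_proxy is a hypothetical helper, not part of azure-eventhub, and the host name and credentials are placeholders.

```python
from typing import Any, Dict, Optional


def build_http_proxy(
    hostname: str,
    port: int,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a dictionary in the shape documented for ``http_proxy``.

    Hypothetical helper for illustration; not part of azure-eventhub.
    """
    if not hostname:
        raise ValueError("proxy_hostname must be a non-empty string")
    if not isinstance(port, int) or isinstance(port, bool):
        raise TypeError("proxy_port must be an int")

    # The two documented mandatory keys.
    proxy: Dict[str, Any] = {"proxy_hostname": hostname, "proxy_port": port}
    # The documented optional keys are added only when a value was supplied.
    if username is not None:
        proxy["username"] = username
    if password is not None:
        proxy["password"] = password
    return proxy


# Placeholder values; replace them with your own proxy settings.
proxy_settings = build_http_proxy(
    "proxy.example.com", 3128, username="alice", password="secret"
)
```

The resulting dictionary would then be passed as http_proxy=proxy_settings when creating the client.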
-
async
close
()
Stop retrieving events from Event Hubs and close the underlying AMQP connection and links.
- Return type
Example:
    import asyncio
    import logging
    import os
    from azure.eventhub.aio import EventHubConsumerClient

    event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
    event_hub = os.environ['EVENT_HUB_NAME']

    consumer = EventHubConsumerClient.from_connection_string(
        conn_str=event_hub_connection_str,
        event_hub_path=event_hub
    )

    logger = logging.getLogger("azure.eventhub")

    async def on_events(partition_context, events):
        logger.info("Received {} messages from partition: {}".format(
            len(events), partition_context.partition_id))
        # Do ops on received events

    # The receive method is a coroutine method which can be called by
    # `await consumer.receive(...)`, and it will block. Execute it in an async
    # task to better demonstrate how to stop receiving by calling the close method.
    # The awaits below must run inside a coroutine.
    recv_task = asyncio.ensure_future(consumer.receive(on_events=on_events, consumer_group='$Default'))
    await asyncio.sleep(3)  # keep receiving for 3 seconds
    recv_task.cancel()  # stop receiving

    # Close down the consumer handler explicitly.
    await consumer.close()
-
classmethod
from_connection_string
(conn_str, **kwargs)
Create an EventHubConsumerClient from a connection string.
- Parameters
conn_str (str) – The connection string of an Event Hub.
- Keyword Arguments
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
http_proxy (dict[str,Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – A string to append to the built-in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.
partition_manager (PartitionManager) – Stores the load balancing data and checkpoint data when receiving events. If it is None, this EventHubConsumerClient instance receives events without load balancing or checkpointing.
load_balancing_interval (float) – When load balancing is active, the interval in seconds between two consecutive load balancing runs. Default is 10.
Example:
    import os
    from azure.eventhub.aio import EventHubConsumerClient

    event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
    event_hub = os.environ['EVENT_HUB_NAME']

    consumer = EventHubConsumerClient.from_connection_string(
        conn_str=event_hub_connection_str,
        event_hub_path=event_hub)
-
async
get_partition_ids
()
Asynchronously get the partition IDs of the Event Hub.
- Return type
- Raises
-
async
get_partition_properties
(partition)
Asynchronously get the properties of the specified partition. Keys in the returned dictionary include:
event_hub_path
id
beginning_sequence_number
last_enqueued_sequence_number
last_enqueued_offset
last_enqueued_time_utc
is_empty
- Parameters
partition (str) – The target partition id.
- Return type
- Raises
-
async
get_properties
()
Asynchronously get the properties of the Event Hub. Keys in the returned dictionary include:
path
created_at
partition_ids
- Return type
- Raises
-
async
receive
(on_events, consumer_group: str, *, partition_id: str = None, owner_level: int = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, initial_event_position=None, on_error=None, on_partition_initialize=None, on_partition_close=None) → None
Receive events from one partition or from all partitions, optionally with load balancing and checkpointing.
- Parameters
on_events (Callable[[PartitionContext, List[EventData]]]) – The callback function for handling received events. The callback takes two parameters: partition_context, which contains the partition context, and events, which are the received events. Define the callback like on_events(partition_context, events). For detailed partition context information, refer to PartitionContext.
consumer_group (str) – Receive events from the Event Hub for this consumer group.
- Keyword Arguments
partition_id (str) – Receive from this partition only if it is not None. Otherwise, receive from all partitions.
owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set. A higher owner_level has higher exclusive priority.
prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.
track_last_enqueued_event_properties (bool) – Indicates whether the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
initial_event_position (EventPosition or dict[str,EventPosition]) – Start receiving from this initial_event_position if there is no checkpoint data for a partition. If checkpoint data is available, it is used instead. This can be a dict with partition ID as the key and position as the value for individual partitions, or a single EventPosition instance for all partitions.
on_error (Callable[[PartitionContext, Exception]]) – The callback function that is called when an error occurs while receiving. The callback takes two parameters: partition_context, which contains the partition information, and error, which is the exception. Define the callback like on_error(partition_context, error).
on_partition_initialize (Callable[[PartitionContext]]) – The callback function that is called after the consumer for a given partition finishes initialization. The callback takes one parameter: partition_context, which contains the partition information. Define the callback like on_partition_initialize(partition_context).
on_partition_close (Callable[[PartitionContext, CloseReason]]) – The callback function that is called after the consumer for a given partition is closed. The callback takes two parameters: partition_context, which contains the partition information, and reason, which is the reason for the close. Define the callback like on_partition_close(partition_context, reason). Refer to CloseReason for the different closing reasons.
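The precedence described for initial_event_position (checkpoint data first, then a per-partition entry or a single position for all partitions) can be sketched as follows. This is only an illustration of the documented rule, not the library's implementation: resolve_start_position and its fallback argument are hypothetical, and plain strings stand in for EventPosition objects.

```python
from typing import Dict, Optional, Union

# Plain strings stand in for EventPosition objects in this sketch.
Position = str


def resolve_start_position(
    partition_id: str,
    checkpoints: Dict[str, Position],
    initial_event_position: Union[None, Position, Dict[str, Position]],
    fallback: Optional[Position] = None,
) -> Optional[Position]:
    """Pick the position to start receiving from for one partition.

    Hypothetical helper; `fallback` is an assumption for the case where
    neither a checkpoint nor an initial position exists.
    """
    # Checkpoint data, when available, takes precedence.
    if partition_id in checkpoints:
        return checkpoints[partition_id]
    # A dict configures individual partitions by partition ID.
    if isinstance(initial_event_position, dict):
        return initial_event_position.get(partition_id, fallback)
    # A single value applies to every partition.
    if initial_event_position is not None:
        return initial_event_position
    return fallback


checkpoints = {"0": "checkpoint@1200"}
per_partition = {"1": "offset@50"}
starts = {
    pid: resolve_start_position(pid, checkpoints, per_partition)
    for pid in ("0", "1", "2")
}
```

Here partition "0" starts from its checkpoint, partition "1" from its configured position, and partition "2" has neither, so the assumed fallback applies.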
- Return type
Example:
    import logging

    logger = logging.getLogger("azure.eventhub")

    async def on_events(partition_context, events):
        logger.info("Received {} messages from partition: {}".format(
            len(events), partition_context.partition_id))
        # Do ops on received events

    async with consumer:
        await consumer.receive(on_events=on_events, consumer_group="$default")
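To make the four callback signatures concrete, here is a minimal sketch that defines each one and drives them with a simulated partition. It does not use the real client. The simulate_partition driver, the SimpleNamespace stand-in for PartitionContext, the "shutdown" reason string, and the order in which the callbacks fire are all assumptions for illustration. It also assumes that, like on_events in the example above, the other callbacks are coroutine functions.

```python
import asyncio
from types import SimpleNamespace

# Records what each callback saw, so the flow is easy to inspect.
log = []


async def on_partition_initialize(partition_context):
    log.append(("initialize", partition_context.partition_id))


async def on_events(partition_context, events):
    log.append(("events", partition_context.partition_id, len(events)))


async def on_error(partition_context, error):
    log.append(("error", partition_context.partition_id, type(error).__name__))


async def on_partition_close(partition_context, reason):
    log.append(("close", partition_context.partition_id, reason))


async def simulate_partition(partition_id):
    """Hypothetical driver that invokes the callbacks for one partition."""
    # Stand-in for PartitionContext: only partition_id is modelled here.
    context = SimpleNamespace(partition_id=partition_id)
    await on_partition_initialize(context)
    try:
        await on_events(context, ["event-1", "event-2"])
        raise RuntimeError("simulated receive failure")
    except RuntimeError as error:
        await on_error(context, error)
    # "shutdown" is a placeholder for a CloseReason value.
    await on_partition_close(context, "shutdown")


asyncio.run(simulate_partition("0"))
```

With the real client, these functions would be passed to receive() as on_events, on_error, on_partition_initialize, and on_partition_close.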
-
class
azure.eventhub.aio.
EventHubProducerClient
(host, event_hub_path, credential, **kwargs)
The EventHubProducerClient class defines a high-level interface for sending events to the Azure Event Hubs service.
- Parameters
host (str) – The hostname of the Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – A string to append to the built-in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
Example:
    import os
    from azure.eventhub import EventHubSharedKeyCredential
    from azure.eventhub.aio import EventHubProducerClient

    hostname = os.environ['EVENT_HUB_HOSTNAME']
    event_hub = os.environ['EVENT_HUB_NAME']
    shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
    shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

    producer = EventHubProducerClient(
        host=hostname,
        event_hub_path=event_hub,
        credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
-
async
close
()
Close down the handler. If the handler has already been closed, this is a no-op.
- Return type
Example:
    import os
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData

    event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
    event_hub = os.environ['EVENT_HUB_NAME']

    producer = EventHubProducerClient.from_connection_string(
        conn_str=event_hub_connection_str,
        event_hub_path=event_hub)
    try:
        await producer.send(EventData(b"A single event"))
    finally:
        # Close down the producer handler.
        await producer.close()
-
async
create_batch
(max_size=None)
Create an EventDataBatch object whose maximum size is max_size. The max_size should be no greater than the maximum message size allowed by the service.
- Parameters
max_size (int) – The maximum size, in bytes, of the data that an EventDataBatch object can hold.
- Return type
Example:
    from azure.eventhub import EventData

    event_data_batch = await producer.create_batch(max_size=10000)
    while True:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
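The comment in the example above suggests sending the full batch and creating a new one. That "fill, set aside, start over" loop can be sketched without the service as shown below. FakeBatch is a hypothetical stand-in for EventDataBatch that only mimics the documented behaviour of try_add raising ValueError when the batch is full. It counts raw payload bytes, whereas the real size accounting may differ.

```python
from typing import List


class FakeBatch:
    """Hypothetical stand-in for EventDataBatch (raw byte counting only)."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.items: List[bytes] = []
        self.size = 0

    def try_add(self, payload: bytes) -> None:
        # Mirror the documented contract: raise ValueError when full.
        if self.size + len(payload) > self.max_size:
            raise ValueError("batch is full")
        self.items.append(payload)
        self.size += len(payload)


def pack_into_batches(payloads: List[bytes], max_size: int) -> List[FakeBatch]:
    """Greedily pack payloads, starting a new batch whenever one fills up."""
    batches: List[FakeBatch] = []
    batch = FakeBatch(max_size)
    for payload in payloads:
        try:
            batch.try_add(payload)
        except ValueError:
            # The current batch is full. With the real client, this is
            # where it would be sent before creating a new batch.
            if batch.items:
                batches.append(batch)
            batch = FakeBatch(max_size)
            # Raises ValueError again if one payload alone exceeds max_size.
            batch.try_add(payload)
    if batch.items:
        batches.append(batch)
    return batches


batches = pack_into_batches([b"aaaa", b"bbbb", b"cccc"], max_size=10)
```

With a 10-byte limit, the first two 4-byte payloads share a batch and the third starts a new one.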
-
classmethod
from_connection_string
(conn_str, **kwargs)
Create an EventHubProducerClient from a connection string.
- Parameters
conn_str (str) – The connection string of an Event Hub.
- Keyword Arguments
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
http_proxy (dict[str,Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – A string to append to the built-in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.
Example:
    import os
    from azure.eventhub.aio import EventHubProducerClient

    event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
    event_hub = os.environ['EVENT_HUB_NAME']

    producer = EventHubProducerClient.from_connection_string(
        conn_str=event_hub_connection_str,
        event_hub_path=event_hub)
-
async
get_partition_ids
()
Asynchronously get the partition IDs of the Event Hub.
- Return type
- Raises
-
async
get_partition_properties
(partition)
Asynchronously get the properties of the specified partition. Keys in the returned dictionary include:
event_hub_path
id
beginning_sequence_number
last_enqueued_sequence_number
last_enqueued_offset
last_enqueued_time_utc
is_empty
- Parameters
partition (str) – The target partition id.
- Return type
- Raises
-
async
get_properties
()
Asynchronously get the properties of the Event Hub. Keys in the returned dictionary include:
path
created_at
partition_ids
- Return type
- Raises
-
async
send
(event_data, *, partition_key: Union[str, bytes] = None, partition_id: str = None, timeout: float = None) → None
Sends event data and blocks until an acknowledgement is received or the operation times out.
- Parameters
event_data (EventData or EventDataBatch or Iterator[EventData]) – The event data to be sent. It can be an EventData object, an EventDataBatch object, or an iterable of EventData objects.
- Keyword Arguments
partition_key (str or bytes) – Event data sent with the given partition_key lands in a particular partition of the Event Hub, as decided by the service.
partition_id (str) – The specific partition ID to send to. Default is None, in which case the service distributes events across all partitions using round-robin.
timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.
- Return type
- Raises
AuthenticationError
ConnectError
ConnectionLostError
EventDataError
EventDataSendError
EventHubError
Example:
    async with producer:
        event_data = EventData(b"A single event")
        await producer.send(event_data)
-
class
azure.eventhub.aio.
PartitionManager
PartitionManager handles the interaction with the chosen storage service. It can list and claim ownership and save checkpoints.
-
abstract async
claim_ownership
(ownership_list: Iterable[Dict[str, Any]]) → Iterable[Dict[str, Any]]
Tries to claim the specified list of ownership records.
- Parameters
ownership_list (Iterable[Dict[str,Any]]) – Iterable of dictionaries containing all the ownership records to claim.
- Return type
Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:
fully_qualified_namespace
eventhub_name
consumer_group_name
owner_id
partition_id
last_modified_time
etag
-
abstract async
list_checkpoints
(fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str)
List the updated checkpoints from the store.
- Parameters
fully_qualified_namespace (str) – The fully qualified namespace that the event hub belongs to. The format is like “<namespace>.servicebus.windows.net”
eventhub_name (str) – The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.
consumer_group_name (str) – The name of the consumer group the checkpoints are associated with.
- Return type
Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:
fully_qualified_namespace
eventhub_name
consumer_group_name
partition_id
sequence_number
offset
-
abstract async
list_ownership
(fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str) → Iterable[Dict[str, Any]]
Retrieves a complete ownership list from the chosen storage service.
- Parameters
fully_qualified_namespace (str) – The fully qualified namespace that the event hub belongs to. The format is like “<namespace>.servicebus.windows.net”
eventhub_name (str) – The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.
consumer_group_name (str) – The name of the consumer group the ownership is associated with.
- Return type
Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:
fully_qualified_namespace
eventhub_name
consumer_group_name
owner_id
partition_id
last_modified_time
etag
-
abstract async
update_checkpoint
(fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str, partition_id: str, offset: str, sequence_number: int) → None
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.
- Parameters
fully_qualified_namespace (str) – The fully qualified namespace that the event hub belongs to. The format is like “<namespace>.servicebus.windows.net”
eventhub_name (str) – The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
consumer_group_name (str) – The name of the consumer group the checkpoint is associated with.
partition_id (str) – The ID of the partition the checkpoint is created for.
offset (str) – The offset of the EventData the new checkpoint will be associated with.
sequence_number (int) – The sequence number of the EventData the new checkpoint will be associated with.
- Return type
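To illustrate how the four abstract methods fit together, here is a minimal in-memory sketch that keeps ownership and checkpoint data in dictionaries. It is deliberately self-contained and does not subclass PartitionManager. A real implementation would presumably do so and talk to a durable store. The claim rule used here (a claim succeeds only when the partition is unowned or the caller presents the currently stored etag) is an assumption for illustration, not confirmed behaviour of any shipped implementation.

```python
import asyncio
import time
import uuid
from typing import Any, Dict, Iterable, List, Tuple

Key = Tuple[str, str, str, str]  # namespace, event hub, consumer group, partition


class InMemoryPartitionManager:
    """Hypothetical in-memory store mirroring the PartitionManager methods."""

    def __init__(self) -> None:
        self._ownership: Dict[Key, Dict[str, Any]] = {}
        self._checkpoints: Dict[Key, Dict[str, Any]] = {}

    async def list_ownership(self, fully_qualified_namespace: str,
                             eventhub_name: str,
                             consumer_group_name: str) -> List[Dict[str, Any]]:
        prefix = (fully_qualified_namespace, eventhub_name, consumer_group_name)
        return [dict(v) for k, v in self._ownership.items() if k[:3] == prefix]

    async def claim_ownership(
            self, ownership_list: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
        claimed = []
        for ownership in ownership_list:
            key = (ownership["fully_qualified_namespace"],
                   ownership["eventhub_name"],
                   ownership["consumer_group_name"],
                   ownership["partition_id"])
            current = self._ownership.get(key)
            # Assumed rule: reject the claim if someone else owns the
            # partition and the caller's etag is missing or stale.
            if current is not None and current["etag"] != ownership.get("etag"):
                continue
            record = dict(ownership)
            record["etag"] = uuid.uuid4().hex
            record["last_modified_time"] = time.time()
            self._ownership[key] = record
            claimed.append(dict(record))
        return claimed

    async def update_checkpoint(self, fully_qualified_namespace: str,
                                eventhub_name: str, consumer_group_name: str,
                                partition_id: str, offset: str,
                                sequence_number: int) -> None:
        key = (fully_qualified_namespace, eventhub_name,
               consumer_group_name, partition_id)
        self._checkpoints[key] = {
            "fully_qualified_namespace": fully_qualified_namespace,
            "eventhub_name": eventhub_name,
            "consumer_group_name": consumer_group_name,
            "partition_id": partition_id,
            "sequence_number": sequence_number,
            "offset": offset,
        }

    async def list_checkpoints(self, fully_qualified_namespace: str,
                               eventhub_name: str,
                               consumer_group_name: str) -> List[Dict[str, Any]]:
        prefix = (fully_qualified_namespace, eventhub_name, consumer_group_name)
        return [dict(v) for k, v in self._checkpoints.items() if k[:3] == prefix]


async def demo():
    manager = InMemoryPartitionManager()
    # Placeholder names for the namespace, Event Hub, and consumer group.
    ns, hub, group = "example.servicebus.windows.net", "my-hub", "$Default"
    claim = {"fully_qualified_namespace": ns, "eventhub_name": hub,
             "consumer_group_name": group, "partition_id": "0",
             "owner_id": "consumer-a"}
    first = await manager.claim_ownership([claim])
    # A second consumer without the current etag cannot take over.
    stale = await manager.claim_ownership([dict(claim, owner_id="consumer-b")])
    await manager.update_checkpoint(ns, hub, group, "0",
                                    offset="1024", sequence_number=42)
    checkpoints = await manager.list_checkpoints(ns, hub, group)
    owners = await manager.list_ownership(ns, hub, group)
    return first, stale, checkpoints, owners


first, stale, checkpoints, owners = asyncio.run(demo())
```

The dictionaries returned here carry the same keys that the method descriptions above list for ownership and checkpoint information.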
-
class
azure.eventhub.aio.
PartitionContext
(fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str, partition_id: str, owner_id: str, partition_manager: azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager = None)
Contains partition-related context information for a PartitionProcessor instance to use.
Users can use update_checkpoint() of this class to save checkpoint data.