azure.eventhub.aio.eventprocessor package
Submodules
azure.eventhub.aio.eventprocessor.event_processor module
class azure.eventhub.aio.eventprocessor.event_processor.EventProcessor(eventhub_client: azure.eventhub.aio.client_async.EventHubClient, consumer_group_name: str, partition_processor_type: Type[azure.eventhub.aio.eventprocessor.partition_processor.PartitionProcessor], partition_manager: azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager, *, initial_event_position: azure.eventhub.common.EventPosition = <azure.eventhub.common.EventPosition object>, polling_interval: float = 10.0)
An EventProcessor continuously receives events from multiple partitions of an Event Hub in the context of a given consumer group, and passes the received events to a PartitionProcessor for processing.
It gives users a convenient way to receive events from multiple partitions and save checkpoints. If multiple EventProcessors are running against the same Event Hub, they automatically balance the partition load among themselves.
Example

    import asyncio
    import logging
    import os

    from azure.eventhub.aio import EventHubClient
    from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
    from azure.eventhub.aio.eventprocessor import SamplePartitionManager

    RECEIVE_TIMEOUT = 5  # timeout in seconds for a receiving operation. 0 or None means no timeout
    # Max number of retries for receive operations within the receive timeout.
    # The actual number of retries could be less if RECEIVE_TIMEOUT is too small.
    RETRY_TOTAL = 3
    CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]

    logging.basicConfig(level=logging.INFO)


    async def do_operation(event):
        # Do some sync or async operations. If the operation is I/O bound, async will perform better.
        print(event)


    class MyPartitionProcessor(PartitionProcessor):
        async def process_events(self, events, partition_context):
            if events:
                await asyncio.gather(*[do_operation(event) for event in events])
                # Checkpoint at the last event of the batch.
                await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)


    async def main():
        client = EventHubClient.from_connection_string(
            CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
        # Pass a filename instead of ":memory:" to persist checkpoints.
        partition_manager = SamplePartitionManager(db_filename=":memory:")
        try:
            event_processor = EventProcessor(client, "$default", MyPartitionProcessor,
                                             partition_manager, polling_interval=10)
            # Keep a reference to the task so it is not garbage-collected while running.
            processor_task = asyncio.create_task(event_processor.start())
            await asyncio.sleep(60)
            await event_processor.stop()
        finally:
            await partition_manager.close()


    if __name__ == '__main__':
        asyncio.run(main())
Instantiate an EventProcessor.
Parameters
eventhub_client (EventHubClient) – An instance of azure.eventhub.aio.EventHubClient.
consumer_group_name (str) – The name of the consumer group this event processor is associated with. Events are read only in the context of this group.
partition_processor_type (type) – A subclass of azure.eventhub.aio.eventprocessor.PartitionProcessor. Pass the class itself, not an instance; the EventProcessor creates one instance for every partition it owns.
partition_manager (PartitionManager) – An instance of a PartitionManager subclass that interacts with the data storage holding ownership and checkpoint data. azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates basic usage of PartitionManager and stores data in memory or in a file. Users can either use a provided PartitionManager plug-in or develop their own PartitionManager.
initial_event_position (EventPosition, keyword-only) – The event position from which a partition consumer starts if the partition has no checkpoint yet. This could be replaced by a “reset” checkpoint in the near future.
polling_interval (float, keyword-only) – The interval, in seconds, between two consecutive rounds of partition load balancing and claiming. Defaults to 10.0.
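The entries above only say that several EventProcessors balance partitions among themselves and repeat balancing and claiming every polling_interval; the balancing algorithm itself is not documented here. As a rough illustration, the sketch below shows one plausible fair-share rule that a processor could apply in each polling round. The functions fair_share and partitions_to_claim are hypothetical, and ownership is simplified to a partition_id → owner_id mapping instead of the ownership dictionaries that list_ownership returns.

```python
import math
from collections import Counter
from typing import Dict, List


def fair_share(num_partitions: int, num_processors: int) -> int:
    """Most partitions one processor should own under an even split (hypothetical rule)."""
    return math.ceil(num_partitions / num_processors)


def partitions_to_claim(my_id: str, all_partitions: List[str],
                        ownership: Dict[str, str]) -> List[str]:
    """Return unowned partitions that `my_id` could claim in one polling round.

    `ownership` maps partition_id -> owner_id, a simplified view of the
    dictionaries returned by list_ownership. Illustrative only: it never takes
    partitions away from other owners and ignores expired ownership.
    """
    owners = set(ownership.values()) | {my_id}      # count this processor too
    target = fair_share(len(all_partitions), len(owners))
    already_owned = Counter(ownership.values())[my_id]
    unowned = [p for p in all_partitions if p not in ownership]
    return unowned[:max(0, target - already_owned)]
```

For example, with four partitions where processor "a" owns "0" and "1", a newly started processor "b" would claim ["2", "3"]. A production balancer would likely also reclaim partitions whose ownership has gone stale (the ownership records carry last_modified_time) and take partitions from over-loaded owners; the sketch omits both.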
async start()
Start the EventProcessor.
The EventProcessor tries to claim and balance partition ownership with other EventProcessors, and asynchronously starts receiving EventData from the Event Hub and processing the events.
Returns
None
async stop()
Stop the EventProcessor.
The EventProcessor stops receiving events from the Event Hub and releases ownership of the partitions it is working on. Other running EventProcessors will take over the released partitions.
A stopped EventProcessor can be restarted by calling start() again.
Returns
None
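The usage example treats start() as long-running: it schedules start() as a task, sleeps, and then calls stop(). The sketch below mimics that documented contract, including restarting a stopped instance, with a hypothetical ToyProcessor class so that it runs without an Event Hub. It is an illustration of the start/stop pattern only, not the azure.eventhub implementation.

```python
import asyncio


class ToyProcessor:
    """Hypothetical stand-in mirroring the documented start()/stop() contract.

    start() keeps running polling rounds until stop() is called, and a stopped
    instance can be started again. This is not the azure.eventhub implementation.
    """

    def __init__(self, polling_interval: float = 0.01) -> None:
        self.polling_interval = polling_interval
        self.rounds = 0          # polling rounds run so far, across restarts
        self._running = False

    async def start(self) -> None:
        self._running = True
        while self._running:
            self.rounds += 1     # a real processor would balance, claim and receive here
            await asyncio.sleep(self.polling_interval)

    async def stop(self) -> None:
        self._running = False    # start() returns after its current sleep


async def start_stop_restart() -> int:
    processor = ToyProcessor()
    for _ in range(2):                                  # start, stop, then restart the same instance
        task = asyncio.create_task(processor.start())  # keep a reference to the task
        await asyncio.sleep(0.05)
        await processor.stop()
        await task                                      # wait for start() to finish cleanly
    return processor.rounds


if __name__ == "__main__":
    print(asyncio.run(start_stop_restart()))
```

Keeping the task reference and awaiting it after stop() makes shutdown deterministic in this toy; whether the real EventProcessor.start() returns promptly after stop() is not stated above.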
azure.eventhub.aio.eventprocessor.partition_context module
class azure.eventhub.aio.eventprocessor.partition_context.PartitionContext(eventhub_name: str, consumer_group_name: str, partition_id: str, owner_id: str, partition_manager: azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager)
Contains partition-related context information for a PartitionProcessor instance to use.
Call its update_checkpoint() method to save checkpoint data.
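PartitionContext is constructed with eventhub_name, consumer_group_name, partition_id, owner_id and a partition_manager, while the usage example calls partition_context.update_checkpoint(offset, sequence_number) with only two arguments. That suggests the context fills in its stored identifiers and forwards the call to PartitionManager.update_checkpoint. The sketch below models this assumed behavior with two hypothetical classes, SketchPartitionContext and RecordingPartitionManager; it is not the SDK source.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class RecordingPartitionManager:
    """Hypothetical partition manager that only records checkpoint calls."""
    checkpoints: List[Dict[str, Any]] = field(default_factory=list)

    async def update_checkpoint(self, eventhub_name, consumer_group_name,
                                partition_id, owner_id, offset, sequence_number):
        self.checkpoints.append({
            "eventhub_name": eventhub_name, "consumer_group_name": consumer_group_name,
            "partition_id": partition_id, "owner_id": owner_id,
            "offset": offset, "sequence_number": sequence_number,
        })


@dataclass
class SketchPartitionContext:
    """Assumed shape of PartitionContext: it stores the partition identifiers
    and adds them when forwarding a checkpoint to the partition manager."""
    eventhub_name: str
    consumer_group_name: str
    partition_id: str
    owner_id: str
    partition_manager: Any

    async def update_checkpoint(self, offset: str, sequence_number: int) -> None:
        await self.partition_manager.update_checkpoint(
            self.eventhub_name, self.consumer_group_name, self.partition_id,
            self.owner_id, offset, sequence_number)


async def checkpoint_once() -> List[Dict[str, Any]]:
    manager = RecordingPartitionManager()
    context = SketchPartitionContext("myhub", "$default", "0", "processor-1", manager)
    await context.update_checkpoint("4096", 42)   # same two-argument call as in the usage example
    return manager.checkpoints


if __name__ == "__main__":
    print(asyncio.run(checkpoint_once()))
```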
azure.eventhub.aio.eventprocessor.partition_manager module
exception azure.eventhub.aio.eventprocessor.partition_manager.OwnershipLostError
Raised when update_checkpoint detects that ownership of a partition has been lost.
with_traceback(tb)
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
args
class azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager
PartitionManager handles the interaction with the chosen storage service. It can list and claim partition ownership and save checkpoints.
abstract async claim_ownership(ownership_list: Iterable[Dict[str, Any]]) → Iterable[Dict[str, Any]]
Tries to claim the specified list of partition ownerships.
Parameters
ownership_list (Iterable of dict) – Iterable of dictionaries describing all the ownerships to claim.
Returns
Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag
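The ownership dictionaries carry an etag and a last_modified_time, which suggests that competing claims are arbitrated with optimistic concurrency: a claim on a partition that already has an owner succeeds only if it presents the latest etag. The store below is a hypothetical in-memory sketch of that idea. InMemoryOwnershipStore, its etag rule, and the choice to return only the successful claims are assumptions for illustration, not the behavior of any shipped PartitionManager.

```python
import asyncio
import time
import uuid
from typing import Any, Dict, Iterable, List, Tuple

Key = Tuple[str, str, str]  # (eventhub_name, consumer_group_name, partition_id)


class InMemoryOwnershipStore:
    """Hypothetical store showing etag-based optimistic concurrency for claims."""

    def __init__(self) -> None:
        self._rows: Dict[Key, Dict[str, Any]] = {}

    async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
        claimed = []
        for request in ownership_list:
            key = (request["eventhub_name"], request["consumer_group_name"], request["partition_id"])
            current = self._rows.get(key)
            # An existing row can only be taken over by a caller that presents
            # its latest etag; otherwise someone else changed it first.
            if current is not None and current["etag"] != request.get("etag"):
                continue
            row = {**(current or {}), **request}
            row["etag"] = str(uuid.uuid4())        # every successful write gets a new version
            row["last_modified_time"] = time.time()
            self._rows[key] = row
            claimed.append(dict(row))
        return claimed                              # only the claims that succeeded


async def demo() -> Tuple[int, int, str]:
    store = InMemoryOwnershipStore()
    base = {"eventhub_name": "hub", "consumer_group_name": "$default", "partition_id": "0"}
    first = await store.claim_ownership([dict(base, owner_id="a")])
    stale = await store.claim_ownership([dict(base, owner_id="b", etag="stale")])
    fresh = await store.claim_ownership([dict(base, owner_id="b", etag=first[0]["etag"])])
    return len(first), len(stale), fresh[0]["owner_id"]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (1, 0, 'b')
```

In the demo, processor "b" first loses with an out-of-date etag and then wins once it presents the etag it read, which is how two processors polling at the same time avoid both believing they own one partition.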
abstract async list_ownership(eventhub_name: str, consumer_group_name: str) → Iterable[Dict[str, Any]]
Retrieves the complete ownership list from the chosen storage service.
Parameters
eventhub_name (str) – The name of the Event Hub whose ownership list is retrieved.
consumer_group_name (str) – The name of the consumer group whose ownership list is retrieved.
Returns
Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag
abstract async update_checkpoint(eventhub_name, consumer_group_name, partition_id, owner_id, offset, sequence_number) → None
Updates the checkpoint in the chosen storage service using the given information for the associated partition and consumer group.
Parameters
eventhub_name (str) – The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.
consumer_group_name (str) – The name of the consumer group the ownership is associated with.
partition_id (str) – The ID of the partition the checkpoint is created for.
owner_id (str) – The identifier of the azure.eventhub.aio.eventprocessor.EventProcessor.
offset (str) – The offset of the azure.eventhub.EventData the new checkpoint is associated with.
sequence_number (int) – The sequence number of the azure.eventhub.EventData the new checkpoint is associated with.
Returns
None
Raises
OwnershipLostError – If ownership of the partition has been lost.
abstract async
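update_checkpoint is documented to raise OwnershipLostError when ownership has been lost. The sketch below shows one way a store could make that decision, assuming ownership is verified by comparing owner_id (a real store might compare etags instead). OwnershipLostError is redefined locally as a stand-in, and InMemoryCheckpointStore is hypothetical.

```python
import asyncio
from typing import Dict, Tuple


class OwnershipLostError(Exception):
    """Local stand-in for azure.eventhub.aio.eventprocessor.OwnershipLostError."""


class InMemoryCheckpointStore:
    """Hypothetical store: update_checkpoint succeeds only for the current owner."""

    def __init__(self) -> None:
        # (eventhub_name, consumer_group_name, partition_id) -> ownership row
        self.rows: Dict[Tuple[str, str, str], dict] = {}

    async def update_checkpoint(self, eventhub_name, consumer_group_name,
                                partition_id, owner_id, offset, sequence_number) -> None:
        row = self.rows.get((eventhub_name, consumer_group_name, partition_id))
        if row is None or row["owner_id"] != owner_id:
            # Another processor claimed this partition in the meantime.
            raise OwnershipLostError(partition_id)
        row["offset"] = offset
        row["sequence_number"] = sequence_number


async def demo() -> Tuple[str, bool]:
    store = InMemoryCheckpointStore()
    store.rows[("hub", "$default", "0")] = {"owner_id": "a"}
    await store.update_checkpoint("hub", "$default", "0", "a", "100", 7)
    try:
        await store.update_checkpoint("hub", "$default", "0", "b", "200", 8)
        lost = False
    except OwnershipLostError:
        lost = True   # processor "b" should stop checkpointing this partition
    return store.rows[("hub", "$default", "0")]["offset"], lost


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('100', True)
```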
azure.eventhub.aio.eventprocessor.partition_processor module
class azure.eventhub.aio.eventprocessor.partition_processor.CloseReason
An enumeration of the reasons a PartitionProcessor is closed.
EVENTHUB_EXCEPTION = 2
OWNERSHIP_LOST = 1
PROCESS_EVENTS_ERROR = 3
SHUTDOWN = 0
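A close() implementation will often branch on the reason it receives. The sketch below mirrors the documented CloseReason members and values in a local enum and applies a hypothetical policy of writing a final checkpoint only on a clean shutdown. The policy function should_checkpoint_on_close is an assumption for illustration, not documented SDK behavior.

```python
from enum import Enum


class CloseReason(Enum):
    """Local mirror of the documented CloseReason members and values."""
    SHUTDOWN = 0
    OWNERSHIP_LOST = 1
    EVENTHUB_EXCEPTION = 2
    PROCESS_EVENTS_ERROR = 3


def should_checkpoint_on_close(reason: CloseReason) -> bool:
    """Hypothetical close() policy: write a final checkpoint only on SHUTDOWN.

    After OWNERSHIP_LOST another processor owns the partition, so a checkpoint
    would be expected to fail with OwnershipLostError; after EVENTHUB_EXCEPTION
    or PROCESS_EVENTS_ERROR the last batch may not have been handled fully.
    """
    return reason is CloseReason.SHUTDOWN


if __name__ == "__main__":
    for reason in CloseReason:
        print(reason.name, reason.value, should_checkpoint_on_close(reason))
```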
class azure.eventhub.aio.eventprocessor.partition_processor.PartitionProcessor
PartitionProcessor processes events received from the Azure Event Hubs service. The associated azure.eventhub.aio.eventprocessor.EventProcessor creates one instance of your PartitionProcessor subclass for every partition it owns.
async close(reason, partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)
Called when the EventProcessor stops processing this PartitionProcessor.
Several conditions can cause the PartitionProcessor to close; see the enum class azure.eventhub.aio.eventprocessor.CloseReason.
Parameters
reason (CloseReason) – Reason for closing the PartitionProcessor.
partition_context (PartitionContext) – The context information of this partition. Use its update_checkpoint method to save a checkpoint to the data store.
async initialize(partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)
Called when the EventProcessor creates this PartitionProcessor.
Parameters
partition_context (PartitionContext) – The context information of this partition.
async process_error(error, partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)
Called when an error occurs while receiving or processing events.
Parameters
error (Exception) – The error that occurred.
partition_context (PartitionContext) – The context information of this partition. Use its update_checkpoint method to save a checkpoint to the data store.
abstract async process_events(events: List[azure.eventhub.common.EventData], partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)
Called when a batch of events has been received.
Parameters
events (list[EventData]) – The batch of received events.
partition_context (PartitionContext) – The context information of this partition. Use its update_checkpoint method to save a checkpoint to the data store.
async
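The hooks above are described one at a time; the exact order in which the EventProcessor calls them is not spelled out. The sketch below assumes a plausible per-partition sequence: initialize once, process_events for each batch, and close with SHUTDOWN, or, if a batch fails, process_error followed by close with PROCESS_EVENTS_ERROR. RecordingProcessor and drive are hypothetical, reasons are passed as plain strings to keep the block self-contained, and the real EventProcessor's ordering and error handling may differ.

```python
import asyncio
from typing import Any, List, Tuple


class RecordingProcessor:
    """Hypothetical PartitionProcessor-like class that logs every hook call."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def initialize(self, partition_context: Any) -> None:
        self.calls.append("initialize")

    async def process_events(self, events: List[str], partition_context: Any) -> None:
        if "bad" in events:
            raise ValueError("cannot process event")
        self.calls.append(f"process_events({len(events)})")

    async def process_error(self, error: Exception, partition_context: Any) -> None:
        self.calls.append(f"process_error({type(error).__name__})")

    async def close(self, reason: str, partition_context: Any) -> None:
        self.calls.append(f"close({reason})")


async def drive(processor: RecordingProcessor, batches: List[List[str]],
                partition_context: Any = None) -> None:
    """Assumed per-partition call order, for illustration only."""
    await processor.initialize(partition_context)
    for batch in batches:
        try:
            await processor.process_events(batch, partition_context)
        except Exception as error:
            # Report the failure, then stop this partition.
            await processor.process_error(error, partition_context)
            await processor.close("PROCESS_EVENTS_ERROR", partition_context)
            return
    await processor.close("SHUTDOWN", partition_context)


async def demo() -> Tuple[List[str], List[str]]:
    healthy, failing = RecordingProcessor(), RecordingProcessor()
    await drive(healthy, [["e1", "e2"], ["e3"]])
    await drive(failing, [["e1"], ["bad"]])
    return healthy.calls, failing.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```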
azure.eventhub.aio.eventprocessor.sample_partition_manager module
class azure.eventhub.aio.eventprocessor.sample_partition_manager.SamplePartitionManager(db_filename: str = ':memory:', ownership_table: str = 'ownership')
An implementation of PartitionManager that uses the sqlite3 module from the Python standard library. SQLite is a lightweight SQL database that can run in memory or store its data in a file. Do not use this PartitionManager in production.
Parameters
db_filename (str) – Name of the file that stores the SQL data. When db_filename is “:memory:”, SQLite runs in memory without a file.
ownership_table (str) – The name of the table in the SQLite database.
async claim_ownership(ownership_list)
Tries to claim the specified list of partition ownerships.
Parameters
ownership_list (Iterable of dict) – Iterable of dictionaries describing all the ownerships to claim.
Returns
Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag
async list_ownership(eventhub_name, consumer_group_name)
Retrieves the complete ownership list from the chosen storage service.
Parameters
eventhub_name (str) – The name of the Event Hub whose ownership list is retrieved.
consumer_group_name (str) – The name of the consumer group whose ownership list is retrieved.
Returns
Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag
async update_checkpoint(eventhub_name, consumer_group_name, partition_id, owner_id, offset, sequence_number)
Updates the checkpoint in the chosen storage service using the given information for the associated partition and consumer group.
Parameters
eventhub_name (str) – The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.
consumer_group_name (str) – The name of the consumer group the ownership is associated with.
partition_id (str) – The ID of the partition the checkpoint is created for.
owner_id (str) – The identifier of the azure.eventhub.aio.eventprocessor.EventProcessor.
offset (str) – The offset of the azure.eventhub.EventData the new checkpoint is associated with.
sequence_number (int) – The sequence number of the azure.eventhub.EventData the new checkpoint is associated with.
Returns
None
Raises
OwnershipLostError – If ownership of the partition has been lost.
checkpoint_fields = ['sequence_number', 'offset']
fields = ['eventhub_name', 'consumer_group_name', 'partition_id', 'owner_id', 'owner_level', 'sequence_number', 'offset', 'last_modified_time', 'etag']
fields_dict = {'consumer_group_name': 'text', 'etag': 'text', 'eventhub_name': 'text', 'last_modified_time': 'real', 'offset': 'text', 'owner_id': 'text', 'owner_level': 'integer', 'partition_id': 'text', 'sequence_number': 'integer'}
other_fields = ['owner_id', 'owner_level', 'sequence_number', 'offset', 'last_modified_time', 'etag']
other_fields_dict = {'etag': 'text', 'last_modified_time': 'real', 'offset': 'text', 'owner_id': 'text', 'owner_level': 'integer', 'sequence_number': 'integer'}
primary_keys = ['eventhub_name', 'consumer_group_name', 'partition_id']
primary_keys_dict = {'consumer_group_name': 'text', 'eventhub_name': 'text', 'partition_id': 'text'}
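The class attributes above read like a table schema: column names and SQLite types in fields_dict, a composite key in primary_keys, and the columns a checkpoint writes in checkpoint_fields. The sketch below assumes that interpretation and derives a CREATE TABLE statement and an owner-guarded checkpoint UPDATE from those attribute values using the standard sqlite3 module. The helper functions are hypothetical and are not taken from SamplePartitionManager's source.

```python
import sqlite3
from typing import Tuple

# Class attributes as listed in the SamplePartitionManager documentation.
fields = ['eventhub_name', 'consumer_group_name', 'partition_id', 'owner_id', 'owner_level',
          'sequence_number', 'offset', 'last_modified_time', 'etag']
fields_dict = {'consumer_group_name': 'text', 'etag': 'text', 'eventhub_name': 'text',
               'last_modified_time': 'real', 'offset': 'text', 'owner_id': 'text',
               'owner_level': 'integer', 'partition_id': 'text', 'sequence_number': 'integer'}
primary_keys = ['eventhub_name', 'consumer_group_name', 'partition_id']
checkpoint_fields = ['sequence_number', 'offset']


def quote(name: str) -> str:
    return f'"{name}"'  # quote identifiers so names such as offset are safe


def create_ownership_table(conn: sqlite3.Connection, table: str = "ownership") -> None:
    """Hypothetical: derive the CREATE TABLE statement from the attributes."""
    columns = ", ".join(f"{quote(name)} {fields_dict[name]}" for name in fields)
    keys = ", ".join(quote(name) for name in primary_keys)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {quote(table)} ({columns}, PRIMARY KEY ({keys}))")


def update_checkpoint_sql(table: str = "ownership") -> str:
    """UPDATE that writes only checkpoint_fields, and only for the current owner."""
    assignments = ", ".join(f"{quote(name)} = ?" for name in checkpoint_fields)
    condition = " AND ".join(f"{quote(name)} = ?" for name in primary_keys + ["owner_id"])
    return f"UPDATE {quote(table)} SET {assignments} WHERE {condition}"


def demo() -> Tuple[int, int, tuple]:
    conn = sqlite3.connect(":memory:")  # same special name as the default db_filename
    create_ownership_table(conn)
    conn.execute('INSERT INTO "ownership" ("eventhub_name", "consumer_group_name", "partition_id", "owner_id") '
                 "VALUES (?, ?, ?, ?)", ("hub", "$default", "0", "a"))
    by_owner = conn.execute(update_checkpoint_sql(), (7, "100", "hub", "$default", "0", "a")).rowcount
    by_other = conn.execute(update_checkpoint_sql(), (8, "200", "hub", "$default", "0", "b")).rowcount
    row = conn.execute('SELECT "sequence_number", "offset" FROM "ownership"').fetchone()
    conn.close()
    return by_owner, by_other, row


if __name__ == "__main__":
    print(demo())  # (1, 0, (7, '100'))
```

An UPDATE that matches zero rows (the by_other case) is one signal a SQLite-backed manager could translate into OwnershipLostError; whether SamplePartitionManager does exactly this is not stated above.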
azure.eventhub.aio.eventprocessor.utils module
Module contents
The azure.eventhub.aio.eventprocessor package also exposes OwnershipLostError, CloseReason, EventProcessor, PartitionProcessor, PartitionManager, PartitionContext and SamplePartitionManager at package level (for example, azure.eventhub.aio.eventprocessor.EventProcessor). Their signatures and documentation are identical to the corresponding submodule entries above.