azure.eventhub.aio.eventprocessor package

Submodules

azure.eventhub.aio.eventprocessor.event_processor module

class azure.eventhub.aio.eventprocessor.event_processor.EventProcessor(eventhub_client: azure.eventhub.aio.client_async.EventHubClient, consumer_group_name: str, partition_processor_type: Type[azure.eventhub.aio.eventprocessor.partition_processor.PartitionProcessor], partition_manager: azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager, *, initial_event_position: azure.eventhub.common.EventPosition = <azure.eventhub.common.EventPosition object>, polling_interval: float = 10.0)[source]

An EventProcessor constantly receives events from multiple partitions of the Event Hub in the context of a given consumer group. The received events are passed to a PartitionProcessor for processing.

It gives users a convenient way to receive events from multiple partitions and save checkpoints. If multiple EventProcessors run against the same Event Hub and consumer group, they automatically balance the load among themselves.
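To make "automatically balance load" concrete, here is a simplified illustration of a fair split of partitions when several processors share one Event Hub and consumer group. It is not the EventProcessor's actual balancing algorithm. The function name `fair_share`, the round-robin assignment, and the assumption that partition IDs are numeric strings ("0", "1", ...) are all illustrative.

```python
from typing import Dict, List


def fair_share(partition_ids: List[str], processor_ids: List[str]) -> Dict[str, List[str]]:
    """Spread partitions across processors so their counts differ by at most one.

    Hypothetical helper: a simplified stand-in for load balancing, not the
    algorithm the EventProcessor uses internally.
    """
    if not processor_ids:
        return {}
    assignment: Dict[str, List[str]] = {pid: [] for pid in processor_ids}
    ordered = sorted(processor_ids)
    # Assumes numeric partition IDs so "10" sorts after "9".
    for index, partition_id in enumerate(sorted(partition_ids, key=int)):
        # Round-robin: the i-th partition goes to processor i mod N.
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition_id)
    return assignment


if __name__ == "__main__":
    # Four partitions shared by two processors: two partitions each.
    print(fair_share(["0", "1", "2", "3"], ["processor-a", "processor-b"]))
    # {'processor-a': ['0', '2'], 'processor-b': ['1', '3']}
```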

Example

import asyncio
import logging
import os
from azure.eventhub.aio import EventHubClient
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
from azure.eventhub.aio.eventprocessor import SamplePartitionManager

RECEIVE_TIMEOUT = 5  # timeout in seconds for a receive operation. 0 or None means no timeout
RETRY_TOTAL = 3  # max number of retries for receive operations within the receive timeout.
                 # The actual number of retries could be lower if RECEIVE_TIMEOUT is too small
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]

logging.basicConfig(level=logging.INFO)

async def do_operation(event):
    # Do some sync or async work here. If the work is I/O-bound, async performs better.
    print(event)


class MyPartitionProcessor(PartitionProcessor):
    async def process_events(self, events, partition_context):
        if events:
            await asyncio.gather(*[do_operation(event) for event in events])
            await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)

async def main():
    client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT,
                                                   retry_total=RETRY_TOTAL)
    partition_manager = SamplePartitionManager(db_filename=":memory:")  # pass a filename instead to persist checkpoints
    try:
        event_processor = EventProcessor(client, "$default", MyPartitionProcessor,
                                         partition_manager, polling_interval=10)
        asyncio.create_task(event_processor.start())
        await asyncio.sleep(60)
        await event_processor.stop()
    finally:
        await partition_manager.close()

if __name__ == '__main__':
    asyncio.run(main())

Instantiate an EventProcessor.

Parameters
  • eventhub_client (EventHubClient) – An instance of azure.eventhub.aio.EventHubClient.

  • consumer_group_name (str) – The name of the consumer group this event processor is associated with. Events will be read only in the context of this group.

  • partition_processor_type (type) – A subclass of PartitionProcessor. The EventProcessor creates one instance of it for every partition it owns.

  • partition_manager (PartitionManager subclass) – Interacts with the data store that holds partition ownership and checkpoint data. Users can either use a provided PartitionManager plug-in or implement their own; SamplePartitionManager demonstrates basic usage and stores data in memory or in a file.

  • initial_event_position (EventPosition) – The event position at which a partition consumer starts if the partition has no checkpoint yet. This could be replaced by a “reset” checkpoint in the near future.

  • polling_interval (float) – The interval between two consecutive rounds of partition balancing and ownership claiming. Defaults to 10.0.
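The precedence rule for initial_event_position (a stored checkpoint wins; the initial position applies only when the partition has no checkpoint) can be sketched as below. The dictionary shape follows the ownership fields listed for PartitionManager; the function name `starting_offset`, the use of the checkpointed offset as the resume point, and the "-1" value are assumptions for illustration, not the library's code.

```python
from typing import Any, Dict, Optional


def starting_offset(ownership: Optional[Dict[str, Any]], initial_offset: str) -> str:
    """Return where a partition consumer should begin reading.

    Hypothetical helper: prefer the checkpointed offset stored with the
    partition's ownership record, and fall back to the initial position
    only when the partition has never been checkpointed.
    """
    if ownership and ownership.get("offset") is not None:
        return ownership["offset"]
    return initial_offset


if __name__ == "__main__":
    print(starting_offset(None, "-1"))                # no ownership record yet: -1
    print(starting_offset({"offset": None}, "-1"))    # owned but never checkpointed: -1
    print(starting_offset({"offset": "4096"}, "-1"))  # resume from the checkpoint: 4096
```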

async start()[source]

Start the EventProcessor.

The EventProcessor will try to claim and balance partition ownership with other EventProcessor instances, and asynchronously start receiving EventData from the Event Hub and processing events.

Returns

None

async stop()[source]

Stop the EventProcessor.

The EventProcessor will stop receiving events from the Event Hub and release ownership of the partitions it is working on. Other running EventProcessor instances will take over the released partitions.

A stopped EventProcessor can be restarted by calling start() again.

Returns

None
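The start()/stop() contract (a long-running start() that repeats a balance-and-claim round every polling_interval, and a stop() that ends the loop, releases partitions, and still allows a restart) can be modeled with plain asyncio. `MiniProcessor`, its `rounds` counter, and its `owned` set are hypothetical; the sketch treats polling_interval as seconds and omits receiving and dispatching events, which the real EventProcessor also does.

```python
import asyncio
from typing import Optional, Set


class MiniProcessor:
    """Hypothetical stand-in that mimics only the start()/stop() lifecycle."""

    def __init__(self, polling_interval: float = 10.0) -> None:
        self.polling_interval = polling_interval
        self.rounds = 0                   # completed balance-and-claim rounds
        self.owned: Set[str] = set()      # partition IDs this instance holds
        self._stop_event: Optional[asyncio.Event] = None

    async def start(self) -> None:
        # Create the Event inside the running loop; a fresh one also allows restarts.
        self._stop_event = asyncio.Event()
        while not self._stop_event.is_set():
            self.rounds += 1
            self.owned.update({"0", "1"})  # pretend this round claimed two partitions
            try:
                # Wait polling_interval seconds, or wake early once stop() is called.
                await asyncio.wait_for(self._stop_event.wait(), self.polling_interval)
            except asyncio.TimeoutError:
                pass

    async def stop(self) -> None:
        if self._stop_event is not None:
            self._stop_event.set()
        self.owned.clear()                # release ownership for other processors


async def demo() -> None:
    processor = MiniProcessor(polling_interval=0.01)
    start_task = asyncio.create_task(processor.start())
    await asyncio.sleep(0.05)
    await processor.stop()
    await start_task                      # start() returns once the loop sees the stop signal
    print(processor.rounds, sorted(processor.owned))


if __name__ == "__main__":
    asyncio.run(demo())
```

Waiting on an Event with a timeout, rather than a bare sleep, lets stop() take effect immediately instead of after up to one full polling interval.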

azure.eventhub.aio.eventprocessor.partition_context module

class azure.eventhub.aio.eventprocessor.partition_context.PartitionContext(eventhub_name: str, consumer_group_name: str, partition_id: str, owner_id: str, partition_manager: azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager)[source]

Contains partition-related context information for a PartitionProcessor instance to use.

Use the update_checkpoint() method of this class to save checkpoint data.

async update_checkpoint(offset, sequence_number=None)[source]

Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

Parameters
  • offset (str) – The offset of the EventData the new checkpoint will be associated with.

  • sequence_number (int) – The sequence number of the EventData the new checkpoint will be associated with. Optional; defaults to None.

Returns

None
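PartitionContext is constructed with eventhub_name, consumer_group_name, partition_id, owner_id, and a partition_manager, while its update_checkpoint() takes only offset and sequence_number; the remaining arguments of PartitionManager.update_checkpoint() can therefore come from the context. The sketch below assumes exactly that forwarding. `RecordingManager` and `ContextSketch` are hypothetical stand-ins, not the library classes.

```python
import asyncio
from typing import Any, Dict, List, Optional


class RecordingManager:
    """Hypothetical partition manager that only records update_checkpoint calls."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    async def update_checkpoint(self, eventhub_name: str, consumer_group_name: str,
                                partition_id: str, owner_id: str, offset: str,
                                sequence_number: Optional[int]) -> None:
        self.calls.append({"eventhub_name": eventhub_name, "consumer_group_name": consumer_group_name,
                           "partition_id": partition_id, "owner_id": owner_id,
                           "offset": offset, "sequence_number": sequence_number})


class ContextSketch:
    """Hypothetical mirror of PartitionContext's constructor arguments."""

    def __init__(self, eventhub_name: str, consumer_group_name: str, partition_id: str,
                 owner_id: str, partition_manager: RecordingManager) -> None:
        self.eventhub_name = eventhub_name
        self.consumer_group_name = consumer_group_name
        self.partition_id = partition_id
        self.owner_id = owner_id
        self.partition_manager = partition_manager

    async def update_checkpoint(self, offset: str, sequence_number: Optional[int] = None) -> None:
        # Combine the caller's event position with the identifiers this context already holds.
        await self.partition_manager.update_checkpoint(
            self.eventhub_name, self.consumer_group_name, self.partition_id,
            self.owner_id, offset, sequence_number)


if __name__ == "__main__":
    manager = RecordingManager()
    context = ContextSketch("myhub", "$default", "0", "processor-a", manager)
    asyncio.run(context.update_checkpoint("4096", 42))
    print(manager.calls[0]["partition_id"], manager.calls[0]["offset"])  # 0 4096
```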

azure.eventhub.aio.eventprocessor.partition_manager module

exception azure.eventhub.aio.eventprocessor.partition_manager.OwnershipLostError[source]

Raised when update_checkpoint detects that ownership of a partition has been lost.

class azure.eventhub.aio.eventprocessor.partition_manager.PartitionManager[source]

PartitionManager handles interaction with the chosen storage service. It can list and claim partition ownership and save checkpoints.

abstract async claim_ownership(ownership_list: Iterable[Dict[str, Any]]) → Iterable[Dict[str, Any]][source]

Tries to claim ownership of the specified partitions.

Parameters

ownership_list (Iterable of dict) – Iterable of dictionaries describing the partition ownership to claim.

Returns

Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag

abstract async list_ownership(eventhub_name: str, consumer_group_name: str) → Iterable[Dict[str, Any]][source]

Retrieves a complete ownership list from the chosen storage service.

Parameters
  • eventhub_name (str) – The name of the Event Hub the ownership records are associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the ownership records are associated with.

Returns

Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag

abstract async update_checkpoint(eventhub_name, consumer_group_name, partition_id, owner_id, offset, sequence_number) → None[source]

Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

Parameters
  • eventhub_name (str) – The name of the Event Hub the ownership record is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the ownership record is associated with.

  • partition_id (str) – The ID of the partition the checkpoint is created for.

  • owner_id (str) – The identifier of the EventProcessor.

  • offset (str) – The offset of the EventData the new checkpoint will be associated with.

  • sequence_number (int) – The sequence number of the EventData the new checkpoint will be associated with.

Returns

None

Raises

OwnershipLostError
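To show how the three abstract methods fit together, here is an in-memory sketch. It assumes etag-based optimistic concurrency decides whether a claim is accepted (a claim on an existing record succeeds only if it carries that record's current etag), and that update_checkpoint raises OwnershipLostError when owner_id no longer owns the partition. `InMemoryPartitionManager` and the local OwnershipLostError are hypothetical and do not subclass the library types.

```python
import asyncio
import time
import uuid
from typing import Any, Dict, Iterable, List, Tuple


class OwnershipLostError(Exception):
    """Local stand-in for the library exception of the same name."""


Key = Tuple[str, str, str]  # (eventhub_name, consumer_group_name, partition_id)


class InMemoryPartitionManager:
    """Hypothetical in-memory store for illustration only."""

    def __init__(self) -> None:
        self._store: Dict[Key, Dict[str, Any]] = {}

    @staticmethod
    def _key(record: Dict[str, Any]) -> Key:
        return (record["eventhub_name"], record["consumer_group_name"], record["partition_id"])

    async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> List[Dict[str, Any]]:
        return [dict(record) for key, record in self._store.items()
                if key[0] == eventhub_name and key[1] == consumer_group_name]

    async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
        claimed = []
        for request in ownership_list:
            key = self._key(request)
            current = self._store.get(key)
            # Optimistic concurrency: the claim must carry the etag the caller last saw.
            if current is not None and current["etag"] != request.get("etag"):
                continue  # the record changed since the caller read it; claim rejected
            record = dict(current or {}, **request)
            record["etag"] = str(uuid.uuid4())  # every successful write gets a new etag
            record["last_modified_time"] = time.time()
            self._store[key] = record
            claimed.append(dict(record))
        return claimed

    async def update_checkpoint(self, eventhub_name: str, consumer_group_name: str, partition_id: str,
                                owner_id: str, offset: str, sequence_number: int) -> None:
        record = self._store.get((eventhub_name, consumer_group_name, partition_id))
        if record is None or record.get("owner_id") != owner_id:
            raise OwnershipLostError(partition_id)
        record.update(offset=offset, sequence_number=sequence_number,
                      etag=str(uuid.uuid4()), last_modified_time=time.time())


async def demo() -> None:
    manager = InMemoryPartitionManager()
    base = {"eventhub_name": "myhub", "consumer_group_name": "$default", "partition_id": "0"}
    (mine,) = await manager.claim_ownership([dict(base, owner_id="a")])
    print(mine["owner_id"])  # a
    # A claim from "b" without the current etag is rejected.
    print(await manager.claim_ownership([dict(base, owner_id="b")]))  # []
    await manager.update_checkpoint("myhub", "$default", "0", "a", "4096", 42)
    try:
        await manager.update_checkpoint("myhub", "$default", "0", "b", "8192", 43)
    except OwnershipLostError:
        print("b does not own partition 0")


if __name__ == "__main__":
    asyncio.run(demo())
```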

azure.eventhub.aio.eventprocessor.partition_processor module

class azure.eventhub.aio.eventprocessor.partition_processor.CloseReason[source]

An enumeration of the reasons a PartitionProcessor can be closed.

EVENTHUB_EXCEPTION = 2
OWNERSHIP_LOST = 1
PROCESS_EVENTS_ERROR = 3
SHUTDOWN = 0
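A close() override typically branches on the reason. The sketch mirrors the four documented members in a local Enum (same names and values) and applies one plausible policy: write a final checkpoint only on a clean SHUTDOWN. After OWNERSHIP_LOST another processor owns the partition, so a checkpoint would be expected to fail with OwnershipLostError; the sketch also skips the two error reasons. The policy and the `should_checkpoint_on_close` name are assumptions, not library behavior.

```python
from enum import Enum


class CloseReason(Enum):
    """Local mirror of the documented CloseReason members and values."""
    SHUTDOWN = 0
    OWNERSHIP_LOST = 1
    EVENTHUB_EXCEPTION = 2
    PROCESS_EVENTS_ERROR = 3


def should_checkpoint_on_close(reason: CloseReason) -> bool:
    """Hypothetical policy: only a clean shutdown warrants a final checkpoint."""
    # OWNERSHIP_LOST: another processor now owns the partition.
    # EVENTHUB_EXCEPTION / PROCESS_EVENTS_ERROR: this sketch treats the last batch as suspect.
    return reason is CloseReason.SHUTDOWN


if __name__ == "__main__":
    for reason in CloseReason:
        print(reason.name, should_checkpoint_on_close(reason))
```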
class azure.eventhub.aio.eventprocessor.partition_processor.PartitionProcessor[source]

PartitionProcessor processes events received from the Azure Event Hubs service. One instance of a class implementing this abstract class is created for every partition the associated EventProcessor owns.

async close(reason, partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)[source]

Called when the EventProcessor stops processing this PartitionProcessor.

A PartitionProcessor can be closed for different reasons; see the CloseReason enum.

Parameters
  • reason (CloseReason) – The reason for closing the PartitionProcessor.

  • partition_context (PartitionContext) – The context information of this partition. Use its update_checkpoint() method to save a checkpoint to the data store.

async initialize(partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)[source]

Called when the EventProcessor creates a PartitionProcessor.

Parameters

partition_context (PartitionContext) – The context information of this partition.

async process_error(error, partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)[source]

Called when an error occurs while receiving or processing events.

Parameters
  • error (Exception) – The error that occurred.

  • partition_context (PartitionContext) – The context information of this partition. Use its update_checkpoint() method to save a checkpoint to the data store.

abstract async process_events(events: List[azure.eventhub.common.EventData], partition_context: azure.eventhub.aio.eventprocessor.partition_context.PartitionContext)[source]

Called when a batch of events has been received.

Parameters
  • events (list[EventData]) – The received events.

  • partition_context (PartitionContext) – The context information of this partition. Use its update_checkpoint() method to save a checkpoint to the data store.
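The order in which these callbacks fire can be sketched with a small driver: initialize once, process_events for each batch, process_error when a batch fails, and close at the end. The driver `run_partition`, the `FakeContext` stand-in, dictionaries in place of EventData, and the string "SHUTDOWN" in place of CloseReason.SHUTDOWN are all hypothetical; the rule of checkpointing the last event of each successful batch is taken from the example at the top of this page. The real EventProcessor drives these calls itself.

```python
import asyncio
from typing import Any, Dict, List


class FakeContext:
    """Hypothetical stand-in for PartitionContext that records checkpoints."""

    def __init__(self, partition_id: str) -> None:
        self.partition_id = partition_id
        self.checkpoints: List[Any] = []

    async def update_checkpoint(self, offset, sequence_number=None) -> None:
        self.checkpoints.append((offset, sequence_number))


class LoggingProcessor:
    """Hypothetical processor implementing the four documented callbacks."""

    def __init__(self) -> None:
        self.log: List[str] = []

    async def initialize(self, partition_context) -> None:
        self.log.append("initialize")

    async def process_events(self, events: List[Dict[str, Any]], partition_context) -> None:
        if any(event.get("poison") for event in events):
            raise ValueError("cannot handle event")
        self.log.append(f"process {len(events)}")
        if events:
            last = events[-1]  # checkpoint only after the whole batch succeeded
            await partition_context.update_checkpoint(last["offset"], last["sequence_number"])

    async def process_error(self, error, partition_context) -> None:
        self.log.append(f"error {type(error).__name__}")

    async def close(self, reason, partition_context) -> None:
        self.log.append(f"close {reason}")


async def run_partition(processor, context, batches) -> None:
    """Hypothetical driver: one initialize, one process_events per batch, one close."""
    await processor.initialize(context)
    for batch in batches:
        try:
            await processor.process_events(batch, context)
        except Exception as error:  # route failures to process_error
            await processor.process_error(error, context)
    await processor.close("SHUTDOWN", context)  # a string stands in for CloseReason.SHUTDOWN


if __name__ == "__main__":
    proc, ctx = LoggingProcessor(), FakeContext("0")
    batches = [
        [{"offset": "0", "sequence_number": 0}, {"offset": "64", "sequence_number": 1}],
        [{"offset": "128", "sequence_number": 2, "poison": True}],
    ]
    asyncio.run(run_partition(proc, ctx, batches))
    print(proc.log)         # ['initialize', 'process 2', 'error ValueError', 'close SHUTDOWN']
    print(ctx.checkpoints)  # [('64', 1)]
```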

azure.eventhub.aio.eventprocessor.sample_partition_manager module

class azure.eventhub.aio.eventprocessor.sample_partition_manager.SamplePartitionManager(db_filename: str = ':memory:', ownership_table: str = 'ownership')[source]

An implementation of PartitionManager that uses the sqlite3 module from the Python standard library. SQLite is a lightweight SQL database that can run in memory or be backed by a file. Do not use this PartitionManager in production.

Parameters
  • db_filename – The name of the file that stores the SQL data. When db_filename is “:memory:”, SQLite runs in memory without a file.

  • ownership_table – The name of the ownership table in the SQLite database.

async claim_ownership(ownership_list)[source]

Tries to claim ownership of the specified partitions.

Parameters

ownership_list (Iterable of dict) – Iterable of dictionaries describing the partition ownership to claim.

Returns

Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag

async close()[source]
async list_ownership(eventhub_name, consumer_group_name)[source]

Retrieves a complete ownership list from the chosen storage service.

Parameters
  • eventhub_name (str) – The name of the Event Hub the ownership records are associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the ownership records are associated with.

Returns

Iterable of dictionaries containing the following partition ownership information: eventhub_name, consumer_group_name, owner_id, partition_id, owner_level, offset, sequence_number, last_modified_time, etag

async update_checkpoint(eventhub_name, consumer_group_name, partition_id, owner_id, offset, sequence_number)[source]

Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

Parameters
  • eventhub_name (str) – The name of the Event Hub the ownership record is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the ownership record is associated with.

  • partition_id (str) – The ID of the partition the checkpoint is created for.

  • owner_id (str) – The identifier of the EventProcessor.

  • offset (str) – The offset of the EventData the new checkpoint will be associated with.

  • sequence_number (int) – The sequence number of the EventData the new checkpoint will be associated with.

Returns

None

Raises

OwnershipLostError

checkpoint_fields = ['sequence_number', 'offset']
fields = ['eventhub_name', 'consumer_group_name', 'partition_id', 'owner_id', 'owner_level', 'sequence_number', 'offset', 'last_modified_time', 'etag']
fields_dict = {'consumer_group_name': 'text', 'etag': 'text', 'eventhub_name': 'text', 'last_modified_time': 'real', 'offset': 'text', 'owner_id': 'text', 'owner_level': 'integer', 'partition_id': 'text', 'sequence_number': 'integer'}
other_fields = ['owner_id', 'owner_level', 'sequence_number', 'offset', 'last_modified_time', 'etag']
other_fields_dict = {'etag': 'text', 'last_modified_time': 'real', 'offset': 'text', 'owner_id': 'text', 'owner_level': 'integer', 'sequence_number': 'integer'}
primary_keys = ['eventhub_name', 'consumer_group_name', 'partition_id']
primary_keys_dict = {'consumer_group_name': 'text', 'eventhub_name': 'text', 'partition_id': 'text'}
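The class attributes above are enough to reconstruct a plausible table layout: fields_dict gives each column's SQLite type and primary_keys gives the composite key. The sketch builds a CREATE TABLE statement from them and shows a checkpoint update that only touches the row still owned by the caller, raising when no row matches. The generated SQL, the `create_table_sql`, `update_checkpoint_sql`, and `quote` helpers, and the local OwnershipLostError are assumptions about how such a manager could work, not SamplePartitionManager's actual SQL.

```python
import sqlite3

# Copied from the SamplePartitionManager class attributes documented above.
fields_dict = {'consumer_group_name': 'text', 'etag': 'text', 'eventhub_name': 'text',
               'last_modified_time': 'real', 'offset': 'text', 'owner_id': 'text',
               'owner_level': 'integer', 'partition_id': 'text', 'sequence_number': 'integer'}
primary_keys = ['eventhub_name', 'consumer_group_name', 'partition_id']


class OwnershipLostError(Exception):
    """Local stand-in for the library exception of the same name."""


def quote(name: str) -> str:
    # Double-quote identifiers so a column named "offset" never clashes with the SQL keyword.
    return f'"{name}"'


def create_table_sql(table: str = "ownership") -> str:
    columns = ", ".join(f"{quote(name)} {sql_type}" for name, sql_type in fields_dict.items())
    keys = ", ".join(quote(name) for name in primary_keys)
    return f"CREATE TABLE IF NOT EXISTS {quote(table)} ({columns}, PRIMARY KEY ({keys}))"


def update_checkpoint_sql(conn: sqlite3.Connection, eventhub_name: str, consumer_group_name: str,
                          partition_id: str, owner_id: str, offset: str, sequence_number: int,
                          table: str = "ownership") -> None:
    # Only the row still owned by owner_id is updated; zero matched rows means ownership was lost.
    cursor = conn.execute(
        f"UPDATE {quote(table)} SET {quote('offset')} = ?, {quote('sequence_number')} = ? "
        f"WHERE {quote('eventhub_name')} = ? AND {quote('consumer_group_name')} = ? "
        f"AND {quote('partition_id')} = ? AND {quote('owner_id')} = ?",
        (offset, sequence_number, eventhub_name, consumer_group_name, partition_id, owner_id),
    )
    conn.commit()
    if cursor.rowcount == 0:
        raise OwnershipLostError(partition_id)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")  # pass a filename instead to persist the data
    conn.execute(create_table_sql())
    conn.execute('INSERT INTO "ownership" ("eventhub_name", "consumer_group_name", "partition_id", "owner_id") '
                 "VALUES (?, ?, ?, ?)", ("myhub", "$default", "0", "processor-a"))
    update_checkpoint_sql(conn, "myhub", "$default", "0", "processor-a", "4096", 42)
    print(conn.execute('SELECT "offset", "sequence_number" FROM "ownership"').fetchone())  # ('4096', 42)
```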

azure.eventhub.aio.eventprocessor.utils module

azure.eventhub.aio.eventprocessor.utils.get_running_loop()[source]

Module contents

The package re-exports the following names, so they can be imported directly from azure.eventhub.aio.eventprocessor, as the example above does. Their documentation is identical to the submodule entries above.

  • exception OwnershipLostError (defined in azure.eventhub.aio.eventprocessor.partition_manager)

  • class CloseReason (defined in azure.eventhub.aio.eventprocessor.partition_processor)

  • class EventProcessor (defined in azure.eventhub.aio.eventprocessor.event_processor)

  • class PartitionProcessor (defined in azure.eventhub.aio.eventprocessor.partition_processor)

  • class PartitionManager (defined in azure.eventhub.aio.eventprocessor.partition_manager)

  • class PartitionContext (defined in azure.eventhub.aio.eventprocessor.partition_context)

  • class SamplePartitionManager (defined in azure.eventhub.aio.eventprocessor.sample_partition_manager)