# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------
from contextlib import contextmanager
from typing import Dict, Type
import uuid
import asyncio
import logging
from azure.core.tracing import SpanKind # type: ignore
from azure.core.settings import settings # type: ignore
from azure.eventhub import EventPosition, EventHubError
from azure.eventhub.aio import EventHubClient
from .partition_context import PartitionContext
from .partition_manager import PartitionManager, OwnershipLostError
from ._ownership_manager import OwnershipManager
from .partition_processor import CloseReason, PartitionProcessor
from .utils import get_running_loop
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# NOTE(review): not referenced anywhere in the visible part of this module; presumably the
# owner level intended for partition consumers -- confirm before relying on it.
OWNER_LEVEL = 0
class EventProcessor(object):  # pylint:disable=too-many-instance-attributes
    """
    An EventProcessor constantly receives events from multiple partitions of the Event Hub in the context of a given
    consumer group. The received data will be sent to PartitionProcessor to be processed.

    It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
    If multiple EventProcessors are running for an event hub, they will automatically balance load.

    Example:
        .. code-block:: python

            import asyncio
            import logging
            import os
            from azure.eventhub.aio import EventHubClient
            from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
            from azure.eventhub.aio.eventprocessor import SamplePartitionManager

            RECEIVE_TIMEOUT = 5  # timeout in seconds for a receiving operation. 0 or None means no timeout
            RETRY_TOTAL = 3  # max number of retries for receive operations within the receive timeout.
                             # Actual number of retries could be less if RECEIVE_TIMEOUT is too small
            CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]

            logging.basicConfig(level=logging.INFO)

            async def do_operation(event):
                # do some sync or async operations. If the operation is i/o bound, async will have better performance
                print(event)

            class MyPartitionProcessor(PartitionProcessor):
                async def process_events(self, events, partition_context):
                    if events:
                        await asyncio.gather(*[do_operation(event) for event in events])
                        await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)

            async def main():
                client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT,
                                                               retry_total=RETRY_TOTAL)
                partition_manager = SamplePartitionManager(db_filename=":memory:")  # a filename to persist checkpoint
                try:
                    event_processor = EventProcessor(client, "$default", MyPartitionProcessor,
                                                     partition_manager, polling_interval=10)
                    asyncio.create_task(event_processor.start())
                    await asyncio.sleep(60)
                    await event_processor.stop()
                finally:
                    await partition_manager.close()

            if __name__ == '__main__':
                asyncio.get_event_loop().run_until_complete(main())

    """

    def __init__(
            self, eventhub_client: EventHubClient, consumer_group_name: str,
            partition_processor_type: Type[PartitionProcessor],
            partition_manager: PartitionManager, *,
            initial_event_position: EventPosition = EventPosition("-1"), polling_interval: float = 10.0
    ):
        """
        Instantiate an EventProcessor.

        :param eventhub_client: An instance of ~azure.eventhub.aio.EventHubClient object
        :type eventhub_client: ~azure.eventhub.aio.EventHubClient
        :param consumer_group_name: The name of the consumer group this event processor is associated with. Events will
         be read only in the context of this group.
        :type consumer_group_name: str
        :param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor.
         It is called with no arguments to create one processor instance per receive task.
        :type partition_processor_type: type
        :param partition_manager: Interacts with the data storage that stores ownership and checkpoints data.
         ~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager`
         which stores data in memory or a file.
         Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`.
        :type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager.
        :param initial_event_position: The event position to start a partition consumer
         if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future.
        :type initial_event_position: EventPosition
        :param polling_interval: The interval, in seconds, between any two pollings of balancing and claiming.
         The ownership timeout handed to the ownership manager is twice this value.
        :type polling_interval: float
        """
        self._consumer_group_name = consumer_group_name
        self._eventhub_client = eventhub_client
        self._eventhub_name = eventhub_client.eh_name
        self._partition_processor_factory = partition_processor_type
        self._partition_manager = partition_manager
        self._initial_event_position = initial_event_position  # will be replaced by reset event position in preview 4
        self._polling_interval = polling_interval
        self._ownership_timeout = self._polling_interval * 2
        # Running receive tasks, keyed by partition id.
        self._tasks = {}  # type: Dict[str, asyncio.Task]
        # Unique id of this processor instance; passed to the ownership manager and used in log messages.
        self._id = str(uuid.uuid4())
        self._running = False

    def __repr__(self):
        return 'EventProcessor: id {}'.format(self._id)

    async def start(self):
        """Start the EventProcessor.

        The EventProcessor will try to claim and balance partition ownership with other `EventProcessor`
        and asynchronously start receiving EventData from EventHub and processing events.

        This coroutine keeps looping (claim ownership, reconcile receive tasks, sleep for the polling
        interval) until `stop` is called. Calling it while the processor is already running returns
        without starting a second loop.

        :return: None
        """
        log.info("EventProcessor %r is being started", self._id)
        ownership_manager = OwnershipManager(self._eventhub_client, self._consumer_group_name, self._id,
                                             self._partition_manager, self._ownership_timeout)
        if self._running:
            # Already started: do not run a second balancing loop.
            return

        self._running = True
        while self._running:
            try:
                claimed_ownership_list = await ownership_manager.claim_ownership()
            except Exception as err:  # pylint:disable=broad-except
                log.warning("An exception (%r) occurred during balancing and claiming ownership for eventhub %r "
                            "consumer group %r. Retrying after %r seconds",
                            err, self._eventhub_name, self._consumer_group_name, self._polling_interval)
                await asyncio.sleep(self._polling_interval)
                continue

            if not self._running:
                # stop() ran while claim_ownership() was awaited and has already cancelled the
                # tracked tasks. Creating tasks now would leave receivers that nothing cancels.
                break

            if claimed_ownership_list:
                claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list]
                # Any task for a partition that was not claimed this round must be cancelled.
                to_cancel_list = self._tasks.keys() - claimed_partition_ids
                self._create_tasks_for_claimed_ownership(claimed_ownership_list)
            else:
                to_cancel_list = set(self._tasks.keys())
                log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id)

            if to_cancel_list:
                self._cancel_tasks_for_partitions(to_cancel_list)
                log.info("EventProcesor %r has cancelled partitions %r", self._id, to_cancel_list)

            await asyncio.sleep(self._polling_interval)

    async def stop(self):
        """Stop the EventProcessor.

        The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions
        it is working on.
        Other running EventProcessor will take over these released partitions.

        A stopped EventProcessor can be restarted by calling method `start` again.

        :return: None
        """
        # Clear the flag first so cancelled receive tasks report SHUTDOWN rather than OWNERSHIP_LOST.
        self._running = False
        while self._tasks:
            _, task = self._tasks.popitem()
            task.cancel()
        log.info("EventProcessor %r has been cancelled", self._id)
        await asyncio.sleep(2)  # give some time to finish after cancelled.

    def _cancel_tasks_for_partitions(self, to_cancel_partitions):
        """Cancel and forget the receive tasks of the given partition ids; unknown ids are ignored."""
        for partition_id in to_cancel_partitions:
            task = self._tasks.pop(partition_id, None)
            if task is not None:
                task.cancel()

    def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
        """Start a receive task for every claimed partition that has no task yet or whose task has finished."""
        for ownership in to_claim_ownership_list:
            partition_id = ownership["partition_id"]
            if partition_id not in self._tasks or self._tasks[partition_id].done():
                self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership))

    @contextmanager
    def _context(self, events):
        """Wrap the processing of ``events`` in a tracing span when a tracing implementation is configured.

        Without a configured tracing implementation this is a no-op context manager.
        """
        # Tracing
        span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
        if span_impl_type is None:
            yield
        else:
            child = span_impl_type(name="Azure.EventHubs.process")
            self._eventhub_client._add_span_request_attributes(child)  # pylint: disable=protected-access
            child.kind = SpanKind.SERVER
            for event in events:
                event._trace_link_message(child)  # pylint: disable=protected-access
            with child:
                yield

    async def _receive(self, ownership):  # pylint: disable=too-many-statements
        """Receive events for one owned partition and feed them to a new partition processor.

        Runs until the task is cancelled or an error ends the receive loop. The partition consumer
        is closed on every exit path.

        :param ownership: Mapping describing the claimed partition. The keys "partition_id",
         "eventhub_name", "consumer_group_name" and "owner_id" are required; "offset" is optional and
         falls back to the initial event position.
        """
        log.info("start ownership, %r", ownership)
        partition_processor = self._partition_processor_factory()
        partition_id = ownership["partition_id"]
        eventhub_name = ownership["eventhub_name"]
        consumer_group_name = ownership["consumer_group_name"]
        owner_id = ownership["owner_id"]
        partition_context = PartitionContext(
            eventhub_name,
            consumer_group_name,
            partition_id,
            owner_id,
            self._partition_manager
        )
        partition_consumer = self._eventhub_client.create_consumer(
            consumer_group_name,
            partition_id,
            EventPosition(ownership.get("offset", self._initial_event_position.value))
        )

        async def process_error(err):
            # Report the error to the user's processor; a failure inside the handler is only logged.
            log.warning(
                "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
                " has met an error. The exception is %r.",
                owner_id, eventhub_name, partition_id, consumer_group_name, err
            )
            try:
                await partition_processor.process_error(err, partition_context)
            except Exception as err_again:  # pylint:disable=broad-except
                log.warning(
                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
                    " has another error during running process_error(). The exception is %r.",
                    owner_id, eventhub_name, partition_id, consumer_group_name, err_again
                )

        async def close(reason):
            # Tell the user's processor why it is closing; a failure inside the handler is only logged.
            log.info(
                "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
                " is being closed. Reason is: %r",
                owner_id, eventhub_name, partition_id, consumer_group_name, reason
            )
            try:
                await partition_processor.close(reason, partition_context)
            except Exception as err:  # pylint:disable=broad-except
                log.warning(
                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
                    " has an error during running close(). The exception is %r.",
                    owner_id, eventhub_name, partition_id, consumer_group_name, err
                )

        try:
            try:
                await partition_processor.initialize(partition_context)
            except Exception as err:  # pylint:disable=broad-except
                # A failing initialize() is logged but does not prevent receiving.
                log.warning(
                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
                    " has an error during running initialize(). The exception is %r.",
                    owner_id, eventhub_name, partition_id, consumer_group_name, err
                )
            while True:
                try:
                    events = await partition_consumer.receive()
                    with self._context(events):
                        await partition_processor.process_events(events, partition_context)
                except asyncio.CancelledError:
                    log.info(
                        "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
                        " is cancelled",
                        owner_id,
                        eventhub_name,
                        partition_id,
                        consumer_group_name
                    )
                    # stop() clears _running before cancelling; any other cancellation comes from
                    # the balancing loop dropping this partition.
                    if not self._running:
                        await close(CloseReason.SHUTDOWN)
                    else:
                        await close(CloseReason.OWNERSHIP_LOST)
                    raise
                except EventHubError as eh_err:
                    await process_error(eh_err)
                    await close(CloseReason.EVENTHUB_EXCEPTION)
                    # An EventProcessor will pick up this partition again after the ownership is released
                    break
                except OwnershipLostError:
                    await close(CloseReason.OWNERSHIP_LOST)
                    break
                except Exception as other_error:  # pylint:disable=broad-except
                    await process_error(other_error)
                    await close(CloseReason.PROCESS_EVENTS_ERROR)
                    break
        finally:
            await partition_consumer.close()