Source code for azure.eventhub.aio.eventprocessor.partition_processor

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from typing import List
from abc import ABC, abstractmethod
from enum import Enum
from azure.eventhub import EventData
from .partition_context import PartitionContext


[docs]class CloseReason(Enum): SHUTDOWN = 0 # user call EventProcessor.stop() OWNERSHIP_LOST = 1 # lose the ownership of a partition. EVENTHUB_EXCEPTION = 2 # Exception happens during receiving events PROCESS_EVENTS_ERROR = 3 # Exception happens during process_events
[docs]class PartitionProcessor(ABC): """ PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class implementing this abstract class will be created for every partition the associated ~azure.eventhub.aio.eventprocessor.EventProcessor owns. """
[docs] async def initialize(self, partition_context: PartitionContext): """This method will be called when `EventProcessor` creates a `PartitionProcessor`. :param partition_context: The context information of this partition. :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """
# Please put the code for initialization of PartitionProcessor here.
[docs] async def close(self, reason, partition_context: PartitionContext): """Called when EventProcessor stops processing this PartitionProcessor. There are different reasons to trigger the PartitionProcessor to close. Refer to enum class ~azure.eventhub.eventprocessor.CloseReason :param reason: Reason for closing the PartitionProcessor. :type reason: ~azure.eventhub.eventprocessor.CloseReason :param partition_context: The context information of this partition. Use its method update_checkpoint to save checkpoint to the data store. :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """
# Please put the code for closing PartitionProcessor here.
[docs] @abstractmethod async def process_events(self, events: List[EventData], partition_context: PartitionContext): """Called when a batch of events have been received. :param events: Received events. :type events: list[~azure.eventhub.common.EventData] :param partition_context: The context information of this partition. Use its method update_checkpoint to save checkpoint to the data store. :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """
# Please put the code for processing events here.
[docs] async def process_error(self, error, partition_context: PartitionContext): """Called when an error happens when receiving or processing events :param error: The error that happens. :type error: Exception :param partition_context: The context information of this partition. Use its method update_checkpoint to save checkpoint to the data store. :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """
# Please put the code for processing error here.