Source code for azure.eventhub.aio._eventprocessor.partition_context

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from typing import Dict, Optional, Any, TYPE_CHECKING
import logging
from .checkpoint_store import CheckpointStore
from ..._utils import get_last_enqueued_event_properties

if TYPE_CHECKING:
    from ..._common import EventData

_LOGGER = logging.getLogger(__name__)


[docs]class PartitionContext(object): """Contains partition related context information. A `PartitionContext` instance will be passed to the event, error and initialization callbacks defined when calling `EventHubConsumerClient.receive()`. Users can call `update_checkpoint()` of this class to persist checkpoint data. """ def __init__( self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, partition_id: str, checkpoint_store: CheckpointStore = None, ) -> None: self.fully_qualified_namespace = fully_qualified_namespace self.partition_id = partition_id self.eventhub_name = eventhub_name self.consumer_group = consumer_group self._last_received_event = None # type: Optional[EventData] self._checkpoint_store = checkpoint_store @property def last_enqueued_event_properties(self) -> Optional[Dict[str, Any]]: """The latest enqueued event information. This property will be updated each time an event is received if the receiver is created with `track_last_enqueued_event_properties` set to `True`. The properties dict includes following information of the last enqueued event: - `sequence_number` (int) - `offset` (str) - `enqueued_time` (UTC datetime.datetime) - `retrieval_time` (UTC datetime.datetime) :rtype: dict or None """ if self._last_received_event: return get_last_enqueued_event_properties(self._last_received_event) return None
[docs] async def update_checkpoint(self, event: Optional["EventData"] = None) -> None: """Updates the receive checkpoint to the given events offset. :param ~azure.eventhub.EventData event: The EventData instance which contains the offset and sequence number information used for checkpoint. :rtype: None """ if self._checkpoint_store: checkpoint_event = event or self._last_received_event if checkpoint_event: checkpoint = { "fully_qualified_namespace": self.fully_qualified_namespace, "eventhub_name": self.eventhub_name, "consumer_group": self.consumer_group, "partition_id": self.partition_id, "offset": checkpoint_event.offset, "sequence_number": checkpoint_event.sequence_number, } await self._checkpoint_store.update_checkpoint(checkpoint) else: _LOGGER.warning( "namespace %r, eventhub %r, consumer_group %r, partition_id %r " "update_checkpoint is called without checkpoint store. No checkpoint is updated.", self.fully_qualified_namespace, self.eventhub_name, self.consumer_group, self.partition_id, )