Source code for azure.eventhub._eventprocessor.partition_context

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import logging
from typing import Optional
from .._utils import get_last_enqueued_event_properties
from .checkpoint_store import CheckpointStore

_LOGGER = logging.getLogger(__name__)


[docs]class PartitionContext(object): """Contains partition related context information. A `PartitionContext` instance will be passed to the event, error and initialization callbacks defined when calling `EventHubConsumerClient.receive()`. Users can call `update_checkpoint()` of this class to persist checkpoint data. """ def __init__(self, fully_qualified_namespace, eventhub_name, consumer_group, partition_id, checkpoint_store=None): # type: (str, str, str, str, Optional[CheckpointStore]) -> None self.fully_qualified_namespace = fully_qualified_namespace self.partition_id = partition_id self.eventhub_name = eventhub_name self.consumer_group = consumer_group self._checkpoint_store = checkpoint_store self._last_received_event = None @property def last_enqueued_event_properties(self): """The latest enqueued event information. This property will be updated each time an event is received if the receiver is created with `track_last_enqueued_event_properties` set to `True`. The properties dict includes following information of the last enqueued event: - `sequence_number` (int) - `offset` (str) - `enqueued_time` (UTC datetime.datetime) - `retrieval_time` (UTC datetime.datetime) :rtype: dict or None """ if self._last_received_event: return get_last_enqueued_event_properties(self._last_received_event) return None
[docs] def update_checkpoint(self, event): """Updates the receive checkpoint to the given events offset. This operation will only update a checkpoint if a `checkpoint_store` was provided during creation of the `EventHubConsumerClient`. Otherwise a warning will be logged. :param ~azure.eventhub.EventData event: The EventData instance which contains the offset and sequence number information used for checkpoint. :rtype: None """ if self._checkpoint_store: checkpoint = { 'fully_qualified_namespace': self.fully_qualified_namespace, 'eventhub_name': self.eventhub_name, 'consumer_group': self.consumer_group, 'partition_id': self.partition_id, 'offset': event.offset, 'sequence_number': event.sequence_number } self._checkpoint_store.update_checkpoint(checkpoint) else: _LOGGER.warning( "namespace %r, eventhub %r, consumer_group %r, partition_id %r " "update_checkpoint is called without checkpoint store. No checkpoint is updated.", self.fully_qualified_namespace, self.eventhub_name, self.consumer_group, self.partition_id)