azure.eventhub.aio package

class azure.eventhub.aio.EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: TokenCredential, **kwargs)[source]

The EventHubConsumerClient class defines a high-level interface for receiving events from the Azure Event Hubs service.

The main goal of EventHubConsumerClient is to receive events from all partitions of an EventHub with load-balancing and checkpointing.

When multiple EventHubConsumerClient instances operate within one or more processes or machines and target the same checkpointing location, they balance the partition load among themselves automatically. To enable load-balancing and/or checkpointing, checkpoint_store must be set when creating the EventHubConsumerClient.

An EventHubConsumerClient can also receive from a specific partition when you call its receive() method and specify the partition_id. Load-balancing does not apply in single-partition mode, but checkpoints can still be saved if checkpoint_store is set.

Parameters
  • fully_qualified_namespace (str) – The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <yournamespace>.servicebus.windows.net

  • eventhub_name (str) – The path of the specific Event Hub to connect the client to.

  • consumer_group (str) – Receive events from the event hub for this consumer group.

  • credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that should be appended to the built-in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • idle_timeout (float) – Timeout in seconds after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • checkpoint_store (CheckpointStore) – Manager for storing the partition load-balancing and checkpoint data when receiving events. If None, this EventHubConsumerClient instance will receive events without load-balancing and checkpointing. The checkpoint store will be used in both cases of receiving from all partitions or a single partition, however in the latter case load-balancing does not apply.

  • load_balancing_interval (float) – The interval, in seconds, between two load-balancing evaluations. Default is 10 seconds.
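
As a rough sketch of how the keyword arguments above fit together, the settings can be collected in a plain dictionary and unpacked into the constructor. The proxy host, port and the chosen timeout and retry values are placeholders for illustration, not recommendations.

```python
# Hypothetical proxy endpoint; replace with your own values.
http_proxy = {
    "proxy_hostname": "127.0.0.1",  # required key, str value
    "proxy_port": 3128,             # required key, int value
    # Optional keys: "username" and "password".
}

# Keyword arguments documented above, gathered in one place.
client_kwargs = {
    "logging_enable": True,         # output network trace logs
    "auth_timeout": 30,             # seconds to wait for token authorization
    "retry_total": 5,               # attempts to redo a failed operation
    "load_balancing_interval": 10,  # seconds between load-balancing evaluations
    "http_proxy": http_proxy,
}

# The dictionary would then be unpacked when creating the client, e.g.
#     EventHubConsumerClient(fully_qualified_namespace, eventhub_name,
#                            consumer_group, credential, **client_kwargs)
```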

Example:

Create a new instance of the EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                  consumer_group='$Default',
                                  eventhub_name=eventhub_name,
                                  credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
async close() → None[source]

Stop retrieving events from the Event Hub and close the underlying AMQP connection and links.

Return type

None

Example:

Close down the client.
import asyncio
import logging
import os

from azure.eventhub.aio import EventHubConsumerClient

event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

consumer = EventHubConsumerClient.from_connection_string(
    conn_str=event_hub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name
)

logger = logging.getLogger("azure.eventhub")

async def on_event(partition_context, event):
    logger.info("Received event from partition: {}".format(partition_context.partition_id))
    # Do asynchronous ops on the received event

# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3)  # keep receiving for 3 seconds
recv_task.cancel()  # stop receiving

# Close down the consumer handler explicitly.
await consumer.close()
classmethod from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: Optional[str] = None, logging_enable: bool = False, http_proxy: Optional[Dict[str, Union[str, int]]] = None, auth_timeout: float = 60, user_agent: Optional[str] = None, retry_total: int = 3, transport_type: Optional[TransportType] = None, checkpoint_store: Optional[CheckpointStore] = None, load_balancing_interval: float = 10, **kwargs: Any) → EventHubConsumerClient[source]

Create an EventHubConsumerClient from a connection string.

Parameters
  • conn_str (str) – The connection string of an Event Hub.

  • consumer_group (str) – Receive events from the Event Hub for this consumer group.

Keyword Arguments
  • eventhub_name (str) – The path of the specific Event Hub to connect the client to.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that should be appended to the built-in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • idle_timeout (float) – Timeout in seconds after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

  • checkpoint_store (CheckpointStore) – Manager for storing the partition load-balancing and checkpoint data when receiving events. If None, this EventHubConsumerClient instance will receive events without load-balancing and checkpointing. The checkpoint store will be used in both cases of receiving from all partitions or a single partition, however in the latter case load-balancing does not apply.

  • load_balancing_interval (float) – The interval, in seconds, between two load-balancing evaluations. Default is 10 seconds.

Return type

EventHubConsumerClient

Example:

Create a new instance of the EventHubConsumerClient from connection string.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(conn_str=event_hub_connection_str,
                                                         consumer_group='$Default',
                                                         eventhub_name=eventhub_name)
async get_eventhub_properties() → Dict[str, Any]

Get properties of the Event Hub.

Keys in the returned dictionary include:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

Return type

dict

Raises

EventHubError

async get_partition_ids() → List[str]

Get partition IDs of the Event Hub.

Return type

list[str]

Raises

EventHubError

async get_partition_properties(partition_id: str) → Dict[str, Any]

Get properties of the specified partition.

Keys in the properties dictionary include:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

Parameters

partition_id (str) – The target partition ID.

Return type

dict

Raises

EventHubError
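
The three query methods above can be combined to inspect an Event Hub before receiving. The following is a minimal sketch: the helper names summarize_partitions and empty_partitions are made up for this illustration, and client is assumed to be an EventHubConsumerClient (or any object exposing the same coroutine methods).

```python
async def summarize_partitions(client):
    """Return {partition_id: properties_dict} for every partition.

    `client` is assumed to expose the get_partition_ids() and
    get_partition_properties() coroutines documented above.
    """
    summary = {}
    for partition_id in await client.get_partition_ids():
        # Query partitions one at a time to keep the sketch simple.
        summary[partition_id] = await client.get_partition_properties(partition_id)
    return summary


def empty_partitions(summary):
    """List the IDs of partitions whose properties report is_empty as True."""
    return [pid for pid, props in summary.items() if props["is_empty"]]
```

Inside a coroutine this could be used as `summary = await summarize_partitions(consumer)`, followed by `empty_partitions(summary)` to find partitions that currently hold no events.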

async receive(on_event: Callable[[PartitionContext, EventData], None], *, partition_id: Optional[str] = None, owner_level: Optional[int] = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: Union[str, int, datetime.datetime, Dict[str, Any], None] = None, starting_position_inclusive: Union[bool, Dict[str, bool]] = False, on_error: Optional[Callable[[PartitionContext, Exception], None]] = None, on_partition_initialize: Optional[Callable[[PartitionContext], None]] = None, on_partition_close: Optional[Callable[[PartitionContext, CloseReason], None]] = None) → None[source]

Receive events from partition(s), with optional load-balancing and checkpointing.

Parameters

on_event (Callable[[PartitionContext, EventData], None]) – The callback function for handling a received event. The callback takes two parameters: partition_context, which contains the partition context, and event, which is the received event. The callback function should be defined like so: on_event(partition_context, event). For detailed partition context information, please refer to PartitionContext.

Keyword Arguments
  • partition_id (str) – If specified, the client will receive from this partition only. Otherwise the client will receive from all partitions.

  • owner_level (int) – The priority for an exclusive consumer. An exclusive consumer will be created if owner_level is set. A consumer with a higher owner_level has higher exclusive priority. The owner level is also known as the ‘epoch value’ of the consumer.

  • prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.

  • track_last_enqueued_event_properties (bool) – Indicates whether the consumer should request information on the last-enqueued event on its associated partition, and track that information as events are received. When information about the partition's last-enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

  • starting_position (str, int, datetime.datetime or dict[str,Any]) – Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a dict with partition ID as the key and position as the value for individual partitions, or a single value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values “-1” for receiving from the beginning of the stream, and “@latest” for receiving only new events.

  • starting_position_inclusive (bool or dict[str,bool]) – Determines whether the given starting_position is inclusive (>=) or exclusive (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and bool as the value indicating whether the starting_position for a specific partition is inclusive or not. This can also be a single bool value for all partitions. The default value is False.

  • on_error (Callable[[PartitionContext, Exception]]) – The callback function that will be called when an error occurs during receiving. The callback takes two parameters: partition_context, which contains partition information, and error, which is the exception. The callback should be defined like so: on_error(partition_context, error). The on_error callback will also be called if an unhandled exception is raised during the on_event callback.

  • on_partition_initialize (Callable[[PartitionContext]]) – The callback function that will be called after a consumer for a certain partition finishes initialization. The callback takes a single parameter: partition_context which contains the partition information. The callback should be defined like so: on_partition_initialize(partition_context).

  • on_partition_close (Callable[[PartitionContext, CloseReason]]) – The callback function that will be called after a consumer for a certain partition is closed. The callback takes two parameters: partition_context which contains partition information and reason for the close. The callback should be defined like so: on_partition_close(partition_context, reason). Please refer to CloseReason for the various closing reasons.

Return type

None

Example:

Receive events from the EventHub.
    import logging

    logger = logging.getLogger("azure.eventhub")

    async def on_event(partition_context, event):
        logger.info("Received event from partition: {}".format(partition_context.partition_id))
        # Do asynchronous ops on received events

    async with consumer:
        await consumer.receive(on_event=on_event)
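
The optional keyword arguments of receive() can be prepared ahead of the call. The sketch below assumes an Event Hub with partitions "0", "1" and "2"; those IDs, the chosen positions and the receive_kwargs name are illustrative only. The None check in on_error is a defensive assumption rather than documented behavior.

```python
import datetime
import logging

logger = logging.getLogger("azure.eventhub")

# Hypothetical partition IDs; positions apply only where no checkpoint exists.
starting_position = {
    "0": "-1",       # beginning of the stream
    "1": "@latest",  # only new events
    "2": datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc),
}
# True means inclusive (>=), False means exclusive (>).
starting_position_inclusive = {"0": False, "1": False, "2": True}


async def on_partition_initialize(partition_context):
    logger.info("Partition %s initialized", partition_context.partition_id)


async def on_partition_close(partition_context, reason):
    logger.info("Partition %s closed, reason: %s",
                partition_context.partition_id, reason)


async def on_error(partition_context, error):
    # Defensive assumption: tolerate a missing partition context.
    partition_id = partition_context.partition_id if partition_context else "<unknown>"
    logger.error("Error on partition %s: %r", partition_id, error)


receive_kwargs = {
    "starting_position": starting_position,
    "starting_position_inclusive": starting_position_inclusive,
    "on_error": on_error,
    "on_partition_initialize": on_partition_initialize,
    "on_partition_close": on_partition_close,
}

# Usage, given a consumer and an on_event callback:
#     async with consumer:
#         await consumer.receive(on_event=on_event, **receive_kwargs)
```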
class azure.eventhub.aio.EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: TokenCredential, **kwargs)[source]

The EventHubProducerClient class defines a high level interface for sending events to the Azure Event Hubs service.

Parameters
  • fully_qualified_namespace (str) – The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <yournamespace>.servicebus.windows.net

  • eventhub_name (str) – The path of the specific Event Hub to connect the client to.

  • credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that should be appended to the built-in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • idle_timeout (float) – Timeout, in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Example:

Create a new instance of the EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                  eventhub_name=eventhub_name,
                                  credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
async close() → None[source]

Close the producer client's underlying AMQP connection and links.

Return type

None

Example:

Close down the client.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

producer = EventHubProducerClient.from_connection_string(
    conn_str=event_hub_connection_str,
    eventhub_name=eventhub_name
)
try:
    event_data_batch = await producer.create_batch(max_size_in_bytes=10000)
    while True:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
    await producer.send_batch(event_data_batch)
finally:
    # Close down the producer handler.
    await producer.close()
async create_batch(*, partition_id: Optional[str] = None, partition_key: Optional[str] = None, max_size_in_bytes: Optional[int] = None) → azure.eventhub._common.EventDataBatch[source]

Create an EventDataBatch object with the max size of all content being constrained by max_size_in_bytes.

max_size_in_bytes should be no greater than the maximum message size allowed by the service.

Parameters
  • partition_id (str) – The specific partition ID to send to. Default is None, in which case the service distributes events across all partitions using round-robin.

  • partition_key (str) – With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service.

  • max_size_in_bytes (int) – The maximum size of bytes data that an EventDataBatch object can hold.

Return type

EventDataBatch

Example:

Create an EventDataBatch object with a limited size.
    from azure.eventhub import EventData
    event_data_batch = await producer.create_batch(max_size_in_bytes=10000)
    while True:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
classmethod from_connection_string(conn_str: str, *, eventhub_name: Optional[str] = None, logging_enable: bool = False, http_proxy: Optional[Dict[str, Union[str, int]]] = None, auth_timeout: float = 60, user_agent: Optional[str] = None, retry_total: int = 3, transport_type: Optional[TransportType] = None, **kwargs: Any) → EventHubProducerClient[source]

Create an EventHubProducerClient from a connection string.

Parameters

conn_str (str) – The connection string of an Event Hub.

Keyword Arguments
  • eventhub_name (str) – The path of the specific Event Hub to connect the client to.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that should be appended to the built-in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • idle_timeout (float) – Timeout, in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

Return type

EventHubProducerClient

Example:

Create a new instance of the EventHubProducerClient from connection string.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_str,
                                                         eventhub_name=eventhub_name)
async get_eventhub_properties() → Dict[str, Any]

Get properties of the Event Hub.

Keys in the returned dictionary include:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

Return type

dict

Raises

EventHubError

async get_partition_ids() → List[str]

Get partition IDs of the Event Hub.

Return type

list[str]

Raises

EventHubError

async get_partition_properties(partition_id: str) → Dict[str, Any]

Get properties of the specified partition.

Keys in the properties dictionary include:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

Parameters

partition_id (str) – The target partition ID.

Return type

dict

Raises

EventHubError

async send_batch(event_data_batch: azure.eventhub._common.EventDataBatch, *, timeout: Union[int, float, None] = None) → None[source]

Sends event data and blocks until acknowledgement is received or operation times out.

Parameters
  • event_data_batch (EventDataBatch) – The EventDataBatch object to be sent.

  • timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.

Return type

None

Raises

AuthenticationError, ConnectError, ConnectionLostError, EventDataError, EventDataSendError, EventHubError

Example:

Asynchronously sends event data
    async with producer:
        event_data_batch = await producer.create_batch(max_size_in_bytes=10000)
        while True:
            try:
                event_data_batch.add(EventData('Message inside EventBatchData'))
            except ValueError:
                # The EventDataBatch object reaches its max_size.
                # You can send the full EventDataBatch object and create a new one here.
                break
        await producer.send_batch(event_data_batch)
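
The examples in this section stop at the first full batch. One way to keep going, sketched below, is to send the full batch and start a new one. The helper name send_in_batches is made up for this illustration; producer is assumed to behave like EventHubProducerClient and each item of events like an azure.eventhub.EventData instance.

```python
async def send_in_batches(producer, events):
    """Send every event, starting a new batch whenever the current one is full.

    Returns the number of batches sent. Relies only on create_batch(),
    send_batch() and EventDataBatch.add() raising ValueError when full.
    """
    batches_sent = 0
    pending = 0  # events added to the current batch so far
    batch = await producer.create_batch()
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                # The event does not fit even in an empty batch.
                raise
            # The batch is full: send it and retry the event in a new batch.
            await producer.send_batch(batch)
            batches_sent += 1
            batch = await producer.create_batch()
            batch.add(event)  # raises ValueError if the event alone is too large
            pending = 0
        pending += 1
    if pending:
        # Flush the final, partially filled batch.
        await producer.send_batch(batch)
        batches_sent += 1
    return batches_sent
```

Inside `async with producer:` this could be called as `await send_in_batches(producer, (EventData(m) for m in messages))`, where messages is any iterable of payloads.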
class azure.eventhub.aio.EventHubSharedKeyCredential(policy: str, key: str)[source]

The shared access key credential used for authentication.

Parameters
  • policy (str) – The name of the shared access policy.

  • key (str) – The shared access key.

async get_token(*scopes, **kwargs)[source]
class azure.eventhub.aio.CheckpointStore[source]

CheckpointStore deals with the interaction with the chosen storage service.

It can list and claim partition ownerships; and list and save checkpoints.

abstract async claim_ownership(ownership_list: Iterable[Dict[str, Any]]) → Iterable[Dict[str, Any]][source]

Tries to claim ownership for a list of specified partitions.

Parameters

ownership_list (Iterable[Dict[str,Any]]) – Iterable of dictionaries containing all the ownerships to claim.

Return type

Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:

  • fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str): The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group (str): The name of the consumer group the ownership is associated with.

  • partition_id (str): The partition ID which the ownership is created for.

  • owner_id (str): A UUID representing the owner attempting to claim this partition.

  • last_modified_time (UTC datetime.datetime): The last time this ownership was claimed.

  • etag (str): The Etag value for the last time this ownership was modified. Optional depending on storage implementation.

abstract async list_checkpoints(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str)[source]

List the updated checkpoints from the store.

Parameters
  • fully_qualified_namespace (str) – The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str) – The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group (str) – The name of the consumer group the checkpoints are associated with.

Return type

Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:

  • fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str): The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group (str): The name of the consumer group the checkpoints are associated with.

  • partition_id (str): The partition ID which the checkpoint is created for.

  • sequence_number (int): The sequence number of the EventData.

  • offset (str): The offset of the EventData.

abstract async list_ownership(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str) → Iterable[Dict[str, Any]][source]

Retrieves a complete ownership list from the chosen storage service.

Parameters
  • fully_qualified_namespace (str) – The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str) – The name of the specific Event Hub the partition ownerships are associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group (str) – The name of the consumer group the ownerships are associated with.

Return type

Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:

  • fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str): The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group (str): The name of the consumer group the ownership is associated with.

  • partition_id (str): The partition ID which the ownership is created for.

  • owner_id (str): A UUID representing the current owner of this partition.

  • last_modified_time (UTC datetime.datetime): The last time this ownership was claimed.

  • etag (str): The Etag value for the last time this ownership was modified. Optional depending on storage implementation.

abstract async update_checkpoint(checkpoint: Dict[str, Union[str, int]]) → None[source]

Updates the checkpoint using the given information for the offset, associated partition and consumer group in the chosen storage service.

Note: If you plan to implement a custom checkpoint store that will be shared between Event Hubs SDKs in different languages, it is recommended to persist the offset value as an integer.

Parameters

checkpoint (Dict[str,Any]) –

A dict containing checkpoint information:

  • fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group (str): The name of the consumer group the checkpoint is associated with.

  • partition_id (str): The partition ID which the checkpoint is created for.

  • sequence_number (int): The sequence number of the EventData the new checkpoint will be associated with.

  • offset (str): The offset of the EventData the new checkpoint will be associated with.

Return type

None
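
To make the four abstract methods concrete, here is a minimal in-memory sketch. The class name InMemoryCheckpointStore is made up for this illustration. It is duck-typed so that it runs without the SDK installed; a real implementation would subclass azure.eventhub.aio.CheckpointStore and persist to shared storage. The etag comparison in claim_ownership is one possible conflict-detection strategy, chosen here as an assumption.

```python
import datetime
import uuid


class InMemoryCheckpointStore:
    """Keeps ownership and checkpoint records in process memory only."""

    def __init__(self):
        # Both maps are keyed by (namespace, eventhub, consumer group, partition).
        self._ownership = {}
        self._checkpoints = {}

    @staticmethod
    def _key(record):
        return (record["fully_qualified_namespace"], record["eventhub_name"],
                record["consumer_group"], record["partition_id"])

    async def list_ownership(self, fully_qualified_namespace, eventhub_name,
                             consumer_group):
        prefix = (fully_qualified_namespace, eventhub_name, consumer_group)
        return [dict(o) for k, o in self._ownership.items() if k[:3] == prefix]

    async def claim_ownership(self, ownership_list):
        claimed = []
        for ownership in ownership_list:
            key = self._key(ownership)
            current = self._ownership.get(key)
            # Reject the claim if someone else modified the record meanwhile.
            if current is not None and current["etag"] != ownership.get("etag"):
                continue
            record = dict(ownership)
            record["etag"] = str(uuid.uuid4())
            record["last_modified_time"] = datetime.datetime.now(
                datetime.timezone.utc)
            self._ownership[key] = record
            claimed.append(dict(record))
        return claimed

    async def update_checkpoint(self, checkpoint):
        self._checkpoints[self._key(checkpoint)] = dict(checkpoint)

    async def list_checkpoints(self, fully_qualified_namespace, eventhub_name,
                               consumer_group):
        prefix = (fully_qualified_namespace, eventhub_name, consumer_group)
        return [dict(c) for k, c in self._checkpoints.items() if k[:3] == prefix]
```

Because the data lives in one process, this sketch cannot balance load across machines; it is mainly useful for seeing which fields each method reads and writes.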

class azure.eventhub.aio.PartitionContext(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, partition_id: str, checkpoint_store: azure.eventhub.aio._eventprocessor.checkpoint_store.CheckpointStore = None)[source]

Contains partition related context information.

A PartitionContext instance will be passed to the event, error and initialization callbacks defined when calling EventHubConsumerClient.receive(). Users can call update_checkpoint() of this class to persist checkpoint data.

async update_checkpoint(event)[source]

Updates the receive checkpoint to the given event's offset.

This operation will only update a checkpoint if a checkpoint_store was provided during creation of the EventHubConsumerClient. Otherwise a warning will be logged.

Parameters

event (EventData) – The EventData instance which contains the offset and sequence number information used for checkpoint.

Return type

None
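
Checkpointing on every event adds a storage write per event, so a common pattern is to checkpoint only every N events per partition. The sketch below illustrates that idea; CHECKPOINT_EVERY and its value of 20 are arbitrary assumptions, and the counter is kept in a module-level dictionary purely for brevity.

```python
import collections

CHECKPOINT_EVERY = 20  # hypothetical interval; tune for your workload

# Number of events handled since the last checkpoint, per partition ID.
_events_since_checkpoint = collections.defaultdict(int)


async def on_event(partition_context, event):
    # Process the event here before counting it.
    partition_id = partition_context.partition_id
    _events_since_checkpoint[partition_id] += 1
    if _events_since_checkpoint[partition_id] >= CHECKPOINT_EVERY:
        # Persist progress up to and including this event.
        await partition_context.update_checkpoint(event)
        _events_since_checkpoint[partition_id] = 0
```

With this trade-off, up to CHECKPOINT_EVERY - 1 events per partition may be received again after a restart, so the processing step should tolerate duplicates.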

property last_enqueued_event_properties

The latest enqueued event information.

This property will be updated each time an event is received if the receiver is created with track_last_enqueued_event_properties set to True. The properties dict includes the following information about the last enqueued event:

  • sequence_number (int)

  • offset (str)

  • enqueued_time (UTC datetime.datetime)

  • retrieval_time (UTC datetime.datetime)

Return type

dict or None
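
One use of this property is estimating how far a consumer is behind the end of its partition. The helper below is a sketch under the assumption that the received event exposes a sequence_number attribute; the function name sequence_lag is made up for this illustration.

```python
def sequence_lag(partition_context, event):
    """Return how many events the consumer is behind on this partition.

    Returns None when last-enqueued information is not being tracked,
    i.e. when the property is None or lacks a sequence number.
    """
    props = partition_context.last_enqueued_event_properties
    if not props or props.get("sequence_number") is None:
        return None
    return props["sequence_number"] - event.sequence_number
```

Inside an on_event callback this could be logged as `sequence_lag(partition_context, event)`, provided receive() was called with track_last_enqueued_event_properties=True.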