azure.eventhub package

class azure.eventhub.EventHubConsumerClient(host, event_hub_path, credential, **kwargs)[source]

The EventHubConsumerClient class defines a high-level interface for receiving events from the Azure Event Hubs service.

The main goal of EventHubConsumerClient is to receive events from all partitions of an EventHub with load balancing and checkpointing.

When multiple EventHubConsumerClient instances run in one process, in multiple processes, or on multiple machines, and they use the same repository as the load balancing and checkpointing store, they balance the load automatically. To enable load balancing and/or checkpointing, partition_manager must be set when creating the EventHubConsumerClient.

An EventHubConsumerClient can also receive from a specific partition when you call its receive() method and specify the partition_id. Load balancing does not work in single-partition mode, but checkpoints can still be saved if partition_manager is set.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • partition_manager (PartitionManager) – Stores the load balancing data and checkpoint data when receiving events. If it is None, this EventHubConsumerClient instance receives events without load balancing and checkpointing.

  • load_balancing_interval (float) – The interval in seconds between two consecutive load balancing runs once load balancing is active. Default is 10.
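
As an illustration of the http_proxy keyword argument described above, the following sketch assembles the dictionary with the two required keys and the two optional ones. The helper name build_http_proxy and the sample host, port, and credentials are hypothetical and not part of the package.

```python
def build_http_proxy(hostname, port, username=None, password=None):
    """Hypothetical helper: assemble the http_proxy dictionary described above."""
    if not isinstance(hostname, str) or not hostname:
        raise ValueError("proxy_hostname must be a non-empty str")
    if not isinstance(port, int):
        raise ValueError("proxy_port must be an int")
    proxy = {"proxy_hostname": hostname, "proxy_port": port}
    # 'username' and 'password' are optional; include them only when given.
    if username is not None:
        proxy["username"] = username
    if password is not None:
        proxy["password"] = password
    return proxy


# Placeholder values for illustration only.
http_proxy = build_http_proxy("127.0.0.1", 3128, username="proxy-user", password="proxy-pass")

# The dictionary would then be passed as a keyword argument, for example:
# EventHubConsumerClient(host=hostname, event_hub_path=event_hub,
#                        credential=credential, http_proxy=http_proxy)
```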

Example:

Create a new instance of the EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

hostname = os.environ['EVENT_HUB_HOSTNAME']
event_hub = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

consumer = EventHubConsumerClient(host=hostname,
                                  event_hub_path=event_hub,
                                  credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
close()[source]

Stop retrieving events from event hubs and close the underlying AMQP connection and links.

Return type

None

Example:

Close down the client.
import logging
import os
import threading
import time

event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']

from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=event_hub_connection_str,
    event_hub_path=event_hub
)

logger = logging.getLogger("azure.eventhub")

def on_events(partition_context, events):
    logger.info("Received {} messages from partition: {}".format(
        len(events), partition_context.partition_id))
    # Do ops on received events

# The receive method is a blocking call, so execute it in a thread to
# better demonstrate how to stop receiving by calling the close method.

worker = threading.Thread(target=consumer.receive,
                          kwargs={"on_events": on_events,
                                  "consumer_group": "$Default"})
worker.start()
time.sleep(10)  # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
classmethod from_connection_string(conn_str, **kwargs)[source]

Create an EventHubConsumerClient from a connection string.

Parameters

conn_str (str) – The connection string of an eventhub.

Keyword Arguments
  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str,Any]) – HTTP proxy settings. This must be a dictionary with the following keys - ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present - ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

  • partition_manager (PartitionManager) – Stores the load balancing data and checkpoint data when receiving events. If it is None, this EventHubConsumerClient instance receives events without load balancing and checkpointing.

  • load_balancing_interval (float) – The interval in seconds between two consecutive load balancing runs once load balancing is active. Default is 10.

Example:

Create a new instance of the EventHubConsumerClient from connection string.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(conn_str=event_hub_connection_str,
                                                         event_hub_path=event_hub)
get_partition_ids()

Get partition ids of the specified EventHub.

Return type

list[str]

Raises

EventHubError

get_partition_properties(partition)

Get properties of the specified partition. Keys in the details dictionary include:

  • event_hub_path

  • id

  • beginning_sequence_number

  • last_enqueued_sequence_number

  • last_enqueued_offset

  • last_enqueued_time_utc

  • is_empty

Parameters

partition (str) – The target partition id.

Return type

dict

Raises

EventHubError

get_properties()

Get properties of the specified EventHub. Keys in the details dictionary include:

  • path

  • created_at

  • partition_ids

Return type

dict

Raises

EventHubError

receive(on_events, consumer_group, **kwargs)[source]

Receive events from partition(s) optionally with load balancing and checkpointing.

Parameters
  • on_events (Callable[PartitionContext, List[EventData]]) – The callback function for handling received events. The callback takes two parameters: partition_context, which contains the partition context, and events, which are the received events. Please define the callback like on_events(partition_context, events). For detailed partition context information, please refer to PartitionContext.

  • consumer_group (str) – Receive events from the event hub for this consumer group

Keyword Arguments
  • partition_id (str) – Receive from this partition only if it’s not None. Receive from all partitions otherwise.

  • owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set. Higher owner_level has higher exclusive priority.

  • prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.

  • track_last_enqueued_event_properties (bool) – Indicates whether the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

  • initial_event_position (EventPosition or dict[str,EventPosition]) – Start receiving from this initial_event_position if there is no checkpoint data for a partition. If checkpoint data is available, it is used instead. This can be a dict with partition id as the key and position as the value for individual partitions, or a single EventPosition instance for all partitions.

  • on_error (Callable[[PartitionContext, Exception]]) – The callback function which will be called when an error occurs while receiving. The callback takes two parameters: partition_context, which contains partition information, and error, which is the exception. Please define the callback like on_error(partition_context, error).

  • on_partition_initialize (Callable[[PartitionContext]]) – The callback function which will be called after a consumer for a certain partition finishes initialization. The callback takes one parameter: partition_context, which contains the partition information. Please define the callback like on_partition_initialize(partition_context).

  • on_partition_close (Callable[[PartitionContext, CloseReason]]) – The callback function which will be called after a consumer for a certain partition is closed. The callback takes two parameters: partition_context, which contains partition information, and reason, which is the reason for the close. Please define the callback like on_partition_close(partition_context, reason). Please refer to CloseReason for the different closing reasons.
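
The precedence described for initial_event_position can be pictured with the sketch below. It is not the client's implementation: resolve_start_position is a hypothetical helper, plain strings stand in for EventPosition values, and returning None for a partition missing from the dict is an assumption made only for this example.

```python
def resolve_start_position(partition_id, checkpoints, initial_event_position):
    """Hypothetical helper mirroring the documented precedence."""
    # 1. Checkpoint data for the partition is used whenever it is available.
    if partition_id in checkpoints:
        return checkpoints[partition_id]
    # 2. A dict maps individual partition ids to their own positions.
    if isinstance(initial_event_position, dict):
        # Assumption: a partition absent from the dict yields None here.
        return initial_event_position.get(partition_id)
    # 3. Otherwise the single value applies to every partition.
    return initial_event_position


# Stand-in data: partition "0" has a checkpoint, the others do not.
checkpoints = {"0": "2048"}
per_partition = {"1": "@latest"}

start_for_0 = resolve_start_position("0", checkpoints, "-1")
start_for_1 = resolve_start_position("1", checkpoints, per_partition)
start_for_2 = resolve_start_position("2", checkpoints, "-1")
```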

Return type

None

Example:

Receive events from the EventHub.
    logger = logging.getLogger("azure.eventhub")

    def on_events(partition_context, events):
        logger.info("Received {} messages from partition: {}".format(
            len(events), partition_context.partition_id))
        # Do ops on received events

    with consumer:
        consumer.receive(on_events=on_events, consumer_group='$Default')
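
The optional callbacks accepted by receive() might look like the following sketch. The callback signatures follow the keyword argument descriptions; the SimpleNamespace object is only a stand-in for a PartitionContext, and the string "SHUTDOWN" stands in for a CloseReason member, so that the sketch runs without a live Event Hub.

```python
import logging
from types import SimpleNamespace

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("azure.eventhub")


def on_partition_initialize(partition_context):
    # Called once a consumer for the partition has finished initializing.
    logger.info("Partition %s initialized", partition_context.partition_id)


def on_error(partition_context, error):
    # Called when an error occurs while receiving.
    logger.error("Error on partition %s: %r", partition_context.partition_id, error)


def on_partition_close(partition_context, reason):
    # Called after the consumer for the partition has been closed.
    logger.info("Partition %s closed, reason: %s", partition_context.partition_id, reason)


# Stand-in objects (assumptions for this sketch, not SDK types).
fake_context = SimpleNamespace(partition_id="0")
on_partition_initialize(fake_context)
on_error(fake_context, RuntimeError("simulated failure"))
on_partition_close(fake_context, "SHUTDOWN")

# With a real client the callbacks would be passed as keyword arguments:
# consumer.receive(on_events=on_events, consumer_group="$Default",
#                  on_error=on_error,
#                  on_partition_initialize=on_partition_initialize,
#                  on_partition_close=on_partition_close)
```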
class azure.eventhub.EventHubProducerClient(host, event_hub_path, credential, **kwargs)[source]

The EventHubProducerClient class defines a high level interface for sending events to the Azure Event Hubs service.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Example:

Create a new instance of the EventHubProducerClient.
import os
from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

hostname = os.environ['EVENT_HUB_HOSTNAME']
event_hub = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

producer = EventHubProducerClient(host=hostname,
                                  event_hub_path=event_hub,
                                  credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
close()[source]

Close down the client. If the client has already closed, this will be a no op.

Return type

None

Example:

Close down the client.
import os
from azure.eventhub import EventHubProducerClient, EventData

event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']

producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_str,
                                                         event_hub_path=event_hub)
try:
    producer.send(EventData(b"A single event"))
finally:
    # Close down the producer handler.
    producer.close()
create_batch(max_size=None)[source]

Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side.

Parameters

max_size (int) – The maximum size of bytes data that an EventDataBatch object can hold.

Return type

EventDataBatch

Example:

Create EventDataBatch object within limited size
    event_data_batch = producer.create_batch(max_size=10000)
    while True:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
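
The comment in the example above suggests sending a full batch and starting a new one. One way to do that is sketched below, relying only on the create_batch, try_add, and send calls documented here. The helper send_in_batches and the FakeBatch and FakeProducer classes are hypothetical; the fakes measure size as the byte length of each event and exist only so the sketch runs without a live Event Hub.

```python
def send_in_batches(producer, events, max_size=None):
    """Send events in as few batches as possible; return the number of batches sent."""
    batch = producer.create_batch(max_size=max_size)
    pending = 0       # events added to the current batch
    batches_sent = 0
    for event in events:
        try:
            batch.try_add(event)
        except ValueError:
            if pending == 0:
                # The event does not fit even into an empty batch.
                raise
            # The batch is full: send it and start a new one with this event.
            producer.send(batch)
            batches_sent += 1
            batch = producer.create_batch(max_size=max_size)
            batch.try_add(event)  # raises ValueError if the event alone is too large
            pending = 0
        pending += 1
    if pending:
        producer.send(batch)
        batches_sent += 1
    return batches_sent


# --- Stand-ins for illustration only (not part of the package) ---
class FakeBatch:
    def __init__(self, max_size):
        self.max_size = max_size
        self.items = []
        self.size = 0

    def try_add(self, event):
        if self.size + len(event) > self.max_size:
            raise ValueError("batch is full")
        self.items.append(event)
        self.size += len(event)


class FakeProducer:
    def __init__(self):
        self.sent = []

    def create_batch(self, max_size=None):
        return FakeBatch(max_size or 10)

    def send(self, batch):
        self.sent.append(list(batch.items))


fake_producer = FakeProducer()
batch_count = send_in_batches(fake_producer, [b"aaaa", b"bbbb", b"cccc"], max_size=10)
```

With a real client, producer would be an EventHubProducerClient and events would be EventData objects.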
classmethod from_connection_string(conn_str, **kwargs)[source]

Create an EventHubProducerClient from a connection string.

Parameters

conn_str (str) – The connection string of an eventhub.

Keyword Arguments
  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str,Any]) – HTTP proxy settings. This must be a dictionary with the following keys - ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present - ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp.

Example:

Create a new instance of the EventHubProducerClient from connection string.
import os
from azure.eventhub import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_str,
                                                         event_hub_path=event_hub)
get_partition_ids()

Get partition ids of the specified EventHub.

Return type

list[str]

Raises

EventHubError

get_partition_properties(partition)

Get properties of the specified partition. Keys in the details dictionary include:

  • event_hub_path

  • id

  • beginning_sequence_number

  • last_enqueued_sequence_number

  • last_enqueued_offset

  • last_enqueued_time_utc

  • is_empty

Parameters

partition (str) – The target partition id.

Return type

dict

Raises

EventHubError

get_properties()

Get properties of the specified EventHub. Keys in the details dictionary include:

  • path

  • created_at

  • partition_ids

Return type

dict

Raises

EventHubError

send(event_data, **kwargs)[source]

Sends event data and blocks until acknowledgement is received or operation times out.

Parameters

event_data (EventData, EventDataBatch, EventData Iterator/Generator/list) – The event to be sent. It can be an EventData object, an EventDataBatch object, or an iterable of EventData objects.

Keyword Arguments
  • partition_key (str) – With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service.

  • partition_id (str) – The specific partition ID to send to. Default is None, in which case the service distributes events across all partitions using round-robin.

  • timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.

Return type

None

Raises

AuthenticationError, ConnectError, ConnectionLostError, EventDataError, EventDataSendError, EventHubError

Example:

Sends event data
    with producer:
        event_data = EventData(b"A single event")
        producer.send(event_data)
class azure.eventhub.EventData(body=None)[source]

The EventData class is a holder of event content.

Parameters

body (str or bytes) – The data to send in a single message, given as str or bytes.

Example:

Create instances of EventData
    from azure.eventhub import EventData

    event_data = EventData("String data")
    event_data = EventData(b"Bytes data")

body_as_json(encoding='UTF-8')[source]

The body of the event loaded as a JSON object if the data is compatible.

Parameters

encoding – The encoding to use for decoding message data. Default is ‘UTF-8’

Return type

dict

body_as_str(encoding='UTF-8')[source]

The body of the event data as a string if the data is of a compatible type.

Parameters

encoding – The encoding to use for decoding message data. Default is ‘UTF-8’

Return type

str

encode_message()[source]
property application_properties

Application defined properties on the message.

Return type

dict

property body

The body of the event data object.

Return type

bytes or Generator[bytes]

property enqueued_time

The enqueued timestamp of the event data object.

Return type

datetime.datetime

property last_enqueued_event_properties

The latest enqueued event information. This property is updated each time an event is received when the receiver is created with track_last_enqueued_event_properties set to True. The dict includes the following information about the partition:

  • sequence_number

  • offset

  • enqueued_time

  • retrieval_time

Return type

dict or None

property offset

The offset of the event data object.

Return type

str

property partition_key

The partition key of the event data object.

Return type

bytes

property sequence_number

The sequence number of the event data object.

Return type

int or long

property system_properties

Metadata set by the Event Hubs Service associated with the EventData

Return type

dict

class azure.eventhub.EventDataBatch(max_size=None, partition_key=None)[source]

Sending events in a batch gives better performance than sending individual events. EventDataBatch helps you create a batch of EventData of the maximum allowed size to improve sending performance.

Use the try_add method to add events until the maximum batch size limit in bytes has been reached, at which point a ValueError is raised. Use the send method of EventHubProducerClient or the async EventHubProducerClient for sending. The send method accepts partition_key as a parameter for sending to a particular partition.

Please use the create_batch method of EventHubProducerClient to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.

Parameters
  • max_size (int) – The maximum size of bytes data that an EventDataBatch object can hold.

  • partition_key (str) – With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service.

try_add(event_data)[source]

Try to add an EventData object. The size of an EventData is the sum of its body, application_properties, etc.

Parameters

event_data (EventData) – The EventData object which is attempted to be added.

Return type

None

Raises

ValueError – if adding the event would exceed the size limit.

property size

The size of EventDataBatch object in bytes

Return type

int

class azure.eventhub.EventPosition(value, inclusive=False)[source]

The position (offset, sequence number, or timestamp) where a consumer starts.

Parameters
  • value (int, str or datetime.datetime) – The event position value: an offset (str), a sequence number (int), or a timestamp (datetime.datetime).

  • inclusive (bool) – Whether to include the supplied value as the start point.

Examples:

Beginning of the event stream:
>>> event_pos = EventPosition("-1")
End of the event stream:
>>> event_pos = EventPosition("@latest")
Events after the specified offset:
>>> event_pos = EventPosition("12345")
Events from the specified offset:
>>> event_pos = EventPosition("12345", True)
Events after a datetime:
>>> event_pos = EventPosition(datetime.datetime.utcnow())
Events after a specific sequence number:
>>> event_pos = EventPosition(1506968696002)
class azure.eventhub.EventHubSharedKeyCredential(policy, key)[source]

The shared access key credential used for authentication.

Parameters
  • policy (str) – The name of the shared access policy.

  • key (str) – The shared access key.

class azure.eventhub.EventHubSASTokenCredential(token)[source]

SAS token used for authentication.

Parameters

token (str or Callable) – A SAS token or a function that returns a SAS token. If a function is supplied, it will be used to retrieve subsequent tokens when a token expires. The function should take no arguments.
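
A zero-argument token function of the kind described above could be built as in the following sketch. It uses only the standard library and follows the shared access signature layout commonly documented for Azure messaging services (sr, sig, se, skn); the helper names, the resource URI format, the one-hour lifetime, and the placeholder policy and key are all assumptions for illustration.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import quote, quote_plus


def generate_sas_token(resource_uri, policy, key, ttl_seconds=3600):
    """Hypothetical helper: sign '<encoded uri>\\n<expiry>' with HMAC-SHA256."""
    expiry = str(int(time.time()) + ttl_seconds)
    encoded_uri = quote_plus(resource_uri)
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = quote(base64.b64encode(digest), safe="")
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, policy)


def token_provider():
    # Takes no arguments, so it can be called again whenever a token expires.
    # The URI, policy name, and key below are placeholders.
    return generate_sas_token(
        "sb://mynamespace.servicebus.windows.net/myhub", "mypolicy", "mykey")


sas_token = token_provider()

# The function (or a token string) would then be handed to the credential:
# credential = EventHubSASTokenCredential(token_provider)
```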

get_sas_token()[source]
class azure.eventhub.PartitionManager[source]

PartitionManager handles the interaction with the chosen storage service. It can list and claim partition ownership and save checkpoints.

abstract claim_ownership(ownership_list)[source]

Tries to claim a list of specified ownership.

Parameters

ownership_list (Iterable[Dict[str,Any]]) – Iterable of dictionaries containing all the ownership to claim.

Return type

Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:

  • fully_qualified_namespace

  • eventhub_name

  • consumer_group_name

  • owner_id

  • partition_id

  • last_modified_time

  • etag

abstract list_checkpoints(fully_qualified_namespace, eventhub_name, consumer_group_name)[source]

List the updated checkpoints from the store

Parameters
  • fully_qualified_namespace (str) – The fully qualified namespace that the event hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str) – The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the checkpoints are associated with.

Return type

Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:

  • fully_qualified_namespace

  • eventhub_name

  • consumer_group_name

  • partition_id

  • sequence_number

  • offset

abstract list_ownership(fully_qualified_namespace, eventhub_name, consumer_group_name)[source]

Retrieves a complete ownership list from the chosen storage service.

Parameters
  • fully_qualified_namespace (str) – The fully qualified namespace that the event hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str) – The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the ownership is associated with.

Return type

Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:

  • fully_qualified_namespace

  • eventhub_name

  • consumer_group_name

  • owner_id

  • partition_id

  • last_modified_time

  • etag

abstract update_checkpoint(fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id, offset, sequence_number)[source]

Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

Parameters
  • fully_qualified_namespace (str) – The fully qualified namespace that the event hub belongs to. The format is like “<namespace>.servicebus.windows.net”

  • eventhub_name (str) – The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.

  • consumer_group_name (str) – The name of the consumer group the checkpoint is associated with.

  • partition_id (str) – The partition id which the checkpoint is created for.

  • offset (str) – The offset of the EventData the new checkpoint will be associated with.

  • sequence_number (int) – The sequence_number of the EventData the new checkpoint will be associated with.

Return type

None
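
To make the four abstract methods more concrete, here is a minimal in-memory sketch. It deliberately does not import the package, so the class stands alone; a real store would subclass azure.eventhub.PartitionManager and persist to a storage service. Rejecting a claim whose etag does not match the stored one is an assumption of this sketch, and the class is not thread-safe.

```python
import time
import uuid


class InMemoryPartitionManager:
    """Illustrative only; a real store would subclass azure.eventhub.PartitionManager."""

    def __init__(self):
        # Both maps are keyed by (namespace, event hub, consumer group, partition id).
        self._ownership = {}
        self._checkpoints = {}

    def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group_name):
        scope = (fully_qualified_namespace, eventhub_name, consumer_group_name)
        return [dict(v) for k, v in self._ownership.items() if k[:3] == scope]

    def claim_ownership(self, ownership_list):
        claimed = []
        for ownership in ownership_list:
            key = (ownership["fully_qualified_namespace"], ownership["eventhub_name"],
                   ownership["consumer_group_name"], ownership["partition_id"])
            current = self._ownership.get(key)
            # Assumption: a claim on an owned partition must present the current etag.
            if current is not None and current["etag"] != ownership.get("etag"):
                continue
            record = dict(ownership, last_modified_time=time.time(), etag=str(uuid.uuid4()))
            self._ownership[key] = record
            claimed.append(dict(record))
        return claimed

    def update_checkpoint(self, fully_qualified_namespace, eventhub_name,
                          consumer_group_name, partition_id, offset, sequence_number):
        key = (fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id)
        self._checkpoints[key] = {
            "fully_qualified_namespace": fully_qualified_namespace,
            "eventhub_name": eventhub_name,
            "consumer_group_name": consumer_group_name,
            "partition_id": partition_id,
            "offset": offset,
            "sequence_number": sequence_number,
        }

    def list_checkpoints(self, fully_qualified_namespace, eventhub_name, consumer_group_name):
        scope = (fully_qualified_namespace, eventhub_name, consumer_group_name)
        return [dict(v) for k, v in self._checkpoints.items() if k[:3] == scope]


# Usage with placeholder names.
manager = InMemoryPartitionManager()
scope = ("example.servicebus.windows.net", "myhub", "$Default")
first_claim = manager.claim_ownership([{
    "fully_qualified_namespace": scope[0], "eventhub_name": scope[1],
    "consumer_group_name": scope[2], "partition_id": "0", "owner_id": "consumer-a"}])
manager.update_checkpoint(*scope, partition_id="0", offset="1024", sequence_number=7)
```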

class azure.eventhub.PartitionContext(fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id, owner_id, partition_manager=None)[source]

Contains partition related context information for a PartitionProcessor instance to use.

Users can use update_checkpoint() of this class to save checkpoint data.

update_checkpoint(event)[source]

Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

Parameters

event (EventData) – The EventData instance which contains the offset and sequence number information used for checkpoint.

Return type

None
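
A common pattern is to checkpoint the last event of each received batch once the batch has been processed. The sketch below assumes that pattern; handle is a hypothetical placeholder for application logic, and FakePartitionContext is a stand-in that records what would be checkpointed so the example runs without a service connection.

```python
processed = []


def handle(event):
    """Hypothetical application logic; here it only records the event."""
    processed.append(event)


def on_events(partition_context, events):
    for event in events:
        handle(event)
    # Checkpoint only after the whole batch was handled, and only if it is non-empty.
    if events:
        partition_context.update_checkpoint(events[-1])


class FakePartitionContext:
    """Stand-in for PartitionContext (not part of the package)."""
    partition_id = "0"

    def __init__(self):
        self.checkpointed = []

    def update_checkpoint(self, event):
        self.checkpointed.append(event)


context = FakePartitionContext()
on_events(context, ["event-1", "event-2"])
```

Checkpointing after processing means a restart may replay events handled since the last checkpoint, so the handler should tolerate duplicates.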

class azure.eventhub.CloseReason[source]

A partition consumer is closed for one of two reasons:

  • SHUTDOWN: The consumer is explicitly asked to stop. This happens when the EventHubConsumerClient is closed.

  • OWNERSHIP_LOST: The consumer loses ownership of its partition. This happens when another EventHubConsumerClient instance claims ownership of the partition.

OWNERSHIP_LOST = 1
SHUTDOWN = 0
class azure.eventhub.TransportType[source]

The underlying transport protocol type:

  • Amqp: AMQP over the default TCP transport protocol. It uses port 5671.

  • AmqpOverWebsocket: AMQP over the WebSocket transport protocol. It uses port 443.

Amqp = 1
AmqpOverWebsocket = 2
class azure.eventhub.EventHubError(message, details=None)[source]

Represents an error that occurred in the client.

Variables
  • message (str) – The error message.

  • error (str) – The error condition, if available.

  • details (dict[str, str]) – The error details, if included in the service response.

class azure.eventhub.ConnectError(message, details=None)[source]

Raised when the client fails to connect to Event Hubs.

class azure.eventhub.ConnectionLostError(message, details=None)[source]

The connection to the Event Hub is lost. The SDK will retry, so this shouldn’t normally happen.

class azure.eventhub.EventDataError(message, details=None)[source]

The event data is problematic, so the send fails on the client side.

class azure.eventhub.EventDataSendError(message, details=None)[source]

The service returned an error while event data was being sent.

class azure.eventhub.AuthenticationError(message, details=None)[source]

Raised when the client fails to connect to Event Hubs because of an authentication problem.

class azure.eventhub.OwnershipLostError[source]

Raised when update_checkpoint detects that ownership of a partition has been lost.