azure.eventhub package

Submodules

azure.eventhub.client module

class azure.eventhub.client.EventHubClient(host, event_hub_path, credential, **kwargs)[source]

The EventHubClient class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service.

Example

Create a new instance of the Event Hub client
import os
from azure.eventhub import EventHubClient, EventHubSharedKeyCredential

host = os.environ['EVENT_HUB_HOSTNAME']
event_hub_path = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

client = EventHubClient(
    host=host,
    event_hub_path=event_hub_path,
    credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
)

Constructs a new EventHubClient.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • credential – The credential object used for authentication which implements particular interface of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and objects that implement get_token(self, *scopes) method.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo the failed operation when an error happened. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

close()[source]
create_consumer(consumer_group, partition_id, event_position, **kwargs)[source]

Create a consumer to the client for a particular consumer group and partition.

Parameters
  • consumer_group (str) – The name of the consumer group this consumer is associated with. Events are read in the context of this group. The default consumer_group for an event hub is “$Default”.

  • partition_id (str) – The identifier of the Event Hub partition from which events will be received.

  • event_position (EventPosition) – The position within the partition where the consumer should begin reading events.

  • owner_level (int) – The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

Return type

EventHubConsumer

Example

Add a consumer to the client for a particular consumer group and partition.
client = EventHubClient.from_connection_string(connection_str)
# Create a consumer.
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
# Create an exclusive consumer object.
exclusive_consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=1)
create_producer(partition_id=None, send_timeout=None)[source]

Create an producer to send EventData object to an EventHub.

Parameters
  • partition_id (str) – Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via round-robin.

  • operation (str) – An optional operation to be appended to the hostname in the target URL. The value must start with / character.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Return type

EventHubProducer

Example

Add a producer to the client to send EventData.
client = EventHubClient.from_connection_string(connection_str)
# Create a producer.
producer = client.create_producer(partition_id="0")
classmethod from_connection_string(conn_str, **kwargs)

Create an EventHubClient from an EventHub connection string.

Parameters
  • conn_str (str) – The connection string of an eventhub

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityName is not included in the connection string.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo the failed operation when an error happened. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Example

Create an EventHubClient from a connection string.
import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
get_partition_ids()[source]

Get partition ids of the specified EventHub.

Return type

list[str]

Raises

~azure.eventhub.EventHubError

get_partition_properties(partition)[source]

Get properties of the specified partition. Keys in the details dictionary include:

-‘event_hub_path’ -‘id’ -‘beginning_sequence_number’ -‘last_enqueued_sequence_number’ -‘last_enqueued_offset’ -‘last_enqueued_time_utc’ -‘is_empty’

Parameters

partition (str) – The target partition id.

Return type

dict

Raises

~azure.eventhub.ConnectError

get_properties()[source]

Get properties of the specified EventHub. Keys in the details dictionary include:

-‘path’ -‘created_at’ -‘partition_ids’

Return type

dict

Raises

~azure.eventhub.EventHubError

azure.eventhub.client_abstract module

class azure.eventhub.client_abstract.EventHubClientAbstract(host, event_hub_path, credential, **kwargs)[source]

The EventHubClientAbstract class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service.

Constructs a new EventHubClient.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • credential – The credential object used for authentication which implements particular interface of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and objects that implement get_token(self, *scopes) method.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo the failed operation when an error happened. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

classmethod from_connection_string(conn_str, **kwargs)[source]

Create an EventHubClient from an EventHub connection string.

Parameters
  • conn_str (str) – The connection string of an eventhub

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityName is not included in the connection string.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo the failed operation when an error happened. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Example

Create an EventHubClient from a connection string.
import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)

azure.eventhub.common module

class azure.eventhub.common.EventData(body=None)[source]

The EventData class is a holder of event content.

Example

Create instances of EventData
    event_data = EventData("String data")
    event_data = EventData(b"Bytes data")
    event_data = EventData([b"A", b"B", b"C"])

    list_data = ['Message {}'.format(i) for i in range(10)]
    event_data = EventData(body=list_data)

Initialize EventData.

Parameters

body (str, bytes or list) – The data to send in a single message.

body_as_json(encoding='UTF-8')[source]

The body of the event loaded as a JSON object is the data is compatible.

Parameters

encoding – The encoding to use for decoding message data. Default is ‘UTF-8’

Return type

dict

body_as_str(encoding='UTF-8')[source]

The body of the event data as a string if the data is of a compatible type.

Parameters

encoding – The encoding to use for decoding message data. Default is ‘UTF-8’

Return type

str or unicode

encode_message()[source]
PROP_LAST_ENQUEUED_OFFSET = b'last_enqueued_offset'
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b'last_enqueued_sequence_number'
PROP_LAST_ENQUEUED_TIME_UTC = b'last_enqueued_time_utc'
PROP_OFFSET = b'x-opt-offset'
PROP_PARTITION_KEY = b'x-opt-partition-key'
PROP_PARTITION_KEY_AMQP_SYMBOL = <uamqp.types.AMQPSymbol object>
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b'runtime_info_retrieval_time_utc'
PROP_SEQ_NUMBER = b'x-opt-sequence-number'
PROP_TIMESTAMP = b'x-opt-enqueued-time'
property application_properties

Application defined properties on the message.

Return type

dict

property body

The body of the event data object.

Return type

bytes or Generator[bytes]

property enqueued_time

The enqueued timestamp of the event data object.

Return type

datetime.datetime

property offset

The offset of the event data object.

Return type

str

property partition_key

The partition key of the event data object.

Return type

bytes

property sequence_number

The sequence number of the event data object.

Return type

int or long

property system_properties

Metadata set by the Event Hubs Service associated with the EventData

Return type

dict

class azure.eventhub.common.EventDataBatch(max_size=None, partition_key=None)[source]

Sending events in batch get better performance than sending individual events. EventDataBatch helps you create the maximum allowed size batch of EventData to improve sending performance.

Use try_add method to add events until the maximum batch size limit in bytes has been reached - a ValueError will be raised. Use send method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer for sending.

Please use the create_batch method of EventHubProducer to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.

try_add(event_data)[source]

The message size is a sum up of body, properties, header, etc. :param event_data: ~azure.eventhub.EventData :return: None :raise: ValueError, when exceeding the size limit.

property size

The size in bytes

Returns

int

class azure.eventhub.common.EventHubSASTokenCredential(token)[source]

SAS token used for authentication.

Parameters

token (str or callable) – A SAS token or function that returns a SAS token. If a function is supplied, it will be used to retrieve subsequent tokens in the case of token expiry. The function should take no arguments.

get_sas_token()[source]
class azure.eventhub.common.EventHubSharedKeyCredential(policy, key)[source]

The shared access key credential used for authentication.

Parameters
  • policy (str) – The name of the shared access policy.

  • key (str) – The shared access key.

class azure.eventhub.common.EventPosition(value, inclusive=False)[source]

The position(offset, sequence or timestamp) where a consumer starts. Examples:

Beginning of the event stream:
>>> event_pos = EventPosition("-1")
End of the event stream:
>>> event_pos = EventPosition("@latest")
Events after the specified offset:
>>> event_pos = EventPosition("12345")
Events from the specified offset:
>>> event_pos = EventPosition("12345", True)
Events after a datetime:
>>> event_pos = EventPosition(datetime.datetime.utcnow())
Events after a specific sequence number:
>>> event_pos = EventPosition(1506968696002)

Initialize EventPosition.

Parameters
  • value (datetime or int or str) – The event position value.

  • inclusive (bool) – Whether to include the supplied value as the start point.

azure.eventhub.common.parse_sas_token(sas_token)[source]

Parse a SAS token into its components.

Parameters

sas_token (str) – The SAS token.

Return type

dict[str, str]

azure.eventhub.configuration module

azure.eventhub.consumer module

class azure.eventhub.consumer.EventHubConsumer(client, source, **kwargs)[source]

A consumer responsible for reading EventData from a specific Event Hub partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that only one consumer from that group is reading the from the partition. These exclusive consumers are sometimes referred to as “Epoch Consumers.”

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as “Non-Epoch Consumers.”

Please use the method create_consumer on EventHubClient for creating EventHubConsumer.

Instantiate a consumer. EventHubConsumer should be instantiated by calling the create_consumer method in EventHubClient.

Parameters
  • client (EventHubClient) – The parent EventHubClient.

  • source (str) – The source EventHub from which to receive events.

  • prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.

  • owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
try:
    consumer.receive(timeout=1)
finally:
    # Close down the receive handler.
    consumer.close()
next()
receive(max_batch_size=None, timeout=None)[source]

Receive events from the EventHub.

Parameters
  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. If combined with a timeout and no events are retrieve before the time, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum.

  • timeout (float) – The maximum wait time to build up the requested message count for the batch. If not specified, the default wait time specified when the consumer was created will be used.

Return type

list[EventData]

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError

Example

Receive events from the EventHub.
    with consumer:
        logger = logging.getLogger("azure.eventhub")
        received = consumer.receive(timeout=5, max_batch_size=1)
        for event_data in received:
            logger.info("Message received:{}".format(event_data.body_as_str()))
property last_enqueued_event_properties

The latest enqueued event information. This property will be updated each time an event is received when the receiver is created with track_last_enqueued_event_properties being True. The dict includes following information of the partition:

  • sequence_number

  • offset

  • enqueued_time

  • retrieval_time

Return type

dict or None

property queue_size

The current size of the unprocessed Event queue.

Return type

int

azure.eventhub.error module

exception azure.eventhub.error.AuthenticationError(message, details=None)[source]

Fail to connect to event hubs because of authentication problem

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.error.ConnectError(message, details=None)[source]

Fail to connect to event hubs

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.error.ConnectionLostError(message, details=None)[source]

Connection to event hub is lost. SDK will retry. So this shouldn’t happen.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.error.EventDataError(message, details=None)[source]

Problematic event data so the send will fail at client side

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.error.EventDataSendError(message, details=None)[source]

Service returns error while an event data is being sent

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.error.EventHubError(message, details=None)[source]

Represents an error happened in the client.

Variables
  • message (str) – The error message.

  • error (str) – The error condition, if available.

  • details (dict[str, str]) – The error details, if included in the service response.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.error.OperationTimeoutError(message, details=None)[source]

Operation times out

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args

azure.eventhub.producer module

class azure.eventhub.producer.EventHubProducer(client, target, **kwargs)[source]

A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition.

Please use the method create_producer on EventHubClient for creating EventHubProducer.

Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the create_producer method in EventHubClient.

Parameters
  • client () – The parent EventHubClient.

  • target (str) – The URI of the EventHub to send to.

  • partition (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

  • keep_alive (float) – The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings.

  • auto_reconnect (bool) – Whether to automatically reconnect the producer if a retryable error occurs. Default value is True.

close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
producer = client.create_producer(partition_id="0")
try:
    producer.send(EventData(b"A single event"))
finally:
    # Close down the send handler.
    producer.close()
create_batch(max_size=None, partition_key=None)[source]

Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side.

Parameters
  • max_size (int) – The maximum size of bytes data that an EventDataBatch object can hold.

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service.

Returns

an EventDataBatch instance

Return type

EventDataBatch

Example

Create EventDataBatch object within limited size
    event_data_batch = producer.create_batch(max_size=10000)
    while True:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
send(event_data, partition_key=None, timeout=None)[source]

Sends an event data and blocks until acknowledgement is received or operation times out.

Parameters
  • event_data (EventData, Iterator, Generator, list) – The event to be sent. It can be an EventData object, or iterable of EventData objects

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.

  • timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError

Returns

None

Return type

None

Example

Sends an event data and blocks until acknowledgement is received or operation times out.
    with producer:
        event_data = EventData(b"A single event")
        producer.send(event_data)

Module contents

exception azure.eventhub.EventHubError(message, details=None)[source]

Represents an error happened in the client.

Variables
  • message (str) – The error message.

  • error (str) – The error condition, if available.

  • details (dict[str, str]) – The error details, if included in the service response.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.ConnectError(message, details=None)[source]

Fail to connect to event hubs

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.ConnectionLostError(message, details=None)[source]

Connection to event hub is lost. SDK will retry. So this shouldn’t happen.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.EventDataError(message, details=None)[source]

Problematic event data so the send will fail at client side

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.EventDataSendError(message, details=None)[source]

Service returns error while an event data is being sent

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.eventhub.AuthenticationError(message, details=None)[source]

Fail to connect to event hubs because of authentication problem

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
class azure.eventhub.EventData(body=None)[source]

The EventData class is a holder of event content.

Example

Create instances of EventData
    event_data = EventData("String data")
    event_data = EventData(b"Bytes data")
    event_data = EventData([b"A", b"B", b"C"])

    list_data = ['Message {}'.format(i) for i in range(10)]
    event_data = EventData(body=list_data)

Initialize EventData.

Parameters

body (str, bytes or list) – The data to send in a single message.

body_as_json(encoding='UTF-8')[source]

The body of the event loaded as a JSON object is the data is compatible.

Parameters

encoding – The encoding to use for decoding message data. Default is ‘UTF-8’

Return type

dict

body_as_str(encoding='UTF-8')[source]

The body of the event data as a string if the data is of a compatible type.

Parameters

encoding – The encoding to use for decoding message data. Default is ‘UTF-8’

Return type

str or unicode

encode_message()[source]
PROP_LAST_ENQUEUED_OFFSET = b'last_enqueued_offset'
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b'last_enqueued_sequence_number'
PROP_LAST_ENQUEUED_TIME_UTC = b'last_enqueued_time_utc'
PROP_OFFSET = b'x-opt-offset'
PROP_PARTITION_KEY = b'x-opt-partition-key'
PROP_PARTITION_KEY_AMQP_SYMBOL = <uamqp.types.AMQPSymbol object>
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b'runtime_info_retrieval_time_utc'
PROP_SEQ_NUMBER = b'x-opt-sequence-number'
PROP_TIMESTAMP = b'x-opt-enqueued-time'
property application_properties

Application defined properties on the message.

Return type

dict

property body

The body of the event data object.

Return type

bytes or Generator[bytes]

property enqueued_time

The enqueued timestamp of the event data object.

Return type

datetime.datetime

property offset

The offset of the event data object.

Return type

str

property partition_key

The partition key of the event data object.

Return type

bytes

property sequence_number

The sequence number of the event data object.

Return type

int or long

property system_properties

Metadata set by the Event Hubs Service associated with the EventData

Return type

dict

class azure.eventhub.EventDataBatch(max_size=None, partition_key=None)[source]

Sending events in batch get better performance than sending individual events. EventDataBatch helps you create the maximum allowed size batch of EventData to improve sending performance.

Use try_add method to add events until the maximum batch size limit in bytes has been reached - a ValueError will be raised. Use send method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer for sending.

Please use the create_batch method of EventHubProducer to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.

try_add(event_data)[source]

The message size is a sum up of body, properties, header, etc. :param event_data: ~azure.eventhub.EventData :return: None :raise: ValueError, when exceeding the size limit.

property size

The size in bytes

Returns

int

class azure.eventhub.EventPosition(value, inclusive=False)[source]

The position(offset, sequence or timestamp) where a consumer starts. Examples:

Beginning of the event stream:
>>> event_pos = EventPosition("-1")
End of the event stream:
>>> event_pos = EventPosition("@latest")
Events after the specified offset:
>>> event_pos = EventPosition("12345")
Events from the specified offset:
>>> event_pos = EventPosition("12345", True)
Events after a datetime:
>>> event_pos = EventPosition(datetime.datetime.utcnow())
Events after a specific sequence number:
>>> event_pos = EventPosition(1506968696002)

Initialize EventPosition.

Parameters
  • value (datetime or int or str) – The event position value.

  • inclusive (bool) – Whether to include the supplied value as the start point.

class azure.eventhub.EventHubClient(host, event_hub_path, credential, **kwargs)[source]

The EventHubClient class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service.

Example

Create a new instance of the Event Hub client
import os
from azure.eventhub import EventHubClient, EventHubSharedKeyCredential

host = os.environ['EVENT_HUB_HOSTNAME']
event_hub_path = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

client = EventHubClient(
    host=host,
    event_hub_path=event_hub_path,
    credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
)

Constructs a new EventHubClient.

Parameters
  • host (str) – The hostname of the Event Hub.

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • credential – The credential object used for authentication which implements particular interface of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and objects that implement get_token(self, *scopes) method.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo the failed operation when an error happened. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

close()[source]
create_consumer(consumer_group, partition_id, event_position, **kwargs)[source]

Create a consumer to the client for a particular consumer group and partition.

Parameters
  • consumer_group (str) – The name of the consumer group this consumer is associated with. Events are read in the context of this group. The default consumer_group for an event hub is “$Default”.

  • partition_id (str) – The identifier of the Event Hub partition from which events will be received.

  • event_position (EventPosition) – The position within the partition where the consumer should begin reading events.

  • owner_level (int) – The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

Return type

EventHubConsumer

Example

Add a consumer to the client for a particular consumer group and partition.
client = EventHubClient.from_connection_string(connection_str)
# Create a consumer.
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
# Create an exclusive consumer object.
exclusive_consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=1)
create_producer(partition_id=None, send_timeout=None)[source]

Create an producer to send EventData object to an EventHub.

Parameters
  • partition_id (str) – Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via round-robin.

  • operation (str) – An optional operation to be appended to the hostname in the target URL. The value must start with / character.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Return type

EventHubProducer

Example

Add a producer to the client to send EventData.
client = EventHubClient.from_connection_string(connection_str)
# Create a producer.
producer = client.create_producer(partition_id="0")
classmethod from_connection_string(conn_str, **kwargs)

Create an EventHubClient from an EventHub connection string.

Parameters
  • conn_str (str) – The connection string of an eventhub

  • event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityName is not included in the connection string.

  • network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.

  • http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

  • user_agent (str) – The user agent that needs to be appended to the built in user agent string.

  • retry_total (int) – The total number of attempts to redo the failed operation when an error happened. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.

  • prefetch (int) – The message prefetch count of the consumer. Default is 300.

  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.

  • receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

Example

Create an EventHubClient from a connection string.
import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
get_partition_ids()[source]

Get partition ids of the specified EventHub.

Return type

list[str]

Raises

~azure.eventhub.EventHubError

get_partition_properties(partition)[source]

Get properties of the specified partition. Keys in the details dictionary include:

-‘event_hub_path’ -‘id’ -‘beginning_sequence_number’ -‘last_enqueued_sequence_number’ -‘last_enqueued_offset’ -‘last_enqueued_time_utc’ -‘is_empty’

Parameters

partition (str) – The target partition id.

Return type

dict

Raises

~azure.eventhub.ConnectError

get_properties()[source]

Get properties of the specified EventHub. Keys in the details dictionary include:

-‘path’ -‘created_at’ -‘partition_ids’

Return type

dict

Raises

~azure.eventhub.EventHubError

class azure.eventhub.EventHubProducer(client, target, **kwargs)[source]

A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition.

Please use the method create_producer on EventHubClient for creating EventHubProducer.

Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the create_producer method in EventHubClient.

Parameters
  • client () – The parent EventHubClient.

  • target (str) – The URI of the EventHub to send to.

  • partition (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.

  • send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.

  • keep_alive (float) – The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings.

  • auto_reconnect (bool) – Whether to automatically reconnect the producer if a retryable error occurs. Default value is True.

close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
producer = client.create_producer(partition_id="0")
try:
    producer.send(EventData(b"A single event"))
finally:
    # Close down the send handler.
    producer.close()
create_batch(max_size=None, partition_key=None)[source]

Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side.

Parameters
  • max_size (int) – The maximum size of bytes data that an EventDataBatch object can hold.

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service.

Returns

an EventDataBatch instance

Return type

EventDataBatch

Example

Create EventDataBatch object within limited size
    event_data_batch = producer.create_batch(max_size=10000)
    while True:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
send(event_data, partition_key=None, timeout=None)[source]

Sends an event data and blocks until acknowledgement is received or operation times out.

Parameters
  • event_data (EventData, Iterator, Generator, list) – The event to be sent. It can be an EventData object, or iterable of EventData objects

  • partition_key (str) – With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.

  • timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError

Returns

None

Return type

None

Example

Sends an event data and blocks until acknowledgement is received or operation times out.
    with producer:
        event_data = EventData(b"A single event")
        producer.send(event_data)
class azure.eventhub.EventHubConsumer(client, source, **kwargs)[source]

A consumer responsible for reading EventData from a specific Event Hub partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that only one consumer from that group is reading the from the partition. These exclusive consumers are sometimes referred to as “Epoch Consumers.”

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as “Non-Epoch Consumers.”

Please use the method create_consumer on EventHubClient for creating EventHubConsumer.

Instantiate a consumer. EventHubConsumer should be instantiated by calling the create_consumer method in EventHubClient.

Parameters
  • client (EventHubClient) – The parent EventHubClient.

  • source (str) – The source EventHub from which to receive events.

  • prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.

  • owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set.

  • track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

close()[source]

Close down the handler. If the handler has already closed, this will be a no op.

Example

Close down the handler.
client = EventHubClient.from_connection_string(connection_str)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
try:
    consumer.receive(timeout=1)
finally:
    # Close down the receive handler.
    consumer.close()
next()
receive(max_batch_size=None, timeout=None)[source]

Receive events from the EventHub.

Parameters
  • max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. If combined with a timeout and no events are retrieve before the time, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum.

  • timeout (float) – The maximum wait time to build up the requested message count for the batch. If not specified, the default wait time specified when the consumer was created will be used.

Return type

list[EventData]

Raises

~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError

Example

Receive events from the EventHub.
    with consumer:
        logger = logging.getLogger("azure.eventhub")
        received = consumer.receive(timeout=5, max_batch_size=1)
        for event_data in received:
            logger.info("Message received:{}".format(event_data.body_as_str()))
property last_enqueued_event_properties

The latest enqueued event information. This property will be updated each time an event is received when the receiver is created with track_last_enqueued_event_properties being True. The dict includes following information of the partition:

  • sequence_number

  • offset

  • enqueued_time

  • retrieval_time

Return type

dict or None

property queue_size

The current size of the unprocessed Event queue.

Return type

int

class azure.eventhub.TransportType[source]

Transport type The underlying transport protocol type:

Amqp: AMQP over the default TCP transport protocol, it uses port 5671. AmqpOverWebsocket: Amqp over the Web Sockets transport protocol, it uses port 443.

Amqp = 1
AmqpOverWebsocket = 2
class azure.eventhub.EventHubSharedKeyCredential(policy, key)[source]

The shared access key credential used for authentication.

Parameters
  • policy (str) – The name of the shared access policy.

  • key (str) – The shared access key.

class azure.eventhub.EventHubSASTokenCredential(token)[source]

SAS token used for authentication.

Parameters

token (str or callable) – A SAS token or function that returns a SAS token. If a function is supplied, it will be used to retrieve subsequent tokens in the case of token expiry. The function should take no arguments.

get_sas_token()[source]