azure.eventhub package¶
Subpackages¶
- azure.eventhub.aio package
- Subpackages
- azure.eventhub.aio.eventprocessor package
- Submodules
- azure.eventhub.aio.eventprocessor.event_processor module
- azure.eventhub.aio.eventprocessor.partition_context module
- azure.eventhub.aio.eventprocessor.partition_manager module
- azure.eventhub.aio.eventprocessor.partition_processor module
- azure.eventhub.aio.eventprocessor.sample_partition_manager module
- azure.eventhub.aio.eventprocessor.utils module
- Module contents
- azure.eventhub.aio.eventprocessor package
- Submodules
- azure.eventhub.aio.client_async module
- azure.eventhub.aio.consumer_async module
- azure.eventhub.aio.error_async module
- azure.eventhub.aio.producer_async module
- Module contents
- Subpackages
- azure.eventhub.extensions package
Submodules¶
azure.eventhub.client module¶
-
class
azure.eventhub.client.
EventHubClient
(host, event_hub_path, credential, **kwargs)[source]¶ The EventHubClient class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service.
Example
import os
from azure.eventhub import EventHubClient, EventHubSharedKeyCredential

host = os.environ['EVENT_HUB_HOSTNAME']
event_hub_path = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

client = EventHubClient(
    host=host,
    event_hub_path=event_hub_path,
    credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
)
Constructs a new EventHubClient.
- Parameters
host (str) – The hostname of the Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.
http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – The user agent that needs to be appended to the built in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.
receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
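The http_proxy description above names two required keys and two optional ones. As a minimal sketch, the shape of such a dictionary could be checked before it is handed to the client. Note that validate_http_proxy is a hypothetical helper, not part of azure.eventhub, and the host name and credentials are placeholders.

```python
from typing import Any, Dict

# Required keys and their expected value types, as listed for http_proxy.
REQUIRED_KEYS = {"proxy_hostname": str, "proxy_port": int}


def validate_http_proxy(settings: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: check the required http_proxy keys and types."""
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in settings:
            raise ValueError("http_proxy is missing required key: {}".format(key))
        if not isinstance(settings[key], expected_type):
            raise TypeError(
                "http_proxy[{!r}] must be of type {}".format(key, expected_type.__name__)
            )
    return settings


http_proxy = validate_http_proxy({
    "proxy_hostname": "proxy.example.com",  # placeholder host
    "proxy_port": 3128,
    "username": "proxy-user",               # optional
    "password": "proxy-password",           # optional
})
```

The validated dictionary would then be passed as the http_proxy keyword argument when constructing the client.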
-
create_consumer
(consumer_group, partition_id, event_position, **kwargs)[source]¶ Create a consumer for a particular consumer group and partition.
- Parameters
consumer_group (str) – The name of the consumer group this consumer is associated with. Events are read in the context of this group. The default consumer_group for an event hub is “$Default”.
partition_id (str) – The identifier of the Event Hub partition from which events will be received.
event_position (EventPosition) – The position within the partition where the consumer should begin reading events.
owner_level (int) – The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
- Return type
Example
client = EventHubClient.from_connection_string(connection_str)
# Create a consumer.
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
# Create an exclusive consumer object.
exclusive_consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=1)
-
create_producer
(partition_id=None, send_timeout=None)[source]¶ Create a producer to send EventData objects to an Event Hub.
- Parameters
partition_id (str) – Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via round-robin.
operation (str) – An optional operation to be appended to the hostname in the target URL. The value must start with / character.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
- Return type
Example
client = EventHubClient.from_connection_string(connection_str)
# Create a producer.
producer = client.create_producer(partition_id="0")
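When partition_id is omitted, events are distributed to the available partitions via round-robin. That distribution is not done by user code. The sketch below only illustrates what a round-robin assignment looks like; assign_round_robin and the partition IDs are hypothetical and are not part of azure.eventhub.

```python
from itertools import cycle
from typing import Dict, Iterable, List


def assign_round_robin(events: Iterable[str], partition_ids: List[str]) -> Dict[str, List[str]]:
    """Hypothetical illustration: hand each event to the next partition in turn."""
    assignment: Dict[str, List[str]] = {pid: [] for pid in partition_ids}
    next_partition = cycle(partition_ids)
    for event in events:
        assignment[next(next_partition)].append(event)
    return assignment


events = ["event-{}".format(i) for i in range(5)]
assignment = assign_round_robin(events, ["0", "1"])
print(assignment)
```

With two partitions and five events, the events alternate between the partitions, so neither partition receives more than one event beyond the other.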
-
classmethod
from_connection_string
(conn_str, **kwargs)¶ Create an EventHubClient from an EventHub connection string.
- Parameters
conn_str (str) – The connection string of an Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityPath is not included in the connection string.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – The user agent that needs to be appended to the built in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.
receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
Example
import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
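The connection string in the example above is a semicolon-separated list of Key=Value pairs. As a rough sketch of how those pairs map back to the host, policy, key, and Event Hub name, the string can be split by hand. Here parse_connection_string is a hypothetical helper (the library parses the string internally), and all values are placeholders.

```python
from typing import Dict


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Hypothetical helper: split 'Key=Value;Key=Value' pairs into a dict."""
    parts = {}
    for segment in conn_str.strip().rstrip(";").split(";"):
        # Split on the first '=' only, since keys are often base64 and end in '='.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


conn_str = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=my-policy;"
    "SharedAccessKey=c2VjcmV0a2V5==;"
    "EntityPath=my-event-hub"
)
parts = parse_connection_string(conn_str)

# Strip the scheme and trailing slash to recover the bare hostname.
host = parts["Endpoint"][len("sb://"):].rstrip("/")
print(host, parts["EntityPath"])
```

If EntityPath is absent from the string, the event_hub_path keyword argument supplies the Event Hub name instead.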
azure.eventhub.client_abstract module¶
-
class
azure.eventhub.client_abstract.
EventHubClientAbstract
(host, event_hub_path, credential, **kwargs)[source]¶ The EventHubClientAbstract class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service.
Constructs a new EventHubClient.
- Parameters
host (str) – The hostname of the Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.
http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – The user agent that needs to be appended to the built in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.
receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
-
classmethod
from_connection_string
(conn_str, **kwargs)[source]¶ Create an EventHubClient from an EventHub connection string.
- Parameters
conn_str (str) – The connection string of an Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to, if the EntityPath is not included in the connection string.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – The user agent that needs to be appended to the built in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.
receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
Example
import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
azure.eventhub.common module¶
-
class
azure.eventhub.common.
EventData
(body=None)[source]¶ The EventData class is a holder of event content.
Example
event_data = EventData("String data")
event_data = EventData(b"Bytes data")
event_data = EventData([b"A", b"B", b"C"])

list_data = ['Message {}'.format(i) for i in range(10)]
event_data = EventData(body=list_data)
Initialize EventData.
-
body_as_json
(encoding='UTF-8')[source]¶ The body of the event loaded as a JSON object, if the data is compatible.
- Parameters
encoding – The encoding to use for decoding message data. Default is ‘UTF-8’
- Return type
-
body_as_str
(encoding='UTF-8')[source]¶ The body of the event data as a string if the data is of a compatible type.
- Parameters
encoding – The encoding to use for decoding message data. Default is ‘UTF-8’
- Return type
str or unicode
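To make the effect of the encoding parameter concrete, the following sketch decodes a bytes body with the standard library only. It does not call EventData. It approximates what body_as_str and body_as_json are described as returning for a UTF-8 encoded JSON payload, and the payload itself is made up.

```python
import json

# A made-up event body, as bytes on the wire.
raw_body = '{"device": "sensor-1", "reading": 21.5}'.encode("UTF-8")

# Roughly the result described for body_as_str(encoding='UTF-8').
as_text = raw_body.decode("UTF-8")

# Roughly the result described for body_as_json(encoding='UTF-8'),
# which only works when the decoded text is valid JSON.
as_json = json.loads(as_text)

print(as_text)
print(as_json["reading"])
```

A body that is not valid JSON can still be read as a string, which is why the two accessors are separate.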
-
PROP_LAST_ENQUEUED_OFFSET
= b'last_enqueued_offset'¶
-
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER
= b'last_enqueued_sequence_number'¶
-
PROP_LAST_ENQUEUED_TIME_UTC
= b'last_enqueued_time_utc'¶
-
PROP_OFFSET
= b'x-opt-offset'¶
-
PROP_PARTITION_KEY
= b'x-opt-partition-key'¶
-
PROP_PARTITION_KEY_AMQP_SYMBOL
= <uamqp.types.AMQPSymbol object>¶
-
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC
= b'runtime_info_retrieval_time_utc'¶
-
PROP_SEQ_NUMBER
= b'x-opt-sequence-number'¶
-
PROP_TIMESTAMP
= b'x-opt-enqueued-time'¶
-
property
enqueued_time
¶ The enqueued timestamp of the event data object.
- Return type
-
-
class
azure.eventhub.common.
EventDataBatch
(max_size=None, partition_key=None)[source]¶ Sending events in a batch performs better than sending individual events. EventDataBatch helps you create a batch of EventData of the maximum allowed size to improve sending performance.
Use the try_add method to add events until the maximum batch size limit in bytes has been reached, at which point a ValueError is raised. Use the send method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer for sending.
Please use the create_batch method of EventHubProducer to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.
-
try_add
(event_data)[source]¶ The message size is the sum of the body, properties, header, etc.
- Parameters
event_data (~azure.eventhub.EventData) – The event to add to the batch.
- Returns
None
- Raises
ValueError, when exceeding the size limit.
-
property
size
¶ The size in bytes
- Returns
int
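The fill-until-ValueError pattern described for try_add can be sketched without the service. SketchBatch below is a hypothetical stand-in, not the real EventDataBatch: it counts body bytes only, whereas the real batch also accounts for properties and headers.

```python
from typing import List


class SketchBatch:
    """Hypothetical stand-in for a size-limited batch; not the real EventDataBatch."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.size = 0
        self.items: List[bytes] = []

    def try_add(self, payload: bytes) -> None:
        # Reject the payload if it would push the batch past its byte limit.
        if self.size + len(payload) > self.max_size:
            raise ValueError("batch has reached its maximum size")
        self.items.append(payload)
        self.size += len(payload)


def split_into_batches(payloads: List[bytes], max_size: int) -> List[SketchBatch]:
    """Fill a batch until try_add raises ValueError, then start a new one."""
    batches = [SketchBatch(max_size)]
    for payload in payloads:
        try:
            batches[-1].try_add(payload)
        except ValueError:
            batch = SketchBatch(max_size)
            batch.try_add(payload)  # raises again if one payload alone exceeds max_size
            batches.append(batch)
    return batches


batches = split_into_batches([b"aaaa", b"bbbb", b"cccc", b"dd"], max_size=10)
print([batch.size for batch in batches])
```

With the real API, each full batch would be passed to the producer's send method before a new one is created with create_batch.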
-
-
class
azure.eventhub.common.
EventHubSASTokenCredential
(token)[source]¶ SAS token used for authentication.
- Parameters
token (str or callable) – A SAS token or function that returns a SAS token. If a function is supplied, it will be used to retrieve subsequent tokens in the case of token expiry. The function should take no arguments.
The shared access key credential used for authentication.
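The token parameter of EventHubSASTokenCredential accepts either a string or a zero-argument function. A minimal sketch of how such a value might be resolved follows. Here resolve_token and token_provider are hypothetical names, and the token strings are placeholders rather than valid SAS tokens.

```python
from typing import Callable, List, Union

TokenSource = Union[str, Callable[[], str]]


def resolve_token(token: TokenSource) -> str:
    """Hypothetical helper: return the string, or call the zero-argument provider."""
    return token() if callable(token) else token


issued: List[str] = []


def token_provider() -> str:
    # Placeholder provider: a real one would request a fresh SAS token.
    issued.append("sas-token-{}".format(len(issued) + 1))
    return issued[-1]


print(resolve_token("static-sas-token"))
print(resolve_token(token_provider))
print(resolve_token(token_provider))
```

Supplying a function is what allows a new token to be fetched after the previous one expires, whereas a plain string cannot be refreshed.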
-
class
azure.eventhub.common.
EventPosition
(value, inclusive=False)[source]¶ The position (offset, sequence number, or timestamp) where a consumer starts. Examples:
- Beginning of the event stream:
>>> event_pos = EventPosition("-1")
- End of the event stream:
>>> event_pos = EventPosition("@latest")
- Events after the specified offset:
>>> event_pos = EventPosition("12345")
- Events from the specified offset:
>>> event_pos = EventPosition("12345", True)
- Events after a datetime:
>>> event_pos = EventPosition(datetime.datetime.utcnow())
- Events after a specific sequence number:
>>> event_pos = EventPosition(1506968696002)
Initialize EventPosition.
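The examples above suggest that the type of value decides how a position is interpreted: strings are offsets, integers are sequence numbers, and datetime objects are enqueued times. The sketch below restates that mapping in plain Python. Note that describe_position is a hypothetical helper written for illustration and does not reflect how EventPosition is implemented.

```python
import datetime
from typing import Union


def describe_position(value: Union[str, int, datetime.datetime], inclusive: bool = False) -> str:
    """Hypothetical helper: summarise how a position value is interpreted."""
    if value == "-1":
        return "beginning of the event stream"
    if value == "@latest":
        return "end of the event stream"
    if isinstance(value, datetime.datetime):
        kind = "enqueued time"
    elif isinstance(value, int):
        kind = "sequence number"
    else:
        kind = "offset"
    # inclusive=True means the event at the position itself is included.
    relation = "at or after" if inclusive else "after"
    return "events {} {} {}".format(relation, kind, value)


print(describe_position("12345"))
print(describe_position("12345", True))
print(describe_position(1506968696002))
```

The inclusive flag only changes whether the event at the given position is itself part of the result.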
azure.eventhub.configuration module¶
azure.eventhub.consumer module¶
-
class
azure.eventhub.consumer.
EventHubConsumer
(client, source, **kwargs)[source]¶ A consumer responsible for reading EventData from a specific Event Hub partition and as a member of a specific consumer group.
A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred to as “Epoch Consumers.”
A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as “Non-Epoch Consumers.”
Please use the method create_consumer on EventHubClient for creating EventHubConsumer.
Instantiate a consumer. EventHubConsumer should be instantiated by calling the create_consumer method in EventHubClient.
- Parameters
client (EventHubClient) – The parent EventHubClient.
source (str) – The source EventHub from which to receive events.
prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.
owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set.
track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
-
close
()[source]¶ Close down the handler. If the handler has already closed, this will be a no op.
Example
client = EventHubClient.from_connection_string(connection_str)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'))
try:
    consumer.receive(timeout=1)
finally:
    # Close down the receive handler.
    consumer.close()
-
next
()¶
-
receive
(max_batch_size=None, timeout=None)[source]¶ Receive events from the EventHub.
- Parameters
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as the service returns no new events. If combined with a timeout and no events are retrieved before the timeout expires, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum.
timeout (float) – The maximum wait time to build up the requested message count for the batch. If not specified, the default wait time specified when the consumer was created will be used.
- Return type
- Raises
~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError
Example
with consumer:
    logger = logging.getLogger("azure.eventhub")
    received = consumer.receive(timeout=5, max_batch_size=1)
    for event_data in received:
        logger.info("Message received:{}".format(event_data.body_as_str()))
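The interaction between max_batch_size and timeout described for receive can be imitated with a local queue. This is a simplified sketch under stated assumptions: receive_sketch is a hypothetical stand-in that reads from a queue.Queue instead of an Event Hub partition, and it is not the library's implementation.

```python
import queue
from typing import List


def receive_sketch(source: queue.Queue, max_batch_size: int, timeout: float) -> List[str]:
    """Hypothetical stand-in for receive(): read from a local queue, not a partition."""
    batch: List[str] = []
    try:
        # Wait up to `timeout` seconds for the first event.
        batch.append(source.get(timeout=timeout))
        # Then take whatever is already available, up to max_batch_size.
        while len(batch) < max_batch_size:
            batch.append(source.get_nowait())
    except queue.Empty:
        pass  # nothing (more) to read: return what was collected, possibly nothing
    return batch


pending = queue.Queue()
for i in range(3):
    pending.put("event-{}".format(i))

first = receive_sketch(pending, max_batch_size=2, timeout=0.1)
second = receive_sketch(pending, max_batch_size=2, timeout=0.1)
third = receive_sketch(pending, max_batch_size=2, timeout=0.1)
print(first, second, third)
```

The second call returns a short batch as soon as no more events are available, and the third returns an empty list once the timeout passes with nothing to read.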
-
property
last_enqueued_event_properties
¶ The latest enqueued event information. This property is updated each time an event is received, provided the consumer was created with track_last_enqueued_event_properties set to True. The dict includes the following information about the partition:
sequence_number
offset
enqueued_time
retrieval_time
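One possible use of these four keys is estimating how far a consumer lags behind the partition. The sketch below uses a hand-written dictionary with the key names listed above. All values and their types are made up for illustration, and current_sequence_number is a hypothetical variable standing for the sequence number of the event just processed.

```python
import datetime

# Hand-written stand-in for consumer.last_enqueued_event_properties.
# Key names follow the list above; values and value types are assumptions.
last_enqueued = {
    "sequence_number": 1050,
    "offset": "48520",
    "enqueued_time": datetime.datetime(2019, 11, 1, 12, 0, 5),
    "retrieval_time": datetime.datetime(2019, 11, 1, 12, 0, 6),
}

current_sequence_number = 1000  # hypothetical: the event just processed

# Number of events enqueued in the partition that have not been processed yet.
events_behind = last_enqueued["sequence_number"] - current_sequence_number

# How old the snapshot was when it was retrieved.
snapshot_age = last_enqueued["retrieval_time"] - last_enqueued["enqueued_time"]

print(events_behind, snapshot_age.total_seconds())
```

Because the metadata arrives with each received event, this estimate needs no extra request for partition properties.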
azure.eventhub.error module¶
-
exception
azure.eventhub.error.
AuthenticationError
(message, details=None)[source]¶ Failed to connect to Event Hubs because of an authentication problem.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.error.
ConnectError
(message, details=None)[source]¶ Failed to connect to Event Hubs.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.error.
ConnectionLostError
(message, details=None)[source]¶ The connection to the Event Hub was lost. The SDK retries automatically, so this error should not normally be raised.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.error.
EventDataError
(message, details=None)[source]¶ The event data is problematic, so the send fails on the client side.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.error.
EventDataSendError
(message, details=None)[source]¶ The service returned an error while event data was being sent.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
azure.eventhub.producer module¶
-
class
azure.eventhub.producer.
EventHubProducer
(client, target, **kwargs)[source]¶ A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition.
Please use the method create_producer on EventHubClient for creating EventHubProducer.
Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the create_producer method in EventHubClient.
- Parameters
client (EventHubClient) – The parent EventHubClient.
target (str) – The URI of the EventHub to send to.
partition (str) – The specific partition ID to send to. Default is None, in which case the service will assign events to all partitions using round-robin.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
keep_alive (float) – The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings.
auto_reconnect (bool) – Whether to automatically reconnect the producer if a retryable error occurs. Default value is True.
-
close
()[source]¶ Close down the handler. If the handler has already closed, this will be a no op.
Example
client = EventHubClient.from_connection_string(connection_str)
producer = client.create_producer(partition_id="0")
try:
    producer.send(EventData(b"A single event"))
finally:
    # Close down the send handler.
    producer.close()
-
create_batch
(max_size=None, partition_key=None)[source]¶ Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side.
- Parameters
- Returns
an EventDataBatch instance
- Return type
Example
event_data_batch = producer.create_batch(max_size=10000)
while True:
    try:
        event_data_batch.try_add(EventData('Message inside EventBatchData'))
    except ValueError:
        # The EventDataBatch object reaches its max_size.
        # You can send the full EventDataBatch object and create a new one here.
        break
-
send
(event_data, partition_key=None, timeout=None)[source]¶ Sends event data and blocks until an acknowledgement is received or the operation times out.
- Parameters
event_data (EventData, Iterator, Generator, list) – The event to be sent. It can be an EventData object or an iterable of EventData objects.
partition_key (str) – With the given partition_key, event data will land in a particular partition of the Event Hub decided by the service. partition_key can be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.
- Raises
~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError
- Returns
None
- Return type
Example
with producer:
    event_data = EventData(b"A single event")
    producer.send(event_data)
Module contents¶
-
exception
azure.eventhub.
EventHubError
(message, details=None)[source]¶ Represents an error that happened in the client.
- Variables
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
exception
azure.eventhub.
ConnectError
(message, details=None)[source]¶ Failed to connect to Event Hubs.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.
ConnectionLostError
(message, details=None)[source]¶ The connection to the Event Hub was lost. The SDK retries automatically, so this error should not normally be raised.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.
EventDataError
(message, details=None)[source]¶ The event data is problematic, so the send fails on the client side.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.
EventDataSendError
(message, details=None)[source]¶ The service returned an error while event data was being sent.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.eventhub.
AuthenticationError
(message, details=None)[source]¶ Failed to connect to Event Hubs because of an authentication problem.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
class
azure.eventhub.
EventData
(body=None)[source]¶ The EventData class is a holder of event content.
Example
event_data = EventData("String data")
event_data = EventData(b"Bytes data")
event_data = EventData([b"A", b"B", b"C"])

list_data = ['Message {}'.format(i) for i in range(10)]
event_data = EventData(body=list_data)
Initialize EventData.
-
body_as_json
(encoding='UTF-8')[source]¶ The body of the event loaded as a JSON object, if the data is compatible.
- Parameters
encoding – The encoding to use for decoding message data. Default is ‘UTF-8’
- Return type
-
body_as_str
(encoding='UTF-8')[source]¶ The body of the event data as a string if the data is of a compatible type.
- Parameters
encoding – The encoding to use for decoding message data. Default is ‘UTF-8’
- Return type
str or unicode
-
PROP_LAST_ENQUEUED_OFFSET
= b'last_enqueued_offset'¶
-
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER
= b'last_enqueued_sequence_number'¶
-
PROP_LAST_ENQUEUED_TIME_UTC
= b'last_enqueued_time_utc'¶
-
PROP_OFFSET
= b'x-opt-offset'¶
-
PROP_PARTITION_KEY
= b'x-opt-partition-key'¶
-
PROP_PARTITION_KEY_AMQP_SYMBOL
= <uamqp.types.AMQPSymbol object>¶
-
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC
= b'runtime_info_retrieval_time_utc'¶
-
PROP_SEQ_NUMBER
= b'x-opt-sequence-number'¶
-
PROP_TIMESTAMP
= b'x-opt-enqueued-time'¶
-
property
enqueued_time
¶ The enqueued timestamp of the event data object.
- Return type
-
-
class
azure.eventhub.
EventDataBatch
(max_size=None, partition_key=None)[source]¶ Sending events in a batch performs better than sending individual events. EventDataBatch helps you create a batch of EventData of the maximum allowed size to improve sending performance.
Use the try_add method to add events until the maximum batch size limit in bytes has been reached, at which point a ValueError is raised. Use the send method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer for sending.
Please use the create_batch method of EventHubProducer to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.
-
try_add
(event_data)[source]¶ The message size is the sum of the body, properties, header, etc.
- Parameters
event_data (~azure.eventhub.EventData) – The event to add to the batch.
- Returns
None
- Raises
ValueError, when exceeding the size limit.
-
property
size
¶ The size in bytes
- Returns
int
-
-
class
azure.eventhub.
EventPosition
(value, inclusive=False)[source]¶ The position (offset, sequence number, or timestamp) where a consumer starts. Examples:
- Beginning of the event stream:
>>> event_pos = EventPosition("-1")
- End of the event stream:
>>> event_pos = EventPosition("@latest")
- Events after the specified offset:
>>> event_pos = EventPosition("12345")
- Events from the specified offset:
>>> event_pos = EventPosition("12345", True)
- Events after a datetime:
>>> event_pos = EventPosition(datetime.datetime.utcnow())
- Events after a specific sequence number:
>>> event_pos = EventPosition(1506968696002)
Initialize EventPosition.
-
class
azure.eventhub.
EventHubClient
(host, event_hub_path, credential, **kwargs)[source]¶ The EventHubClient class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service.
Example
import os
from azure.eventhub import EventHubClient, EventHubSharedKeyCredential

host = os.environ['EVENT_HUB_HOSTNAME']
event_hub_path = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

client = EventHubClient(
    host=host,
    event_hub_path=event_hub_path,
    credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
)
Constructs a new EventHubClient.
- Parameters
host (str) – The hostname of the Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
credential – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.
http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – The user agent that needs to be appended to the built in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.
receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
-
create_consumer
(consumer_group, partition_id, event_position, **kwargs)[source]¶ Create a consumer for a particular consumer group and partition.
- Parameters
consumer_group (str) – The name of the consumer group this consumer is associated with. Events are read in the context of this group. The default consumer_group for an event hub is “$Default”.
partition_id (str) – The identifier of the Event Hub partition from which events will be received.
event_position (EventPosition) – The position within the partition where the consumer should begin reading events.
owner_level (int) – The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
- Return type
~azure.eventhub.EventHubConsumer
Example
    client = EventHubClient.from_connection_string(connection_str)
    # Create a consumer.
    consumer = client.create_consumer(
        consumer_group="$default",
        partition_id="0",
        event_position=EventPosition('@latest')
    )
    # Create an exclusive consumer object.
    exclusive_consumer = client.create_consumer(
        consumer_group="$default",
        partition_id="0",
        event_position=EventPosition("-1"),
        owner_level=1
    )
-
create_producer
(partition_id=None, send_timeout=None)[source]¶ Create a producer to send EventData objects to an Event Hub.
- Parameters
partition_id (str) – Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via round-robin.
operation (str) – An optional operation to be appended to the hostname in the target URL. The value must start with a / character.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
- Return type
~azure.eventhub.EventHubProducer
Example
    client = EventHubClient.from_connection_string(connection_str)
    # Create a producer.
    producer = client.create_producer(partition_id="0")
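When partition_id is omitted, events are distributed to the available partitions via round-robin. The actual assignment is not made by client code, so the following is only a toy model of what round-robin distribution means. The function assign_round_robin and the sample partition IDs are hypothetical.

```python
import itertools


def assign_round_robin(events, partition_ids):
    """Pair each event with a partition ID, cycling through the IDs in order.

    Toy model only: it illustrates round-robin distribution and does not
    reproduce how the Event Hubs service routes events.
    """
    if not partition_ids:
        raise ValueError("at least one partition ID is required")
    partitions = itertools.cycle(partition_ids)
    return [(next(partitions), event) for event in events]


assignments = assign_round_robin(["e1", "e2", "e3", "e4", "e5"], ["0", "1"])
```

Passing an explicit partition_id to create_producer bypasses this distribution, and every event from that producer goes to the named partition.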
-
classmethod
from_connection_string
(conn_str, **kwargs)¶ Create an EventHubClient from an EventHub connection string.
- Parameters
conn_str (str) – The connection string of an Event Hub.
event_hub_path (str) – The path of the specific Event Hub to connect the client to, if EntityPath is not included in the connection string.
network_tracing (bool) – Whether to output network trace logs to the logger. Default is False.
http_proxy (dict[str, Any]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – A user agent string to append to the built-in user agent string.
retry_total (int) – The total number of attempts to retry a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
prefetch (int) – The message prefetch count of the consumer. Default is 300.
max_batch_size (int) – Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch.
receive_timeout (float) – The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds, meaning there is no timeout.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
Example
    import os
    from azure.eventhub import EventHubClient

    connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
        os.environ['EVENT_HUB_HOSTNAME'],
        os.environ['EVENT_HUB_SAS_POLICY'],
        os.environ['EVENT_HUB_SAS_KEY'],
        os.environ['EVENT_HUB_NAME'])
    client = EventHubClient.from_connection_string(connection_str)
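The connection string in the example is a semicolon-separated list of key=value pairs. As a rough sketch of how such a string breaks down, and of how an event_hub_path argument can stand in for a missing EntityPath, the helper below splits it into its parts. parse_connection_string is a hypothetical name, not the library's parser, and the precedence it gives EntityPath over event_hub_path is an assumption.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str, event_hub_path=None):
    """Split an Event Hubs style connection string into its components.

    Hypothetical helper for illustration; not the library's own parser.
    """
    settings = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        # Split on the first '=' only: shared access keys may end in '='.
        key, _, value = segment.partition("=")
        settings[key] = value

    # Assumption of this sketch: EntityPath wins over event_hub_path.
    entity = settings.get("EntityPath") or event_hub_path
    if not entity:
        raise ValueError("No EntityPath in connection string and no event_hub_path given")

    return {
        "host": urlparse(settings.get("Endpoint", "")).hostname,
        "policy": settings.get("SharedAccessKeyName"),
        "key": settings.get("SharedAccessKey"),
        "event_hub_path": entity,
    }


parsed = parse_connection_string(
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=policy;SharedAccessKey=abc=;EntityPath=myhub"
)
```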
-
class
azure.eventhub.
EventHubProducer
(client, target, **kwargs)[source]¶ A producer responsible for transmitting EventData to a specific Event Hub, grouped together in batches. Depending on the options specified at creation, the producer may be created to allow event data to be automatically routed to an available partition or specific to a partition.
Do not construct EventHubProducer directly; create one by calling the create_producer method on EventHubClient.
- Parameters
client (EventHubClient) – The parent EventHubClient.
target (str) – The URI of the EventHub to send to.
partition (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.
send_timeout (float) – The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout.
keep_alive (float) – The time interval in seconds between pinging the connection to keep it alive during periods of inactivity. The default value is None, i.e. no keep alive pings.
auto_reconnect (bool) – Whether to automatically reconnect the producer if a retryable error occurs. Default value is True.
-
close
()[source]¶ Close down the handler. If the handler has already been closed, this is a no-op.
Example
    client = EventHubClient.from_connection_string(connection_str)
    producer = client.create_producer(partition_id="0")
    try:
        producer.send(EventData(b"A single event"))
    finally:
        # Close down the send handler.
        producer.close()
-
create_batch
(max_size=None, partition_key=None)[source]¶ Create an EventDataBatch object with a maximum size of max_size. The max_size should be no greater than the maximum message size allowed by the service.
- Parameters
- Returns
an EventDataBatch instance
- Return type
~azure.eventhub.EventDataBatch
Example
    event_data_batch = producer.create_batch(max_size=10000)
    while True:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The EventDataBatch object reaches its max_size.
            # You can send the full EventDataBatch object and create a new one here.
            break
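The comment in the example mentions sending the full batch and creating a new one. That fill, flush, and recreate pattern can be sketched without the service by using a stand-in batch class. SizeLimitedBatch and pack_into_batches are hypothetical names; the stand-in counts raw payload bytes only, whereas a real EventDataBatch presumably accounts for message overhead as well.

```python
class SizeLimitedBatch:
    """Stand-in for a size-bounded batch; raises ValueError when full."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.size = 0
        self.items = []

    def try_add(self, payload):
        if self.size + len(payload) > self.max_size:
            raise ValueError("batch has reached its max_size")
        self.items.append(payload)
        self.size += len(payload)


def pack_into_batches(payloads, max_size):
    """Fill a batch until try_add fails, then start a new batch."""
    batches = []
    current = SizeLimitedBatch(max_size)
    for payload in payloads:
        try:
            current.try_add(payload)
        except ValueError:
            if not current.items:
                raise  # a single payload larger than max_size cannot be packed
            batches.append(current)  # in real code, send the full batch here
            current = SizeLimitedBatch(max_size)
            current.try_add(payload)
    if current.items:
        batches.append(current)
    return batches


batches = pack_into_batches([b"aaaa", b"bbbb", b"cc"], max_size=8)
```

In real code, the point where a full batch is appended to the list is where producer.send would be called with the batch.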
-
send
(event_data, partition_key=None, timeout=None)[source]¶ Sends event data and blocks until an acknowledgement is received or the operation times out.
- Parameters
event_data (EventData, Iterator, Generator, list) – The event to be sent. It can be an EventData object or an iterable of EventData objects.
partition_key (str) – With the given partition_key, event data will land in a particular partition of the Event Hub, as decided by the service. partition_key can be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
timeout (float) – The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used.
- Raises
~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError
- Returns
None
- Return type
None
Example
    with producer:
        event_data = EventData(b"A single event")
        producer.send(event_data)
-
class
azure.eventhub.
EventHubConsumer
(client, source, **kwargs)[source]¶ A consumer responsible for reading EventData from a specific Event Hub partition and as a member of a specific consumer group.
A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred to as “Epoch Consumers.”
A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively reading events from the partition. These non-exclusive consumers are sometimes referred to as “Non-Epoch Consumers.”
Do not construct EventHubConsumer directly; create one by calling the create_consumer method on EventHubClient.
- Parameters
client (EventHubClient) – The parent EventHubClient.
source (str) – The source EventHub from which to receive events.
prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.
owner_level (int) – The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set.
track_last_enqueued_event_properties (bool) – Indicates whether or not the consumer should request information on the last enqueued event on its associated partition, and track that information as events are received. When information about the partition’s last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
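The difference between exclusive and non-exclusive consumers can be pictured with a small in-memory model of one partition within one consumer group. This is a sketch under stated assumptions, not the service's implementation: it assumes that an exclusive consumer displaces non-exclusive readers, that a consumer with an equal or higher owner_level takes over from the current exclusive consumer, and that one with a lower owner_level is refused. The class PartitionReaders and its method are hypothetical.

```python
class PartitionReaders:
    """Toy model of the readers on one partition for one consumer group."""

    def __init__(self):
        self.exclusive = None  # (name, owner_level) of the exclusive reader, if any
        self.shared = []       # names of non-exclusive readers

    def connect(self, name, owner_level=None):
        if owner_level is None:
            # Assumption: non-exclusive readers are refused while an
            # exclusive reader owns the partition.
            if self.exclusive is not None:
                raise PermissionError("partition is owned by an exclusive consumer")
            self.shared.append(name)
            return
        # Assumption: a lower owner_level cannot displace the current owner.
        if self.exclusive is not None and owner_level < self.exclusive[1]:
            raise PermissionError("a consumer with a higher owner_level owns the partition")
        # The new exclusive reader takes over; other readers are dropped.
        self.shared.clear()
        self.exclusive = (name, owner_level)


readers = PartitionReaders()
readers.connect("reader-a")                 # non-exclusive
readers.connect("reader-b", owner_level=1)  # exclusive, displaces reader-a
```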
-
close
()[source]¶ Close down the handler. If the handler has already been closed, this is a no-op.
Example
    client = EventHubClient.from_connection_string(connection_str)
    consumer = client.create_consumer(
        consumer_group="$default",
        partition_id="0",
        event_position=EventPosition('@latest')
    )
    try:
        consumer.receive(timeout=1)
    finally:
        # Close down the receive handler.
        consumer.close()
-
next
()¶
-
receive
(max_batch_size=None, timeout=None)[source]¶ Receive events from the EventHub.
- Parameters
max_batch_size (int) – The maximum number of events to receive in one batch. The batch will contain up to this many events, but the call returns as soon as the service returns no new events. If combined with a timeout and no events are retrieved before the timeout expires, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum.
timeout (float) – The maximum wait time to build up the requested message count for the batch. If not specified, the default wait time specified when the consumer was created will be used.
- Return type
list[~azure.eventhub.EventData]
- Raises
~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError
Example
    with consumer:
        logger = logging.getLogger("azure.eventhub")
        received = consumer.receive(timeout=5, max_batch_size=1)
        for event_data in received:
            logger.info("Message received:{}".format(event_data.body_as_str()))
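The interaction between max_batch_size and timeout can be modeled locally with a standard-library queue standing in for the partition. The function receive_batch below is a hypothetical illustration of "collect up to N events or stop when the time runs out"; it is not how the consumer is implemented.

```python
import queue
import time


def receive_batch(source, max_batch_size, timeout):
    """Collect up to max_batch_size items from source within timeout seconds.

    Toy model: returns early when the batch is full, and returns whatever
    was collected (possibly nothing) once the deadline passes.
    """
    deadline = time.monotonic() + timeout
    batch = []
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # deadline reached with no further events
    return batch


events = queue.Queue()
for body in ("event-1", "event-2", "event-3"):
    events.put(body)

first_batch = receive_batch(events, max_batch_size=2, timeout=1.0)
second_batch = receive_batch(events, max_batch_size=2, timeout=0.05)
third_batch = receive_batch(events, max_batch_size=2, timeout=0.05)
```

The first call returns as soon as two events are available, the second returns the single remaining event once its timeout expires, and the third returns an empty list.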
-
property
last_enqueued_event_properties
¶ The latest enqueued event information. This property is updated each time an event is received, provided the consumer was created with track_last_enqueued_event_properties set to True. The dict includes the following information about the partition:
sequence_number
offset
enqueued_time
retrieval_time
-
class
azure.eventhub.
TransportType
[source]¶ Transport type. The underlying transport protocol type:
Amqp: AMQP over the default TCP transport protocol; it uses port 5671.
AmqpOverWebsocket: AMQP over the WebSockets transport protocol; it uses port 443.
-
Amqp
= 1¶
-
AmqpOverWebsocket
= 2¶
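The port numbers above matter mainly when outbound traffic is restricted. As a small sketch, the choice between the two transports could be driven by which port is reachable. TransportChoice is a local stand-in that mirrors the two documented values; it is not the library's TransportType, and pick_transport is a hypothetical helper.

```python
from enum import Enum


class TransportChoice(Enum):
    """Local stand-in mirroring the two documented transport values."""
    AMQP = 1
    AMQP_OVER_WEBSOCKET = 2


# Ports as stated in the TransportType description above.
TRANSPORT_PORTS = {
    TransportChoice.AMQP: 5671,
    TransportChoice.AMQP_OVER_WEBSOCKET: 443,
}


def pick_transport(amqp_port_open):
    """Prefer plain AMQP; fall back to WebSockets if port 5671 is blocked."""
    if amqp_port_open:
        return TransportChoice.AMQP
    return TransportChoice.AMQP_OVER_WEBSOCKET


choice = pick_transport(amqp_port_open=False)
port = TRANSPORT_PORTS[choice]
```

With the real library, the corresponding TransportType member would be passed as the transport_type keyword argument when creating the client.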
-
The shared access key credential used for authentication.
-
class
azure.eventhub.
EventHubSASTokenCredential
(token)[source]¶ SAS token used for authentication.
- Parameters
token (str or callable) – A SAS token or function that returns a SAS token. If a function is supplied, it will be used to retrieve subsequent tokens in the case of token expiry. The function should take no arguments.
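Because token may be either a string or a function that takes no arguments, a token provider can be written as a plain callable. The sketch below builds a SAS token with the standard library, using the commonly documented Azure shared access signature layout (sr, sig, se, skn), and shows how a string-or-callable value can be resolved. generate_sas_token, resolve_token, token_provider, and all of the sample values are assumptions for illustration; they are not part of azure.eventhub.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import quote_plus


def generate_sas_token(uri, policy, key, ttl_seconds=3600):
    """Build a SAS token string for a resource URI (hypothetical helper)."""
    expiry = str(int(time.time()) + ttl_seconds)
    encoded_uri = quote_plus(uri)
    # The signature covers the URL-encoded URI and the expiry time.
    string_to_sign = "{}\n{}".format(encoded_uri, expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = quote_plus(base64.b64encode(digest).decode("ascii"))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, policy)


def resolve_token(token):
    """Return the token itself, or call it if a function was supplied."""
    return token() if callable(token) else token


def token_provider():
    # Takes no arguments, as the token parameter requires of a callable.
    # Placeholder URI, policy name, and key.
    return generate_sas_token(
        "sb://example.servicebus.windows.net/myhub", "example-policy", "example-key")


static_token = resolve_token("SharedAccessSignature sr=placeholder")
fresh_token = resolve_token(token_provider)
```

Supplying a function rather than a fixed string lets a new token be produced each time one is requested, which is what makes renewal after expiry possible.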