azure.eventhub package
- class azure.eventhub.EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
The EventHubConsumerClient class defines a high level interface for receiving events from the Azure Event Hubs service.
The main goal of EventHubConsumerClient is to receive events from all partitions of an EventHub with load-balancing and checkpointing.
When multiple EventHubConsumerClient instances are running against the same event hub, consumer group and checkpointing location, the partitions will be evenly distributed among them.
To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the EventHubConsumerClient. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.
An EventHubConsumerClient can also receive from a specific partition when you call its receive() or receive_batch() method and specify the partition_id. Load-balancing does not apply in single-partition mode, but checkpoints can still be saved if checkpoint_store is set.
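As a rough illustration of what "evenly distributed" means here, the sketch below computes how many partitions each client would hold once balancing settles, together with the ownership expiration implied by a given load_balancing_interval (six times the interval unless overridden, per the keyword arguments documented below). The helper names are hypothetical and the arithmetic is an illustration only, not the SDK's internal load-balancing code.

```python
from typing import List, Optional


def target_partition_counts(partition_count: int, client_count: int) -> List[int]:
    """Return how many partitions each client holds under an even split.

    Hypothetical helper: when the partitions do not divide evenly, the first
    `extra` clients hold one partition more than the rest.
    """
    if client_count <= 0:
        raise ValueError("client_count must be positive")
    base, extra = divmod(partition_count, client_count)
    return [base + 1 if i < extra else base for i in range(client_count)]


def ownership_expiration(load_balancing_interval: float = 30,
                         override: Optional[float] = None) -> float:
    """Return the ownership expiration in seconds (6 * interval by default)."""
    return override if override is not None else 6 * load_balancing_interval


print(target_partition_counts(8, 3))  # [3, 3, 2]
print(ownership_expiration())         # 180
```

With 8 partitions and 3 clients sharing one consumer group and checkpoint store, two clients would end up with three partitions and one with two.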
- Parameters:
fully_qualified_namespace (str) – The fully qualified host name for the Event Hubs namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
eventhub_name (str) – The path of the specific Event Hub to connect the client to.
consumer_group (str) – Receive events from the event hub for this consumer group.
credential (TokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments:
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – If specified, this will be added in front of the user agent string.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3. retry_total has a special meaning when receiving: the receive method is implemented as a while-loop that calls an internal receive method in each iteration, and retry_total specifies the number of retries after an error is raised by that internal receive method. If retry attempts are exhausted, the on_error callback will be called (if provided) with the error information. The failed internal partition consumer will be closed (on_partition_close will be called if provided) and a new internal partition consumer will be created (on_partition_initialize will be called if provided) to resume receiving.
retry_backoff_factor (float) – A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). In fixed mode, retry policy will always sleep for {backoff factor}. In ‘exponential’ mode, retry policy will sleep for: {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then the retry will sleep for [0.0s, 0.2s, 0.4s, …] between retries. The default value is 0.8.
retry_backoff_max (float) – The maximum back off time. Default value is 120 seconds (2 minutes).
retry_mode (str) – The delay behavior between retry attempts. Supported values are ‘fixed’ or ‘exponential’, where default is ‘exponential’.
idle_timeout (float) – Timeout, in seconds, after which this client will close the underlying connection if there is no further activity. By default the value is None, meaning that the client will not shutdown due to inactivity unless initiated by the service.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.
http_proxy (dict[str, str or int]) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
checkpoint_store (CheckpointStore or None) – A manager that stores the partition load-balancing and checkpoint data when receiving events. The checkpoint store will be used in both cases of receiving from all partitions or a single partition. In the latter case load-balancing does not apply. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory, and the EventHubConsumerClient instance will receive events without load-balancing.
load_balancing_interval (float) – The interval, in seconds, between two load-balancing evaluations. Default is 30 seconds.
partition_ownership_expiration_interval (float) – A partition ownership will expire after this number of seconds. Every load-balancing evaluation will automatically extend the ownership expiration time. Default is 6 * load_balancing_interval, i.e. 180 seconds when using the default load_balancing_interval of 30 seconds.
load_balancing_strategy (str or LoadBalancingStrategy) – The strategy used to claim and balance partition ownership when load-balancing runs. Use “greedy” or LoadBalancingStrategy.GREEDY for the greedy strategy, which, for every load-balancing evaluation, will grab as many unclaimed partitions as required to balance the load. Use “balanced” or LoadBalancingStrategy.BALANCED for the balanced strategy, which, for every load-balancing evaluation, claims only one partition that is not claimed by another EventHubConsumerClient. If all partitions of an EventHub are claimed by other EventHubConsumerClient instances and this client has claimed too few partitions, this client will steal one partition from other clients for every load-balancing evaluation regardless of the load balancing strategy. Greedy strategy is used by default.
custom_endpoint_address (str or None) – The custom endpoint address to use for establishing a connection to the Event Hubs service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If port is not specified in the custom_endpoint_address, by default port 443 will be used.
connection_verify (str or None) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.
uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If EventHubsConnectionError errors are occurring due to write timing out, a larger than default value may need to be passed in. This is for advanced usage scenarios and ordinarily the default value should be sufficient.
Example:
```python
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
    fully_qualified_namespace=fully_qualified_namespace,
    eventhub_name=eventhub_name,
    consumer_group='$Default',
    credential=credential)
```
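The retry keyword arguments interact, so a small sketch may help when choosing values. The function below follows the formula as documented for retry_backoff_factor: no delay before the first retry, then {backoff factor} * (2 ** ({number of total retries} - 1)) in exponential mode, capped at retry_backoff_max. The function name is hypothetical, and this is a reading of the documented behavior under that assumption, not the SDK's internal retry code.

```python
from typing import List


def retry_delays(retry_total: int = 3,
                 backoff_factor: float = 0.8,
                 backoff_max: float = 120.0,
                 mode: str = "exponential") -> List[float]:
    """Return the sleep time in seconds before each retry attempt.

    Assumption: in exponential mode the first retry happens immediately,
    matching the documented [0.0s, 0.2s, 0.4s, ...] sequence for a factor of 0.1.
    """
    delays = []
    for attempt in range(1, retry_total + 1):
        if mode == "fixed":
            delay = backoff_factor
        elif mode == "exponential":
            delay = 0.0 if attempt == 1 else backoff_factor * (2 ** (attempt - 1))
        else:
            raise ValueError("mode must be 'fixed' or 'exponential'")
        # Never sleep longer than the configured maximum.
        delays.append(min(delay, backoff_max))
    return delays


print(retry_delays(3, 0.1))  # [0.0, 0.2, 0.4]
print(retry_delays())        # [0.0, 1.6, 3.2]
```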
- close() → None
Stop retrieving events from the Event Hub and close the underlying AMQP connection and links.
- Return type:
None
Example:
```python
import logging
import os
import threading
import time

from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubConsumerClient

eventhub_fully_qualified_namespace = os.environ["EVENT_HUB_HOSTNAME"]
eventhub_name = os.environ['EVENT_HUB_NAME']

consumer = EventHubConsumerClient(
    fully_qualified_namespace=eventhub_fully_qualified_namespace,
    consumer_group="$Default",
    eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
    credential=DefaultAzureCredential()
)

logger = logging.getLogger("azure.eventhub")

def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    logger.info("Received event from partition: {}".format(partition_context.partition_id))

# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
    target=consumer.receive,
    kwargs={
        "on_event": on_event,
        "starting_position": "-1",  # "-1" is from the beginning of the partition.
    }
)
worker.start()
time.sleep(10)  # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
```
- classmethod from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: Literal['exponential', 'fixed'] = 'exponential', idle_timeout: float | None = None, transport_type: TransportType = TransportType.Amqp, checkpoint_store: CheckpointStore | None = None, load_balancing_interval: float = 30, partition_ownership_expiration_interval: float | None = None, load_balancing_strategy: str | LoadBalancingStrategy = LoadBalancingStrategy.GREEDY, custom_endpoint_address: str | None = None, connection_verify: str | None = None, uamqp_transport: bool = False, **kwargs: Any) → EventHubConsumerClient
Create an EventHubConsumerClient from a connection string.
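- Parameters:
conn_str (str) – The connection string of an Event Hub.
consumer_group (str) – Receive events from the event hub for this consumer group.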
- Keyword Arguments:
eventhub_name (str) – The path of the specific Event Hub to connect the client to.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – If specified, this will be added in front of the user agent string.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3. retry_total has a special meaning when receiving: the receive method is implemented as a while-loop that calls an internal receive method in each iteration, and retry_total specifies the number of retries after an error is raised by that internal receive method. If retry attempts are exhausted, the on_error callback will be called (if provided) with the error information. The failed internal partition consumer will be closed (on_partition_close will be called if provided) and a new internal partition consumer will be created (on_partition_initialize will be called if provided) to resume receiving.
retry_backoff_factor (float) – A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). In fixed mode, retry policy will always sleep for {backoff factor}. In ‘exponential’ mode, retry policy will sleep for: {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then the retry will sleep for [0.0s, 0.2s, 0.4s, …] between retries. The default value is 0.8.
retry_backoff_max (float) – The maximum back off time. Default value is 120 seconds (2 minutes).
retry_mode (str) – The delay behavior between retry attempts. Supported values are ‘fixed’ or ‘exponential’, where default is ‘exponential’.
idle_timeout (float) – Timeout, in seconds, after which this client will close the underlying connection if there is no further activity. By default the value is None, meaning that the client will not shutdown due to inactivity unless initiated by the service.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
checkpoint_store (CheckpointStore or None) – A manager that stores the partition load-balancing and checkpoint data when receiving events. The checkpoint store will be used in both cases of receiving from all partitions or a single partition. In the latter case load-balancing does not apply. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory, and the EventHubConsumerClient instance will receive events without load-balancing.
load_balancing_interval (float) – The interval, in seconds, between two load-balancing evaluations. Default is 30 seconds.
partition_ownership_expiration_interval (float) – A partition ownership will expire after this number of seconds. Every load-balancing evaluation will automatically extend the ownership expiration time. Default is 6 * load_balancing_interval, i.e. 180 seconds when using the default load_balancing_interval of 30 seconds.
load_balancing_strategy (str or LoadBalancingStrategy) – The strategy used to claim and balance partition ownership when load-balancing runs. Use “greedy” or LoadBalancingStrategy.GREEDY for the greedy strategy, which, for every load-balancing evaluation, will grab as many unclaimed partitions as required to balance the load. Use “balanced” or LoadBalancingStrategy.BALANCED for the balanced strategy, which, for every load-balancing evaluation, claims only one partition that is not claimed by another EventHubConsumerClient. If all partitions of an EventHub are claimed by other EventHubConsumerClient instances and this client has claimed too few partitions, this client will steal one partition from other clients for every load-balancing evaluation regardless of the load balancing strategy. Greedy strategy is used by default.
custom_endpoint_address (str or None) – The custom endpoint address to use for establishing a connection to the Event Hubs service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If port is not specified in the custom_endpoint_address, by default port 443 will be used.
connection_verify (str or None) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.
uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
- Returns:
An EventHubConsumerClient instance.
- Return type:
EventHubConsumerClient
Example:
```python
import os
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential

eventhub_fully_qualified_namespace = os.environ["EVENT_HUB_HOSTNAME"]
eventhub_name = os.environ['EVENT_HUB_NAME']

consumer = EventHubConsumerClient(
    fully_qualified_namespace=eventhub_fully_qualified_namespace,
    consumer_group='$Default',
    eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
    credential=DefaultAzureCredential()
)
```
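For completeness, here is a minimal sketch that calls from_connection_string itself, using only arguments from the signature above. The environment variable names EVENT_HUB_CONN_STR and EVENT_HUB_CONSUMER_GROUP and both helper functions are assumptions made for illustration. The SDK import is deferred into create_consumer so that the argument-building helper can be exercised without the package installed.

```python
import os
from typing import Any, Dict, Mapping


def consumer_kwargs_from_env(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Build from_connection_string arguments from a mapping such as os.environ.

    The variable names are hypothetical; adjust them to your deployment.
    """
    kwargs: Dict[str, Any] = {
        "conn_str": environ["EVENT_HUB_CONN_STR"],
        "consumer_group": environ.get("EVENT_HUB_CONSUMER_GROUP", "$Default"),
    }
    # eventhub_name is only needed when the connection string does not carry it.
    name = environ.get("EVENT_HUB_NAME")
    if name:
        kwargs["eventhub_name"] = name
    return kwargs


def create_consumer(environ: Mapping[str, str] = os.environ):
    """Create the client; requires the azure-eventhub package at call time."""
    from azure.eventhub import EventHubConsumerClient  # deferred import

    return EventHubConsumerClient.from_connection_string(
        **consumer_kwargs_from_env(environ)
    )
```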
- get_eventhub_properties() → Dict[str, Any]
Get properties of the Event Hub.
Keys in the returned dictionary include:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
- Returns:
A dictionary containing information about the Event Hub.
- Return type:
dict[str, Any]
- Raises:
- get_partition_ids() → List[str]
Get partition IDs of the Event Hub.
- Returns:
A list of partition IDs.
- Return type:
list[str]
- Raises:
- get_partition_properties(partition_id: str) → Dict[str, Any]
Get properties of the specified partition.
Keys in the properties dictionary include:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
- Parameters:
partition_id (str) – The target partition ID.
- Returns:
A dictionary containing partition properties.
- Return type:
dict[str, Any]
- Raises:
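The two partition methods are typically used together. The sketch below walks every partition ID and collects a few of the documented property keys. summarize_partitions is a hypothetical helper that accepts any object exposing get_partition_ids() and get_partition_properties(), and FakeClient is a stand-in with made-up values so the example runs without a live Event Hub.

```python
from typing import Any, Dict


def summarize_partitions(client: Any) -> Dict[str, Dict[str, Any]]:
    """Map each partition ID to a subset of its documented properties."""
    summary = {}
    for partition_id in client.get_partition_ids():
        props = client.get_partition_properties(partition_id)
        summary[partition_id] = {
            "is_empty": props["is_empty"],
            "last_enqueued_sequence_number": props["last_enqueued_sequence_number"],
        }
    return summary


class FakeClient:
    """Stand-in for EventHubConsumerClient; the values are illustrative only."""

    def get_partition_ids(self):
        return ["0", "1"]

    def get_partition_properties(self, partition_id):
        return {
            "eventhub_name": "demo",
            "id": partition_id,
            "is_empty": partition_id == "1",
            "last_enqueued_sequence_number": 10 if partition_id == "0" else -1,
        }


summary = summarize_partitions(FakeClient())
print(sorted(summary))  # ['0', '1']
```

Against a real client, each get_partition_properties call is a request to the service, so it is reasonable to call the helper occasionally rather than per event.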
- receive(on_event: Callable[[PartitionContext, EventData | None], None], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[[PartitionContext, Exception], None] | None = None, on_partition_initialize: Callable[[PartitionContext], None] | None = None, on_partition_close: Callable[[PartitionContext, CloseReason], None] | None = None, **kwargs: Any) → None
Receive events from partition(s), with optional load-balancing and checkpointing.
- Parameters:
on_event (callable[PartitionContext, EventData or None]) – The callback function for handling a received event. The callback takes two parameters: partition_context which contains partition context and event which is the received event. The callback function should be defined like: on_event(partition_context, event). For detailed partition context information, please refer to PartitionContext.
- Keyword Arguments:
max_wait_time (float) – The maximum interval in seconds that the event processor will wait before calling the callback. If no events are received within this interval, the on_event callback will be called with None. If this value is set to None or 0 (the default), the callback will not be called until an event is received.
partition_id (str) – If specified, the client will receive from this partition only. Otherwise the client will receive from all partitions.
owner_level (int) – The priority for an exclusive consumer. An exclusive consumer will be created if owner_level is set. A consumer with a higher owner_level has higher exclusive priority. The owner level is also known as the ‘epoch value’ of the consumer.
prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.
track_last_enqueued_event_properties (bool) – Indicates whether the consumer should request information on the last-enqueued event on its associated partition, and track that information as events are received. When information about the partition's last-enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
starting_position (str, int, datetime.datetime or dict[str,any]) – Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a dict with partition ID as the key and position as the value for individual partitions, or a single value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values “-1” for receiving from the beginning of the stream, and “@latest” for receiving only new events. Default value is “@latest”.
starting_position_inclusive (bool or dict[str,bool]) – Determine whether the given starting_position is inclusive(>=) or not (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and bool as the value indicating whether the starting_position for a specific partition is inclusive or not. This can also be a single bool value for all starting_position. The default value is False.
on_error (callable[[PartitionContext, Exception]]) – The callback function that will be called when an error is raised during receiving after retry attempts are exhausted, or during the process of load-balancing. The callback takes two parameters: partition_context which contains partition information and error being the exception. partition_context could be None if the error is raised during the process of load-balance. The callback should be defined like: on_error(partition_context, error). The on_error callback will also be called if an unhandled exception is raised during the on_event callback.
on_partition_initialize (callable[[PartitionContext]]) – The callback function that will be called after a consumer for a certain partition finishes initialization. It would also be called when a new internal partition consumer is created to take over the receiving process for a failed and closed internal partition consumer. The callback takes a single parameter: partition_context which contains the partition information. The callback should be defined like: on_partition_initialize(partition_context).
on_partition_close (callable[[PartitionContext, CloseReason]]) – The callback function that will be called after a consumer for a certain partition is closed. It will also be called when an error is raised during receiving after retry attempts are exhausted. The callback takes two parameters: partition_context which contains partition information and reason for the close. The callback should be defined like: on_partition_close(partition_context, reason). Please refer to CloseReason for the various closing reasons.
- Return type:
None
Example:
```python
import logging

logger = logging.getLogger("azure.eventhub")

def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    logger.info("Received event from partition: {}".format(partition_context.partition_id))

# `consumer` is an EventHubConsumerClient instance created beforehand.
with consumer:
    consumer.receive(on_event=on_event)
```
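The callbacks have two edge cases that are easy to miss: on_event receives None when max_wait_time elapses without data, and on_error may receive None as partition_context when the error comes from load-balancing. The sketch below handles both. update_checkpoint and sequence_number belong to the SDK's PartitionContext and EventData classes, which are not documented in this section, so treat their use here as an assumption. The SimpleNamespace objects are stand-ins that let the callbacks run without a live Event Hub.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("azure.eventhub")
seen = []  # sequence numbers handled by on_event; demo-only state


def on_event(partition_context, event):
    """Handle one event; `event` is None when max_wait_time elapses idle."""
    if event is None:
        logger.info("No events on partition %s", partition_context.partition_id)
        return
    seen.append(event.sequence_number)
    partition_context.update_checkpoint(event)  # persisted if a checkpoint_store is set


def on_error(partition_context, error):
    """partition_context is None when the error is raised during load-balancing."""
    partition_id = partition_context.partition_id if partition_context else None
    logger.error("Error on partition %s: %r", partition_id, error)


def start_receiving(consumer):
    """Blocking call; `consumer` is an EventHubConsumerClient created elsewhere."""
    with consumer:
        consumer.receive(
            on_event=on_event,
            on_error=on_error,
            max_wait_time=5,         # call on_event(ctx, None) after 5 idle seconds
            starting_position="-1",  # from the beginning when no checkpoint exists
        )


# Stand-in objects so the callbacks can be exercised offline.
checkpoints = []
fake_context = SimpleNamespace(partition_id="0", update_checkpoint=checkpoints.append)
on_event(fake_context, SimpleNamespace(sequence_number=42))
on_event(fake_context, None)  # idle tick: nothing is recorded
print(seen)  # [42]
```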
- receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[[PartitionContext, Exception], None] | None = None, on_partition_initialize: Callable[[PartitionContext], None] | None = None, on_partition_close: Callable[[PartitionContext, CloseReason], None] | None = None, **kwargs: Any) → None
Receive events from partition(s), with optional load-balancing and checkpointing.
- Parameters:
on_event_batch (callable[PartitionContext, list[EventData]]) – The callback function for handling a batch of received events. The callback takes two parameters: partition_context which contains partition context and event_batch, which is the list of received events. The callback function should be defined like: on_event_batch(partition_context, event_batch). event_batch could be an empty list if max_wait_time is neither None nor 0 and no event is received within max_wait_time. For detailed partition context information, please refer to PartitionContext.
- Keyword Arguments:
max_batch_size (int) – The maximum number of events in a batch passed to callback on_event_batch. If the actual received number of events is larger than max_batch_size, the received events are divided into batches and the callback is called for each batch with up to max_batch_size events.
max_wait_time (float) – The maximum interval in seconds that the event processor will wait before calling the callback. If no events are received within this interval, the on_event_batch callback will be called with an empty list.
partition_id (str) – If specified, the client will receive from this partition only. Otherwise the client will receive from all partitions.
owner_level (int) – The priority for an exclusive consumer. An exclusive consumer will be created if owner_level is set. A consumer with a higher owner_level has higher exclusive priority. The owner level is also known as the ‘epoch value’ of the consumer.
prefetch (int) – The number of events to prefetch from the service for processing. Default is 300.
track_last_enqueued_event_properties (bool) – Indicates whether the consumer should request information on the last-enqueued event on its associated partition, and track that information as events are received. When information about the partition's last-enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.
starting_position (str, int, datetime.datetime or dict[str,any]) – Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a dict with partition ID as the key and position as the value for individual partitions, or a single value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values “-1” for receiving from the beginning of the stream, and “@latest” for receiving only new events. Default value is “@latest”.
starting_position_inclusive (bool or dict[str,bool]) – Determine whether the given starting_position is inclusive(>=) or not (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and bool as the value indicating whether the starting_position for a specific partition is inclusive or not. This can also be a single bool value for all starting_position. The default value is False.
on_error (callable[[PartitionContext, Exception]]) – The callback function that will be called when an error is raised during receiving after retry attempts are exhausted, or during the process of load-balancing. The callback takes two parameters: partition_context which contains partition information and error being the exception. partition_context could be None if the error is raised during the process of load-balancing. The callback should be defined like: on_error(partition_context, error). The on_error callback will also be called if an unhandled exception is raised during the on_event_batch callback.
on_partition_initialize (callable[[PartitionContext]]) – The callback function that will be called after a consumer for a certain partition finishes initialization. It would also be called when a new internal partition consumer is created to take over the receiving process for a failed and closed internal partition consumer. The callback takes a single parameter: partition_context which contains the partition information. The callback should be defined like: on_partition_initialize(partition_context).
on_partition_close (callable[[PartitionContext, CloseReason]]) – The callback function that will be called after a consumer for a certain partition is closed. It will also be called when an error is raised during receiving after retry attempts are exhausted. The callback takes two parameters: partition_context which contains partition information and reason for the close. The callback should be defined like: on_partition_close(partition_context, reason). Please refer to CloseReason for the various closing reasons.
- Return type:
None
Example:
```python
import logging

logger = logging.getLogger("azure.eventhub")

def on_event_batch(partition_context, event_batch):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    logger.info("Received events from partition: {}".format(partition_context.partition_id))

# `consumer` is an EventHubConsumerClient instance created beforehand.
with consumer:
    consumer.receive_batch(on_event_batch=on_event_batch)
```
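To make the max_batch_size and max_wait_time behavior concrete, the sketch below shows how a run of received events would be divided into batches of at most max_batch_size, and a callback that tolerates the empty list delivered after an idle max_wait_time. split_into_batches is a hypothetical helper that mirrors the documented behavior, not the SDK's internal code.

```python
from types import SimpleNamespace
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def split_into_batches(events: Sequence[T], max_batch_size: int = 300) -> Iterator[List[T]]:
    """Yield consecutive slices of `events`, each with up to max_batch_size items."""
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")
    for start in range(0, len(events), max_batch_size):
        yield list(events[start:start + max_batch_size])


processed = []  # (partition_id, batch length) pairs; demo-only state


def on_event_batch(partition_context, event_batch):
    """Handle one batch; an empty list means max_wait_time elapsed with no events."""
    if not event_batch:
        return
    processed.append((partition_context.partition_id, len(event_batch)))


# Offline demonstration with a stand-in partition context.
context = SimpleNamespace(partition_id="0")
for batch in split_into_batches(list(range(7)), max_batch_size=3):
    on_event_batch(context, batch)
on_event_batch(context, [])  # idle tick: ignored
print(processed)  # [('0', 3), ('0', 3), ('0', 1)]
```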
- class azure.eventhub.EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)
- class azure.eventhub.EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[True], buffer_concurrency: ThreadPoolExecutor | int | None = None, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], None], on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], None], max_buffer_length: int = 1500, max_wait_time: float = 1, **kwargs: Any)
The EventHubProducerClient class defines a high level interface for sending events to the Azure Event Hubs service.
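The second constructor overload requires both callbacks in buffered mode. The sketch below shows one way to define them with the callback signatures documented under Keyword Arguments and pass them to the constructor. buffered_producer_kwargs and create_buffered_producer are hypothetical helpers, and the SDK import is deferred so the callbacks can be exercised without the package installed.

```python
published = []  # (partition_id, event count) for successful batches; demo-only state
failed = []     # (partition_id, event count, error) for failed batches


def on_success(events, partition_id):
    """Called once a batch has been published; partition_id may be None."""
    published.append((partition_id, len(events)))


def on_error(events, partition_id, error):
    """Called when events were enqueued but could not be sent."""
    failed.append((partition_id, len(events), error))


def buffered_producer_kwargs(max_wait_time=1, max_buffer_length=1500):
    """Collect the buffered-mode keyword arguments using the documented defaults."""
    return {
        "buffered_mode": True,
        "on_success": on_success,
        "on_error": on_error,
        "max_wait_time": max_wait_time,
        "max_buffer_length": max_buffer_length,
    }


def create_buffered_producer(fully_qualified_namespace, eventhub_name, credential):
    """Create the client; requires the azure-eventhub package at call time."""
    from azure.eventhub import EventHubProducerClient  # deferred import

    return EventHubProducerClient(
        fully_qualified_namespace=fully_qualified_namespace,
        eventhub_name=eventhub_name,
        credential=credential,
        **buffered_producer_kwargs(),
    )


# Offline demonstration of the callbacks with placeholder events.
on_success(["a", "b"], "0")
on_error(["c"], None, RuntimeError("send failed"))
print(published)  # [('0', 2)]
```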
- Parameters:
fully_qualified_namespace (str) – The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <yournamespace>.servicebus.windows.net
eventhub_name (str) – The path of the specific Event Hub to connect the client to.
credential (TokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments:
buffered_mode (bool) – If True, the producer client will collect events in a buffer, efficiently batch, then publish. Default is False.
buffer_concurrency (ThreadPoolExecutor or int or None) – The ThreadPoolExecutor to be used for publishing events or the number of workers for the ThreadPoolExecutor. Default is None and a ThreadPoolExecutor with the default number of workers will be created per https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
on_success (Optional[Callable[[SendEventTypes, Optional[str]], None]]) – The callback to be called once a batch has been successfully published. The callback takes two parameters:
- events: The list of events that have been successfully published.
- partition_id: The partition id that the events in the list have been published to.
The callback function should be defined like: on_success(events, partition_id). It is required when buffered_mode is True while optional if buffered_mode is False.
on_error (Optional[Callable[[SendEventTypes, Optional[str], Exception], None]]) – The callback to be called once a batch has failed to be published. The callback function should be defined like: on_error(events, partition_id, error), where:
- events: The list of events that failed to be published.
- partition_id: The partition id that the client attempted to publish the events in the list to.
- error: The exception related to the sending failure.
If buffered_mode is False, the on_error callback is optional and errors will be handled as follows:
- If an on_error callback is passed during the producer client instantiation, then error information will be passed to the on_error callback, which will then be called.
- If an on_error callback is not passed in during client instantiation, then the error will be raised by default.
If buffered_mode is True, the on_error callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the on_error callback will be called.
max_buffer_length (int) – Buffered mode only. The total number of events per partition that can be buffered before a flush will be triggered. The default value is 1500 in buffered mode.
max_wait_time (Optional[float]) – Buffered mode only. The amount of time to wait for a batch to be built with events in the buffer before publishing. The default value is 1 in buffered mode.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – If specified, this will be added in front of the user agent string.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). In fixed mode, retry policy will always sleep for {backoff factor}. In ‘exponential’ mode, retry policy will sleep for: {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then the retry will sleep for [0.0s, 0.2s, 0.4s, …] between retries. The default value is 0.8.
retry_backoff_max (float) – The maximum back off time. Default value is 120 seconds (2 minutes).
retry_mode (str) – The delay behavior between retry attempts. Supported values are ‘fixed’ or ‘exponential’, where default is ‘exponential’.
idle_timeout (float) – Timeout, in seconds, after which this client will close the underlying connection if there is no activity. By default the value is None, meaning that the client will not shut down due to inactivity unless initiated by the service.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.
http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
custom_endpoint_address (Optional[str]) – The custom endpoint address to use for establishing a connection to the Event Hubs service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If port is not specified in the custom_endpoint_address, by default port 443 will be used.
connection_verify (Optional[str]) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.
uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If EventHubsConnectionError errors are occurring due to write timing out, a larger than default value may need to be passed in. This is for advanced usage scenarios and ordinarily the default value should be sufficient.
Example:
import os
from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
producer = EventHubProducerClient(
    fully_qualified_namespace=fully_qualified_namespace,
    eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
    credential=credential
)
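The retry_backoff_factor, retry_backoff_max and retry_mode descriptions above can be read as a small calculation. The following is a minimal sketch of the documented formula only, not the SDK's internal retry code. The function name documented_backoff is hypothetical, and both the immediate first retry and the cap at retry_backoff_max are assumptions taken from the parameter descriptions.

```python
def documented_backoff(retry_number, backoff_factor=0.8, backoff_max=120.0, mode="exponential"):
    """Return the documented sleep time, in seconds, before the given retry (1-based)."""
    if mode == "fixed":
        # Fixed mode: always sleep for the backoff factor.
        delay = backoff_factor
    elif retry_number <= 1:
        # Assumption: the first retry is attempted without a delay.
        delay = 0.0
    else:
        # Exponential mode: {backoff factor} * (2 ** ({number of total retries} - 1))
        delay = backoff_factor * (2 ** (retry_number - 1))
    # Assumption: retry_backoff_max caps every computed delay.
    return min(delay, backoff_max)


delays = [documented_backoff(n, backoff_factor=0.1) for n in (1, 2, 3)]
print(delays)  # [0.0, 0.2, 0.4]
```

With the default factor of 0.8 the same function gives 0.0, 1.6 and 3.2 seconds for the first three retries.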
- close(*, flush: bool = True, timeout: float | None = None, **kwargs: Any) None [source]¶
Close the producer client's underlying AMQP connection and links.
- Keyword Arguments:
- Return type:
None
- Raises:
EventHubError – If an error occurred when flushing the buffer (when flush is set to True) or when closing the underlying AMQP connections in buffered mode.
Example:
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

eventhub_fully_qualified_namespace = os.environ["EVENT_HUB_HOSTNAME"]
eventhub_name = os.environ['EVENT_HUB_NAME']

producer = EventHubProducerClient(
    fully_qualified_namespace=eventhub_fully_qualified_namespace,
    eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
    credential=DefaultAzureCredential()
)
try:
    event_data_batch = producer.create_batch()

    while True:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            # EventDataBatch object reaches max_size.
            # New EventDataBatch object can be created here to send more data
            break

    producer.send_batch(event_data_batch)
finally:
    # Close down the producer handler.
    producer.close()
- create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None, **kwargs: Any) EventDataBatch [source]¶
Create an EventDataBatch object with the max size of all content being constrained by max_size_in_bytes.
The max_size_in_bytes should be no greater than the max allowed message size defined by the service.
- Keyword Arguments:
partition_id (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.
partition_key (str) – With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. If both partition_id and partition_key are provided, the partition_id will take precedence. WARNING: Setting partition_key of non-string value on the events to be sent is discouraged as the partition_key will be ignored by the Event Hub service and events will be assigned to all partitions using round-robin. Furthermore, some SDKs for consuming events expect partition_key to be a string and might fail to parse a non-string value.
max_size_in_bytes (int) – The maximum size of bytes data that an EventDataBatch object can hold. By default, the value is determined by your Event Hubs tier.
- Returns:
An EventDataBatch object instance
Example:
event_data_batch = producer.create_batch()
while True:
    try:
        event_data_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        # The EventDataBatch object reaches its max_size.
        # You can send the full EventDataBatch object and create a new one here.
        break
- Return type:
EventDataBatch
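The examples stop at the first full batch. The comment about sending the full batch and creating a new one can be sketched as a loop. This is an illustrative helper rather than part of the SDK: the name send_in_batches is hypothetical, and it relies only on the create_batch, add and send_batch calls documented in this section.

```python
def send_in_batches(producer, events):
    """Send every event, starting a new batch whenever the current one is full.

    Returns the number of batches that were sent.
    """
    batch = producer.create_batch()
    pending = 0  # events added to the current, not yet sent, batch
    sent_batches = 0
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                # The event does not fit even into an empty batch.
                raise
            # The batch is full: send it and retry the event in a fresh batch.
            producer.send_batch(batch)
            sent_batches += 1
            batch = producer.create_batch()
            batch.add(event)  # raises ValueError if the event alone is too large
            pending = 0
        pending += 1
    if pending:
        # Send the final, partially filled batch.
        producer.send_batch(batch)
        sent_batches += 1
    return sent_batches
```

The producer argument would typically be an EventHubProducerClient and events an iterable of EventData objects.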
- flush(*, timeout: float | None = None, **kwargs: Any) None [source]¶
Buffered mode only. Flush events in the buffer to be sent immediately if the client is working in buffered mode.
- Keyword Arguments:
timeout (Optional[float]) – Timeout to flush the buffered events, default is None which means no timeout.
- Return type:
None
- Raises:
EventDataSendError – If the producer fails to flush the buffer within the given timeout in buffered mode.
- classmethod from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) EventHubProducerClient [source]¶
- classmethod from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[True], buffer_concurrency: ThreadPoolExecutor | int | None = None, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], None], on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], None], max_buffer_length: int = 1500, max_wait_time: float = 1, **kwargs: Any) EventHubProducerClient
Create an EventHubProducerClient from a connection string.
- Parameters:
conn_str (str) – The connection string of an Event Hub.
- Keyword Arguments:
eventhub_name (str) – The path of the specific Event Hub to connect the client to.
buffered_mode (bool) – If True, the producer client will collect events in a buffer, efficiently batch, then publish. Default is False.
buffer_concurrency (ThreadPoolExecutor or int or None) – The ThreadPoolExecutor to be used for publishing events or the number of workers for the ThreadPoolExecutor. Default is None and a ThreadPoolExecutor with the default number of workers will be created per https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
on_success (Optional[Callable[[SendEventTypes, Optional[str]], None]]) –
The callback to be called once a batch has been successfully published. The callback takes two parameters:
- events: The list of events that have been successfully published.
- partition_id: The partition id that the events in the list have been published to.
The callback function should be defined like: on_success(events, partition_id). Required when buffered_mode is True while optional if buffered_mode is False.
on_error (Optional[Callable[[SendEventTypes, Optional[str], Exception], None]]) –
The callback to be called once a batch has failed to be published. Required when buffered_mode is True while optional if buffered_mode is False. The callback function should be defined like: on_error(events, partition_id, error), where:
- events: The list of events that failed to be published.
- partition_id: The partition id that the events in the list were attempted to be published to.
- error: The exception related to the sending failure.
If buffered_mode is False, on_error callback is optional and errors will be handled as follows:
- If an on_error callback is passed during the producer client instantiation, then error information will be passed to the on_error callback, which will then be called.
- If an on_error callback is not passed in during client instantiation, then the error will be raised by default.
If buffered_mode is True, on_error callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the on_error callback will be called.
max_buffer_length (int) – Buffered mode only. The total number of events per partition that can be buffered before a flush will be triggered. The default value is 1500 in buffered mode.
max_wait_time (Optional[float]) – Buffered mode only. The amount of time to wait for a batch to be built with events in the buffer before publishing. The default value is 1 in buffered mode.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
auth_timeout (float) – The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
user_agent (str) – If specified, this will be added in front of the user agent string.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). In fixed mode, retry policy will always sleep for {backoff factor}. In ‘exponential’ mode, retry policy will sleep for: {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then the retry will sleep for [0.0s, 0.2s, 0.4s, …] between retries. The default value is 0.8.
retry_backoff_max (float) – The maximum back off time. Default value is 120 seconds (2 minutes).
retry_mode (str) – The delay behavior between retry attempts. Supported values are ‘fixed’ or ‘exponential’, where default is ‘exponential’.
idle_timeout (float) – Timeout, in seconds, after which this client will close the underlying connection if there is no activity. By default the value is None, meaning that the client will not shut down due to inactivity unless initiated by the service.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.
custom_endpoint_address (Optional[str]) – The custom endpoint address to use for establishing a connection to the Event Hubs service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If port is not specified in the custom_endpoint_address, by default port 443 will be used.
connection_verify (Optional[str]) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.
uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
- Returns:
An instance of the EventHubProducerClient.
- Return type:
EventHubProducerClient
Example:
import os
from azure.eventhub import EventHubProducerClient
from azure.identity import DefaultAzureCredential

eventhub_fully_qualified_namespace = os.environ["EVENT_HUB_HOSTNAME"]
eventhub_name = os.environ['EVENT_HUB_NAME']

producer = EventHubProducerClient(
    fully_qualified_namespace=eventhub_fully_qualified_namespace,
    eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
    credential=DefaultAzureCredential()
)
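For the buffered-mode overload of from_connection_string, both callbacks are required. A minimal sketch, assuming the connection string and Event Hub name come from the caller: the helper create_buffered_producer and the published and failed lists are hypothetical names used for illustration, while the keyword arguments are the ones listed above.

```python
published = []  # (partition_id, event_count) for each successfully published batch
failed = []     # (partition_id, event_count, error) for each batch that failed


def on_success(events, partition_id):
    # Called once a batch has been successfully published.
    published.append((partition_id, len(events)))


def on_error(events, partition_id, error):
    # Called when events fail to send after being enqueued successfully.
    failed.append((partition_id, len(events), error))


def create_buffered_producer(conn_str, eventhub_name):
    # Imported here so the callbacks can be tried without the SDK installed.
    from azure.eventhub import EventHubProducerClient

    return EventHubProducerClient.from_connection_string(
        conn_str,
        eventhub_name=eventhub_name,
        buffered_mode=True,
        on_success=on_success,
        on_error=on_error,
        max_buffer_length=1500,  # documented default
        max_wait_time=1,         # documented default
    )
```

Because sending happens in the background in buffered mode, calling flush() or close() before exiting gives the remaining buffered events a chance to be published.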
- get_buffered_event_count(partition_id: str) int | None [source]¶
The number of events that are buffered and waiting to be published for a given partition. Returns None in non-buffered mode. NOTE: The event buffer is processed in a background thread, therefore the number of events in the buffer reported by this API should be considered only an approximation and is only recommended for use in debugging. For a partition ID that has no events buffered, 0 will be returned regardless of whether that partition ID actually exists within the Event Hub.
- get_eventhub_properties() Dict[str, Any] [source]¶
Get properties of the Event Hub.
Keys in the returned dictionary include:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
- Returns:
A dictionary containing eventhub properties.
- Return type:
Dict[str, Any]
- Raises:
- get_partition_ids() List[str] [source]¶
Get partition IDs of the Event Hub.
- Returns:
A list of partition IDs.
- Return type:
List[str]
- Raises:
- get_partition_properties(partition_id: str) Dict[str, Any] [source]¶
Get properties of the specified partition.
Keys in the properties dictionary include:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
- Parameters:
partition_id (str) – The target partition ID.
- Returns:
A dictionary of partition properties.
- Return type:
Dict[str, Any]
- Raises:
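The two lookups above combine naturally when inspecting a whole Event Hub. A short sketch, assuming client is an already constructed producer client: the helper name summarize_partitions is hypothetical, and the dictionary keys are the ones listed for get_partition_properties.

```python
def summarize_partitions(client):
    """Map each partition ID to its last enqueued sequence number and emptiness flag."""
    summary = {}
    for partition_id in client.get_partition_ids():
        properties = client.get_partition_properties(partition_id)
        summary[partition_id] = {
            "last_enqueued_sequence_number": properties["last_enqueued_sequence_number"],
            "is_empty": properties["is_empty"],
        }
    return summary
```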
- send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], *, timeout: float | None = None, partition_key: str | None = None, partition_id: str | None = None, **kwargs: Any) None [source]¶
Sends a batch of event data. By default, the method will block until acknowledgement is received or operation times out. If the EventHubProducerClient is configured to run in buffered mode, the method will try enqueuing the events into buffer within the given time if specified and return. The producer will do automatic sending in the background in buffered mode.
If buffered_mode is False, on_error callback is optional and errors will be handled as follows:
- If an on_error callback is passed during the producer client instantiation, then error information will be passed to the on_error callback, which will then be called.
- If an on_error callback is not passed in during client instantiation, then the error will be raised by default.
If buffered_mode is True, on_error callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the on_error callback will be called.
In buffered mode, a batch remains intact and is sent as a single unit. The batch will not be rearranged. This may result in inefficiency of sending events.
If you’re sending a finite list of EventData or AmqpAnnotatedMessage and you know it’s within the event hub frame size limit, you can send them with a send_batch call. Otherwise, use create_batch() to create EventDataBatch and add either EventData or AmqpAnnotatedMessage into the batch one by one until the size limit, and then call this method to send out the batch.
- Parameters:
event_data_batch (Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]]) – The EventDataBatch object to be sent or a list of EventData to be sent in a batch. All EventData or AmqpAnnotatedMessage in the list or EventDataBatch will land on the same partition.
- Keyword Arguments:
timeout (float) – The maximum wait time to send the event data in non-buffered mode or the maximum wait time to enqueue the event data into the buffer in buffered mode. In non-buffered mode, the default wait time specified when the producer was created will be used. In buffered mode, the default wait time is None.
partition_id (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin. A TypeError will be raised if partition_id is specified and event_data_batch is an EventDataBatch because EventDataBatch itself has partition_id.
partition_key (str) – With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. A TypeError will be raised if partition_key is specified and event_data_batch is an EventDataBatch because EventDataBatch itself has partition_key. If both partition_id and partition_key are provided, the partition_id will take precedence. WARNING: Setting partition_key of non-string value on the events to be sent is discouraged as the partition_key will be ignored by the Event Hub service and events will be assigned to all partitions using round-robin. Furthermore, some SDKs for consuming events expect partition_key to be a string and might fail to parse a non-string value.
- Return type:
None
- Raises:
AuthenticationError
ConnectError
ConnectionLostError
EventDataError
EventDataSendError
EventHubError
ValueError
TypeError
OperationTimeoutError – If the value specified by the timeout parameter elapses before the event can be sent in non-buffered mode or before the events can be enqueued into the buffer in buffered mode.
Example:
with producer:
    event_data_batch = producer.create_batch()

    while True:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            # EventDataBatch object reaches max_size.
            # New EventDataBatch object can be created here to send more data
            break

    producer.send_batch(event_data_batch)
- send_event(event_data: EventData | AmqpAnnotatedMessage, *, timeout: float | None = None, partition_id: str | None = None, partition_key: str | None = None, **kwargs: Any) None [source]¶
Sends a single event. By default, the method will block until acknowledgement is received or operation times out. If the EventHubProducerClient is configured to run in buffered mode, the method will try enqueuing the event into the buffer within the given time, if specified, and return. The producer will do automatic sending in the background in buffered mode.
If buffered_mode is False, on_error callback is optional and errors will be handled as follows:
- If an on_error callback is passed during the producer client instantiation, then error information will be passed to the on_error callback, which will then be called.
- If an on_error callback is not passed in during client instantiation, then the error will be raised by default.
If buffered_mode is True, on_error callback is required and errors will be handled as follows:
- If events fail to enqueue within the given timeout, then an error will be directly raised.
- If events fail to send after enqueuing successfully, the on_error callback will be called.
- Parameters:
event_data (Union[EventData, AmqpAnnotatedMessage]) – The EventData object to be sent.
- Keyword Arguments:
timeout (float) – The maximum wait time to send the event data in non-buffered mode or the maximum wait time to enqueue the event data into the buffer in buffered mode. In non-buffered mode, the default wait time specified when the producer was created will be used. In buffered mode, the default wait time is None indicating that the event will be scheduled to send immediately.
partition_id (str) – The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.
partition_key (str) – With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. If both partition_id and partition_key are provided, the partition_id will take precedence. WARNING: Setting partition_key of non-string value on the events to be sent is discouraged as the partition_key will be ignored by the Event Hub service and events will be assigned to all partitions using round-robin. Furthermore, some SDKs for consuming events expect partition_key to be a string and might fail to parse a non-string value.
- Return type:
None
- Raises:
AuthenticationError
ConnectError
ConnectionLostError
EventDataError
EventDataSendError
EventHubError
OperationTimeoutError – If the value specified by the timeout parameter elapses before the event can be sent in non-buffered mode or before the events can be enqueued into the buffer in buffered mode.
- property total_buffered_event_count: int | None¶
The total number of events that are currently buffered and waiting to be published, across all partitions. Returns None in non-buffered mode. NOTE: The event buffer is processed in a background thread, therefore the number of events in the buffer reported by this API should be considered only an approximation and is only recommended for use in debugging.
- Return type:
int or None
- class azure.eventhub.EventData(body: str | bytes | List | None = None)[source]¶
The EventData class is a container for event content.
- Parameters:
body (str or bytes) – The data to send in a single message. body can be of type str or bytes.
Example:
from azure.eventhub import EventData

event_data = EventData("String data")
event_data = EventData(b"Bytes data")
- body_as_json(encoding: str = 'UTF-8') Dict[str, Any] [source]¶
The content of the event loaded as a JSON object, if the data is compatible.
- body_as_str(encoding: str = 'UTF-8') str [source]¶
The content of the event as a string, if the data is of a compatible type.
- classmethod from_message_content(content: bytes, content_type: str, **kwargs: Any) EventData [source]¶
Creates an EventData object given content type and a content value to be set as body.
- property body: int | float | bytes | bool | str | Dict[str, Any] | List[Any] | UUID | None¶
The body of the Message. The format may vary depending on the body type: For azure.eventhub.amqp.AmqpMessageBodyType.DATA, the body could be bytes or Iterable[bytes]. For azure.eventhub.amqp.AmqpMessageBodyType.SEQUENCE, the body could be List or Iterable[List]. For azure.eventhub.amqp.AmqpMessageBodyType.VALUE, the body could be any type.
- property body_type: AmqpMessageBodyType¶
The body type of the underlying AMQP message.
- Return type:
AmqpMessageBodyType
- property content_type: str | None¶
The content type descriptor. Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5, for example “application/json”.
- Return type:
str or None
- property correlation_id: str | None¶
The correlation identifier. Allows an application to specify a context for the message for the purposes of correlation, for example reflecting the MessageId of a message that is being replied to.
- Return type:
str or None
- property enqueued_time: datetime | None¶
The enqueued timestamp of the event.
- Return type:
datetime.datetime or None
- property message: 'Message' | LegacyMessage¶
DEPRECATED: Get the underlying uamqp.Message or LegacyMessage. This is deprecated and will be removed in a later release.
- Return type:
uamqp.Message or LegacyMessage
- property message_id: str | None¶
The id to identify the message. The message identifier is an application-defined value that uniquely identifies the message and its payload. The identifier is a free-form string and can reflect a GUID or an identifier derived from the application context. If enabled, the duplicate detection feature identifies and removes second and further submissions of messages with the same message id.
- Return type:
str or None
- property raw_amqp_message: AmqpAnnotatedMessage¶
Advanced usage only. The internal AMQP message payload that is sent or received.
- Return type:
AmqpAnnotatedMessage
- property system_properties: Dict[bytes, Any]¶
Metadata set by the Event Hubs Service associated with the event.
An EventData could have some or all of the following metadata depending on the source of the event data.
b"x-opt-sequence-number" (int)
b"x-opt-offset" (bytes)
b"x-opt-partition-key" (bytes)
b"x-opt-enqueued-time" (int)
b"message-id" (bytes)
b"user-id" (bytes)
b"to" (bytes)
b"subject" (bytes)
b"reply-to" (bytes)
b"correlation-id" (bytes)
b"content-type" (bytes)
b"content-encoding" (bytes)
b"absolute-expiry-time" (int)
b"creation-time" (int)
b"group-id" (bytes)
b"group-sequence" (bytes)
b"reply-to-group-id" (bytes)
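Because the keys, and many of the values, are bytes, logging or serializing system_properties usually needs a decoding step. A minimal sketch, assuming UTF-8 content: the helper name readable_system_properties is hypothetical and works on any dictionary shaped like the one described above.

```python
def readable_system_properties(system_properties):
    """Return a copy with bytes keys and bytes values decoded to str."""
    readable = {}
    for key, value in system_properties.items():
        name = key.decode("utf-8") if isinstance(key, bytes) else key
        if isinstance(value, bytes):
            # Assumption: byte values hold UTF-8 text; undecodable bytes are replaced.
            value = value.decode("utf-8", errors="replace")
        readable[name] = value
    return readable


example = {b"x-opt-sequence-number": 42, b"x-opt-partition-key": b"device-1"}
print(readable_system_properties(example))
# {'x-opt-sequence-number': 42, 'x-opt-partition-key': 'device-1'}
```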
- class azure.eventhub.EventDataBatch(max_size_in_bytes: int | None = None, partition_id: str | None = None, partition_key: str | bytes | None = None, **kwargs: Any)[source]¶
A batch of events.
Sending events in a batch is more performant than sending individual events. EventDataBatch helps you create the maximum allowed size batch of EventData to improve sending performance.
Use the add method to add events until the maximum batch size limit in bytes has been reached - at which point a ValueError will be raised. Use the send_batch method of EventHubProducerClient or the async EventHubProducerClient for sending.
Please use the create_batch method of EventHubProducerClient to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.
WARNING: Updating the value of the instance variable max_size_in_bytes on an instantiated EventDataBatch object is HIGHLY DISCOURAGED. The updated max_size_in_bytes value may conflict with the maximum size of events allowed by the Event Hubs service and result in a sending failure.
- Parameters:
- add(event_data: EventData | AmqpAnnotatedMessage) None [source]¶
Try to add an EventData to the batch.
The total size of an added event is the sum of its body, properties, etc. If this added size results in the batch exceeding the maximum batch size, a ValueError will be raised.
- Parameters:
event_data (Union[EventData, AmqpAnnotatedMessage]) – The EventData to add to the batch.
- Return type:
None
- Raises:
ValueError – When exceeding the size limit.
- property message: 'BatchMessage' | LegacyBatchMessage¶
DEPRECATED: Get the underlying uamqp.BatchMessage or LegacyBatchMessage. This is deprecated and will be removed in a later release.
- Return type:
uamqp.BatchMessage or LegacyBatchMessage
The shared access key credential used for authentication.
- class azure.eventhub.CheckpointStore[source]¶
CheckpointStore deals with the interaction with the chosen storage service.
It can list and claim partition ownerships as well as list and save checkpoints.
- abstract claim_ownership(ownership_list: Iterable[Dict[str, Any]], **kwargs: Any) Iterable[Dict[str, Any]] [source]¶
Tries to claim ownership for a list of specified partitions.
- Parameters:
ownership_list (Iterable[Dict[str,Any]]) – Iterable of dictionaries containing all the ownerships to claim.
- Return type:
Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:
fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”.
eventhub_name (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
consumer_group (str): The name of the consumer group the ownerships are associated with.
partition_id (str): The partition ID which the checkpoint is created for.
owner_id (str): A UUID representing the owner attempting to claim this partition.
last_modified_time (UTC datetime.datetime): The last time this ownership was claimed.
etag (str): The Etag value for the last time this ownership was modified. Optional depending on storage implementation.
- abstract list_checkpoints(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, **kwargs: Any) Iterable[Dict[str, Any]] [source]¶
List the updated checkpoints from the chosen storage service.
- Parameters:
fully_qualified_namespace (str) – The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”.
eventhub_name (str) – The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.
consumer_group (str) – The name of the consumer group the checkpoints are associated with.
- Return type:
Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:
fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”.
eventhub_name (str): The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.
consumer_group (str): The name of the consumer group the checkpoints are associated with.
partition_id (str): The partition ID which the checkpoint is created for.
sequence_number (int): The sequence number of the EventData.
offset (str): The offset of the EventData.
- abstract list_ownership(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, **kwargs: Any) Iterable[Dict[str, Any]] [source]¶
Retrieves a complete ownership list from the chosen storage service.
- Parameters:
fully_qualified_namespace (str) – The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”.
eventhub_name (str) – The name of the specific Event Hub the partition ownerships are associated with, relative to the Event Hubs namespace that contains it.
consumer_group (str) – The name of the consumer group the ownerships are associated with.
- Return type:
Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:
fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”.
eventhub_name (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
consumer_group (str): The name of the consumer group the ownership is associated with.
partition_id (str): The partition ID which the checkpoint is created for.
owner_id (str): A UUID representing the current owner of this partition.
last_modified_time (UTC datetime.datetime): The last time this ownership was claimed.
etag (str): The Etag value for the last time this ownership was modified. Optional depending on storage implementation.
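To make the shape of the ownership list concrete, the following sketch counts how many partitions each owner currently holds, which is the kind of summary a load balancer could derive from this return value. The helper name `partitions_per_owner` and the sample records are hypothetical, and the owner IDs are shortened placeholders rather than real UUIDs.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def partitions_per_owner(ownerships: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Count how many partitions each owner_id currently holds."""
    return dict(Counter(record["owner_id"] for record in ownerships))


# Sample records shaped like the dictionaries described above (values are made up).
sample_ownerships = [
    {"partition_id": "0", "owner_id": "owner-a", "etag": "e1"},
    {"partition_id": "1", "owner_id": "owner-b", "etag": "e2"},
    {"partition_id": "2", "owner_id": "owner-a", "etag": "e3"},
]

owner_counts = partitions_per_owner(sample_ownerships)
```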
- abstract update_checkpoint(checkpoint: Dict[str, str | int | None], **kwargs: Any) None [source]¶
Updates the checkpoint using the given information for the offset, associated partition and consumer group in the chosen storage service.
Note: If you plan to implement a custom checkpoint store that will be shared between Event Hubs SDKs written in different languages, it is recommended to persist the offset value as an integer.
- Parameters:
checkpoint (Dict[str,Any]) –
A dict containing checkpoint information:
fully_qualified_namespace (str): The fully qualified namespace that the Event Hub belongs to. The format is like “<namespace>.servicebus.windows.net”.
eventhub_name (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
consumer_group (str): The name of the consumer group the checkpoint is associated with.
partition_id (str): The partition ID which the checkpoint is created for.
sequence_number (int): The sequence number of the EventData the new checkpoint will be associated with.
offset (str): The offset of the EventData the new checkpoint will be associated with.
- Return type:
None
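A minimal sketch of how `update_checkpoint` and `list_checkpoints` could fit together, assuming checkpoints are kept in a plain dictionary in memory. The class name `InMemoryCheckpoints` is hypothetical and, to stay self-contained, the sketch does not import or subclass the library's `CheckpointStore`; a real implementation would subclass it and also provide the ownership methods.

```python
from typing import Any, Dict, Iterable, Tuple, Union

# Key: (fully_qualified_namespace, eventhub_name, consumer_group, partition_id)
CheckpointKey = Tuple[str, str, str, str]


class InMemoryCheckpoints:
    """Stand-in that mirrors update_checkpoint and list_checkpoints only."""

    def __init__(self) -> None:
        self._checkpoints: Dict[CheckpointKey, Dict[str, Any]] = {}

    def update_checkpoint(
        self, checkpoint: Dict[str, Union[str, int, None]], **kwargs: Any
    ) -> None:
        key = (
            str(checkpoint["fully_qualified_namespace"]),
            str(checkpoint["eventhub_name"]),
            str(checkpoint["consumer_group"]),
            str(checkpoint["partition_id"]),
        )
        # Store a copy so later changes made by the caller do not leak in.
        self._checkpoints[key] = dict(checkpoint)

    def list_checkpoints(
        self,
        fully_qualified_namespace: str,
        eventhub_name: str,
        consumer_group: str,
        **kwargs: Any
    ) -> Iterable[Dict[str, Any]]:
        wanted = (fully_qualified_namespace, eventhub_name, consumer_group)
        return [dict(cp) for key, cp in self._checkpoints.items() if key[:3] == wanted]


store = InMemoryCheckpoints()
base = {
    "fully_qualified_namespace": "example.servicebus.windows.net",
    "eventhub_name": "orders",
    "consumer_group": "$Default",
    "partition_id": "0",
}
store.update_checkpoint({**base, "sequence_number": 11, "offset": "512"})
store.update_checkpoint({**base, "sequence_number": 12, "offset": "640"})
listed = store.list_checkpoints("example.servicebus.windows.net", "orders", "$Default")
```

Keying on all four identifying fields means a second update for the same partition overwrites the first, so only the latest checkpoint per partition is listed.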
- class azure.eventhub.PartitionContext(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, partition_id: str, checkpoint_store: CheckpointStore | None = None)[source]¶
Contains partition related context information.
A PartitionContext instance will be passed to the event, error and initialization callbacks defined when calling EventHubConsumerClient.receive(). Users can call update_checkpoint() of this class to persist checkpoint data.
- update_checkpoint(event: EventData | None = None, **kwargs: Any) None [source]¶
Updates the receive checkpoint to the given event's offset.
- Parameters:
event (EventData) – The EventData instance which contains the offset and sequence number information used for the checkpoint.
- Return type:
None
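One common way to use `update_checkpoint()` is to call it from the event callback, but only every few events, to limit writes to the checkpoint store. The sketch below shows that pattern under some assumptions: the factory name `make_on_event` is hypothetical, the context object is assumed to expose a `partition_id` attribute, and the callback is assumed to possibly receive `None` when no event arrived.

```python
from typing import Any, Callable, Dict


def make_on_event(checkpoint_every: int) -> Callable[[Any, Any], None]:
    """Build an on_event callback that checkpoints once per `checkpoint_every` events."""
    seen: Dict[str, int] = {}  # events processed so far, per partition

    def on_event(partition_context: Any, event: Any) -> None:
        if event is None:
            # Assumption: nothing was received, so there is nothing to checkpoint.
            return
        pid = partition_context.partition_id
        seen[pid] = seen.get(pid, 0) + 1
        # ... process the event here ...
        if seen[pid] % checkpoint_every == 0:
            partition_context.update_checkpoint(event)

    return on_event
```

The returned function has the `(partition_context, event)` shape of an event callback, so it could be passed as the event callback when calling `EventHubConsumerClient.receive()`.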
- property last_enqueued_event_properties: Dict[str, Any] | None¶
The latest enqueued event information.
This property is updated each time an event is received, provided the receiver was created with track_last_enqueued_event_properties set to True. The properties dict includes the following information about the last enqueued event:
sequence_number (int)
offset (str)
enqueued_time (UTC datetime.datetime)
retrieval_time (UTC datetime.datetime)
- Return type:
Dict[str, Any] or None
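As a sketch of what this property can be used for, the helper below estimates how far a consumer is behind the end of a partition by comparing the last enqueued sequence number with the sequence number of the event being processed. The function name `events_behind` is hypothetical, and the calculation assumes that sequence numbers increase by one per event within a partition.

```python
from typing import Any, Dict, Optional


def events_behind(
    last_enqueued: Optional[Dict[str, Any]], current_sequence_number: int
) -> Optional[int]:
    """Return how many events remain after the current one, or None if unknown."""
    if not last_enqueued or last_enqueued.get("sequence_number") is None:
        # Tracking is off or no information has been received yet.
        return None
    return max(0, last_enqueued["sequence_number"] - current_sequence_number)


# Sample dict shaped like the properties listed above (values are made up).
lag = events_behind({"sequence_number": 120, "offset": "4096"}, 100)
```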
- class azure.eventhub.CloseReason(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
The reason a partition consumer is closed.
- OWNERSHIP_LOST = 1¶
- SHUTDOWN = 0¶
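A close callback might branch on the reason to decide what to log. The sketch below is only illustrative: `CloseReasonStandIn` is a local mirror of the two members listed above so that the example runs without the library installed, and `describe_close` is a hypothetical helper.

```python
from enum import Enum


class CloseReasonStandIn(Enum):
    """Local mirror of the two members listed above; not the library class."""

    SHUTDOWN = 0
    OWNERSHIP_LOST = 1


def describe_close(partition_id: str, reason: Enum) -> str:
    """Return a log message describing why a partition consumer closed."""
    if reason.name == "OWNERSHIP_LOST":
        return f"partition {partition_id}: ownership lost"
    return f"partition {partition_id}: consumer shut down"


message = describe_close("3", CloseReasonStandIn.OWNERSHIP_LOST)
```

Comparing on the member name keeps the helper usable with either the stand-in or the library's own enum, as long as the member names match.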
- class azure.eventhub.TransportType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
The underlying transport protocol type:
Amqp: AMQP over the default TCP transport protocol; it uses port 5671.
AmqpOverWebsocket: AMQP over the WebSocket transport protocol; it uses port 443.
- Amqp = 1¶
- AmqpOverWebsocket = 2¶
Subpackages¶
- azure.eventhub.aio package
EventHubConsumerClient
EventHubProducerClient
EventHubProducerClient.close()
EventHubProducerClient.create_batch()
EventHubProducerClient.flush()
EventHubProducerClient.from_connection_string()
EventHubProducerClient.get_buffered_event_count()
EventHubProducerClient.get_eventhub_properties()
EventHubProducerClient.get_partition_ids()
EventHubProducerClient.get_partition_properties()
EventHubProducerClient.send_batch()
EventHubProducerClient.send_event()
EventHubProducerClient.total_buffered_event_count
EventHubSharedKeyCredential
CheckpointStore
PartitionContext
- azure.eventhub.exceptions package
- azure.eventhub.amqp package
AmqpAnnotatedMessage
AmqpMessageHeader
AmqpMessageProperties
AmqpMessageBodyType
AmqpMessageBodyType.DATA
AmqpMessageBodyType.SEQUENCE
AmqpMessageBodyType.VALUE