azure.servicebus.aio package¶
-
class
azure.servicebus.aio.
AutoLockRenew
(loop=None)[source]¶ Auto lock renew.
An asynchronous AutoLockRenew handler for renewing the lock tokens of messages and/or sessions in the background.
- Parameters
loop (EventLoop) – An async event loop.
Example:
    from azure.servicebus.aio import AutoLockRenew

    lock_renewal = AutoLockRenew()
    async with servicebus_receiver:
        async for message in servicebus_receiver:
            lock_renewal.register(message, timeout=60)
            await process_message(message)
            await message.complete()

    from azure.servicebus.aio import AutoLockRenew

    lock_renewal = AutoLockRenew()
    async with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        # Auto renew session lock for 2 minutes
        lock_renewal.register(session, timeout=120)
        async for message in receiver:
            await process_message(message)
            await message.complete()
-
register
(renewable, timeout=300)[source]¶ Register a renewable entity for automatic lock renewal.
- Parameters
renewable (ReceivedMessage or Session) – A locked entity that needs to be renewed.
timeout (float) – A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes).
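As a rough illustration of what background lock renewal means, the sketch below renews a lock at a fixed interval until a timeout elapses. It is not the library's implementation: `FakeRenewable`, `keep_lock_alive`, `demo` and the `interval` parameter are hypothetical names introduced only for this example.

```python
import asyncio
import time


class FakeRenewable:
    """Hypothetical stand-in for a locked message or session."""

    def __init__(self):
        self.renew_count = 0

    async def renew_lock(self):
        # A real entity would ask the service to extend its lock here.
        self.renew_count += 1


async def keep_lock_alive(renewable, timeout, interval):
    """Renew the lock every `interval` seconds until `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() + interval <= deadline:
        await asyncio.sleep(interval)
        await renewable.renew_lock()


async def demo():
    renewable = FakeRenewable()
    # Renewal runs in the background while the "processing" happens.
    task = asyncio.create_task(
        keep_lock_alive(renewable, timeout=0.2, interval=0.05)
    )
    await asyncio.sleep(0.1)  # simulated message processing
    await task
    return renewable.renew_count
```

Calling `asyncio.run(demo())` returns the number of renewals performed, which depends on timing.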
-
class
azure.servicebus.aio.
ReceivedMessage
(message, mode=<ReceiveSettleMode.PeekLock: <ReceiverSettleMode.PeekLock: 1>>, **kwargs)[source]¶ A Service Bus Message received from service side.
-
async
abandon
() → None[source]¶ Abandon the message.
This message will be returned to the queue and made available to be received again.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
- Raises
~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
-
async
complete
() → None[source]¶ Complete the message.
This removes the message from the queue.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
- Raises
~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
- Raises
~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
-
async
dead_letter
(reason: Optional[str] = None, description: Optional[str] = None) → None[source]¶ Move the message to the Dead Letter queue.
The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.
- Parameters
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
- Raises
~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
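One common way to choose between complete, abandon and dead_letter is by how processing ended. The sketch below shows that decision with stand-in classes, assuming that a `ValueError` marks a message that can never succeed. `FakeMessage`, `process` and `handle` are hypothetical, and the local `MessageAlreadySettled` class only imitates the library exception of the same name.

```python
import asyncio


class MessageAlreadySettled(Exception):
    """Local stand-in for the library exception of the same name."""


class FakeMessage:
    """Hypothetical message that records how it was settled."""

    def __init__(self, body):
        self.body = body
        self.outcome = None

    async def _settle(self, outcome):
        # A message can be settled only once.
        if self.outcome is not None:
            raise MessageAlreadySettled(self.outcome)
        self.outcome = outcome

    async def complete(self):
        await self._settle("completed")

    async def abandon(self):
        await self._settle("abandoned")

    async def dead_letter(self, reason=None, description=None):
        await self._settle("dead-lettered")


async def process(body):
    """Hypothetical handler: rejects non-numeric bodies, fails on '0'."""
    if not body.isdigit():
        raise ValueError("not a number: %r" % body)
    if body == "0":
        raise ConnectionError("downstream unavailable")


async def handle(message):
    try:
        await process(message.body)
    except ValueError as exc:
        # Malformed payload: retrying will not help, so park it.
        await message.dead_letter(reason="InvalidPayload", description=str(exc))
    except Exception:
        # Possibly transient failure: return the message to the queue.
        await message.abandon()
    else:
        await message.complete()
    return message.outcome
```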
-
async
defer
() → None[source]¶ Defers the message.
This message will remain in the queue but must be requested specifically by its sequence number in order to be received.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
- Raises
~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
-
async
renew_lock
() → None[source]¶ Renew the message lock.
This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed. In order to complete (or otherwise settle) the message, the lock must be maintained. Messages received via ReceiveAndDelete mode are not locked, and therefore cannot be renewed. This operation can also be performed as an asynchronous background task by registering the message with an azure.servicebus.aio.AutoLockRenew instance. This operation is only available for non-sessionful messages.
- Return type
- Raises
TypeError if the message is sessionful.
- Raises
~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
- Raises
~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has already been settled.
-
property
enqueue_sequence_number
¶ int
- Type
rtype
-
property
enqueued_time_utc
¶ ~datetime.datetime
- Type
rtype
-
property
expired
¶ bool
- Type
rtype
-
property
lock_token
¶ ~uuid.UUID or str
- Type
rtype
-
property
locked_until_utc
¶ datetime.datetime
- Type
rtype
-
property
partition_id
¶ int
- Type
rtype
-
property
partition_key
¶ str
- Type
rtype
-
property
scheduled_enqueue_time_utc
¶ Get or set the UTC scheduled enqueue time of the message. This property can be used for scheduling when sending a message through the ServiceBusSender.send method. If cancelling scheduled messages is required, you should use the ServiceBusSender.schedule method, which returns sequence numbers that can be used for future cancellation. scheduled_enqueue_time_utc is None if not set.
- Return type
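A value for scheduled_enqueue_time_utc can be computed with the standard library alone. The helper below is a minimal sketch that returns a timezone-aware UTC datetime a given number of seconds in the future. `utc_enqueue_time` is a hypothetical name, and whether the library expects an aware or a naive datetime is not stated in this section.

```python
from datetime import datetime, timedelta, timezone


def utc_enqueue_time(delay_seconds, now=None):
    """Return an aware UTC datetime `delay_seconds` after `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Normalise any aware datetime to UTC before adding the delay.
    return now.astimezone(timezone.utc) + timedelta(seconds=delay_seconds)


# Example: schedule five minutes from the current time.
five_minutes_from_now = utc_enqueue_time(300)
```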
-
property
sequence_number
¶ int
- Type
rtype
-
property
settled
¶ Whether the message has been settled.
This will always be True for a message received using ReceiveAndDelete mode, otherwise it will be False until the message is completed or otherwise settled.
- Return type
-
property
time_to_live
¶ ~datetime.timedelta
- Type
rtype
-
property
via_partition_key
¶ str
- Type
rtype
-
class
azure.servicebus.aio.
ServiceBusClient
(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]¶ The ServiceBusClient class defines a high level interface for getting ServiceBusSender and ServiceBusReceiver.
- Variables
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments
entity_name (str) – Optional entity name, this can be the name of Queue or Topic. It must be specified if the credential is for specific Queue or Topic.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient, ServiceBusSharedKeyCredential

    fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
    shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
    shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
    servicebus_client = ServiceBusClient(
        fully_qualified_namespace=fully_qualified_namespace,
        credential=ServiceBusSharedKeyCredential(
            shared_access_policy,
            shared_access_key
        )
    )
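The http_proxy keyword argument described above takes a plain dictionary. The helper below is a small sketch of how such a dictionary could be assembled and checked before it is passed on. `build_http_proxy` and the host name `proxy.example.com` are hypothetical; only the key names come from the parameter description.

```python
def build_http_proxy(hostname, port, username=None, password=None):
    """Return an http_proxy dictionary with the documented keys."""
    if not isinstance(hostname, str) or not hostname:
        raise TypeError("proxy_hostname must be a non-empty str")
    # bool is a subclass of int, so reject it explicitly.
    if not isinstance(port, int) or isinstance(port, bool):
        raise TypeError("proxy_port must be an int")
    proxy = {"proxy_hostname": hostname, "proxy_port": port}
    # Credentials are optional keys.
    if username is not None:
        proxy["username"] = username
    if password is not None:
        proxy["password"] = password
    return proxy


http_proxy = build_http_proxy("proxy.example.com", 8080)
```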
-
classmethod
from_connection_string
(conn_str: str, **kwargs: Any) → azure.servicebus.aio._servicebus_client_async.ServiceBusClient[source]¶ Create a ServiceBusClient from a connection string.
- Parameters
conn_str – The connection string of a Service Bus.
- Keyword Arguments
entity_name (str) – Optional entity name, this can be the name of Queue or Topic. It must be specified if the credential is for specific Queue or Topic.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
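To show how a connection string relates to the fully qualified namespace, here is a minimal parsing sketch. It assumes the usual `Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...` layout, which this section does not spell out. Both function names and the sample values are hypothetical.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str):
    """Split a 'Key=Value;Key=Value' string into a dictionary."""
    parts = {}
    for segment in conn_str.strip().strip(";").split(";"):
        # Split on the first '=' only: key values may themselves end in '='.
        key, separator, value = segment.partition("=")
        if not separator or not key.strip():
            raise ValueError("malformed segment: %r" % segment)
        parts[key.strip()] = value.strip()
    return parts


def namespace_from_connection_string(conn_str):
    """Return the host name found in the Endpoint entry."""
    endpoint = parse_connection_string(conn_str)["Endpoint"]
    host = urlparse(endpoint).hostname
    if not host:
        raise ValueError("Endpoint has no host: %r" % endpoint)
    return host


sample = (
    "Endpoint=sb://contoso.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123="
)
namespace = namespace_from_connection_string(sample)
```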
-
get_queue_deadletter_receiver
(queue_name: str, **kwargs: Any) → azure.servicebus.aio._servicebus_receiver_async.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the dead-letter queue, the secondary subqueue provided by the specific Queue. It holds messages that can’t be delivered to any receiver or messages that can’t be processed.
- Parameters
queue_name (str) – The path of specific Service Bus Queue the client connects to.
- Keyword Arguments
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval in seconds between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval in seconds. Default value is 120.
transfer_deadletter (bool) – Whether to connect to the transfer dead-letter queue, or the standard dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint.
- Return type
Example:
-
get_queue_receiver
(queue_name: str, **kwargs: Any) → azure.servicebus.aio._servicebus_receiver_async.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the specific queue.
- Parameters
queue_name (str) – The path of specific Service Bus Queue the client connects to.
- Keyword Arguments
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
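The difference between the two values of mode can be pictured with a toy in-memory queue. This is only a conceptual model of the behaviour described for PeekLock and ReceiveAndDelete, not how the service works internally. `ToyQueue` and its methods are hypothetical, and lock expiry is not modelled.

```python
from collections import deque


class ToyQueue:
    """Hypothetical in-memory queue with two receive modes."""

    def __init__(self, bodies):
        self._pending = deque(bodies)
        self._locked = {}  # lock token -> message body
        self._next_token = 0

    def receive(self, mode="PeekLock"):
        if not self._pending:
            raise LookupError("queue is empty")
        body = self._pending.popleft()
        if mode == "ReceiveAndDelete":
            # Removed immediately: there is no lock and nothing to settle.
            return None, body
        token = self._next_token
        self._next_token += 1
        self._locked[token] = body  # held until settled
        return token, body

    def complete(self, token):
        # Settling with complete removes the message for good.
        del self._locked[token]

    def abandon(self, token):
        # Abandoning makes the message available to be received again.
        self._pending.appendleft(self._locked.pop(token))

    def __len__(self):
        return len(self._pending) + len(self._locked)
```

With PeekLock the message still counts toward the queue length until `complete` is called. With ReceiveAndDelete it is gone as soon as `receive` returns.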
-
get_queue_sender
(queue_name: str, **kwargs: Any) → azure.servicebus.aio._servicebus_sender_async.ServiceBusSender[source]¶ Get ServiceBusSender for the specific queue.
- Parameters
queue_name (str) – The path of specific Service Bus Queue the client connects to.
- Keyword Arguments
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
-
get_queue_session_receiver
(queue_name: str, session_id: Optional[str] = None, **kwargs: Any) → azure.servicebus.aio._servicebus_session_receiver_async.ServiceBusSessionReceiver[source]¶ Get ServiceBusSessionReceiver for the specific queue.
- Parameters
queue_name (str) – The path of specific Service Bus Queue the client connects to.
session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.
- Keyword Arguments
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
    session_id = "<my_session_id>"  # placeholder; pass None for the next available session
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        queue_receiver = servicebus_client.get_queue_session_receiver(
            queue_name=queue_name,
            session_id=session_id,
        )
-
get_subscription_deadletter_receiver
(topic_name: str, subscription_name: str, **kwargs: Any) → azure.servicebus.aio._servicebus_receiver_async.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the dead-letter queue, the secondary subqueue provided by the specific topic subscription. It holds messages that can’t be delivered to any receiver or messages that can’t be processed.
- Parameters
topic_name (str) – The name of specific Service Bus Topic the client connects to.
subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.
- Keyword Arguments
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval in seconds between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval in seconds. Default value is 120.
transfer_deadletter (bool) – Whether to connect to the transfer dead-letter queue, or the standard dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"]
    subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_deadletter_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
-
get_subscription_receiver
(topic_name: str, subscription_name: str, **kwargs: Any) → azure.servicebus.aio._servicebus_receiver_async.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the specific subscription under the topic.
- Parameters
topic_name (str) – The name of specific Service Bus Topic the client connects to.
subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.
- Keyword Arguments
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval in seconds between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval in seconds. Default value is 120.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"]
    subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
-
get_subscription_session_receiver
(topic_name: str, subscription_name: str, session_id: Optional[str] = None, **kwargs: Any) → azure.servicebus.aio._servicebus_receiver_async.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the specific subscription under the topic.
- Parameters
topic_name (str) – The name of specific Service Bus Topic the client connects to.
subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.
session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.
- Keyword Arguments
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval in seconds between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval in seconds. Default value is 120.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"]
    subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"]
    session_id = "<my_session_id>"  # placeholder; pass None for the next available session
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_session_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
            session_id=session_id,
        )
-
get_topic_sender
(topic_name: str, **kwargs: Any) → azure.servicebus.aio._servicebus_sender_async.ServiceBusSender[source]¶ Get ServiceBusSender for the specific topic.
- Parameters
topic_name (str) – The path of specific Service Bus Topic the client connects to.
- Keyword Arguments
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval in seconds between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval in seconds. Default value is 120.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    async with servicebus_client:
        topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
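To get a feel for how retry_total, retry_backoff_factor and retry_backoff_max interact, the sketch below lists the wait before each retry. It assumes the delay doubles on every attempt and is capped at the maximum. That growth rule is an assumption, since this section names only the factor and the cap, and `backoff_schedule` is a hypothetical helper.

```python
def backoff_schedule(retry_total=3, backoff_factor=0.8, backoff_max=120.0):
    """Return the assumed delay in seconds before each retry attempt."""
    # Assumption: the delay doubles per attempt and never exceeds backoff_max.
    return [
        min(backoff_factor * (2 ** attempt), backoff_max)
        for attempt in range(retry_total)
    ]


default_delays = backoff_schedule()
```

Under this assumption the documented defaults give three short waits, and the cap only matters once the number of retries is raised well above the default.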
-
class
azure.servicebus.aio.
ServiceBusReceiver
(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]¶ The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.
- Variables
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.
subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
Example:
    import os
    from azure.servicebus.aio import ServiceBusReceiver, ServiceBusSharedKeyCredential

    fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
    shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
    shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
    queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
    queue_receiver = ServiceBusReceiver(
        fully_qualified_namespace=fully_qualified_namespace,
        credential=ServiceBusSharedKeyCredential(
            shared_access_policy,
            shared_access_key
        ),
        queue_name=queue_name
    )
-
async
close
() → None¶ Close down the handler connection.
If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shut down due to an error.
- Return type
-
classmethod
from_connection_string
(conn_str: str, **kwargs: Any) → azure.servicebus.aio._servicebus_receiver_async.ServiceBusReceiver[source]¶ Create a ServiceBusReceiver from a connection string.
- Parameters
conn_str – The connection string of a Service Bus.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.
subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
- Return type
Example:
    import os
    from azure.servicebus.aio import ServiceBusReceiver

    servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
    queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
    queue_receiver = ServiceBusReceiver.from_connection_string(
        conn_str=servicebus_connection_str,
        queue_name=queue_name
    )
-
async
peek
(message_count=1, sequence_number=0)[source]¶ Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.
- Parameters
- Return type
Example:
    async with servicebus_receiver:
        messages = await servicebus_receiver.peek()
        for message in messages:
            print(message)
-
async
receive
(max_batch_size: Optional[int] = None, max_wait_time: Optional[float] = None) → List[azure.servicebus.aio._async_message.ReceivedMessage][source]¶ Receive a batch of messages at once.
This approach is optimal if you wish to process multiple messages simultaneously. Note that the number of messages retrieved in a single batch will be dependent on whether prefetch was set for the receiver. This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.
- Parameters
max_batch_size (int) – Maximum number of messages in the batch. Actual number returned will depend on prefetch size and incoming stream rate.
max_wait_time (float) – Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned.
- Return type
Example:
    async with servicebus_receiver:
        messages = await servicebus_receiver.receive(max_wait_time=5)
        for message in messages:
            print(message)
            await message.complete()
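The batching rule described for receive (wait for the first message, then return whatever is already available) can be sketched with an `asyncio.Queue` standing in for the incoming stream. This is an illustrative model, not the receiver's actual code. `receive_batch` and `demo` are hypothetical names.

```python
import asyncio


async def receive_batch(queue, max_batch_size=None, max_wait_time=None):
    """Wait for a first item, then drain what is immediately available."""
    try:
        # With max_wait_time=None this waits indefinitely for the first item.
        first = await asyncio.wait_for(queue.get(), timeout=max_wait_time)
    except asyncio.TimeoutError:
        return []  # nothing arrived within the wait time
    batch = [first]
    while max_batch_size is None or len(batch) < max_batch_size:
        try:
            batch.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break  # gap in the incoming stream: return early
    return batch


async def demo():
    queue = asyncio.Queue()
    for body in ("a", "b", "c"):
        queue.put_nowait(body)
    first = await receive_batch(queue, max_batch_size=2, max_wait_time=1)
    rest = await receive_batch(queue, max_batch_size=10, max_wait_time=1)
    empty = await receive_batch(queue, max_wait_time=0.05)
    return first, rest, empty
```

In `demo`, the second call asks for up to ten messages but returns as soon as the queue runs dry, and the third call times out with an empty list.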
-
async
receive_deferred_messages
(sequence_numbers: List[int]) → List[azure.servicebus.aio._async_message.ReceivedMessage][source]¶ Receive messages that have previously been deferred.
When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.
- Parameters
sequence_numbers (list[int]) – A list of the sequence numbers of messages that have been deferred.
- Return type
Example:
async with servicebus_receiver:
    deferred_sequenced_numbers = []
    messages = await servicebus_receiver.receive(max_wait_time=5)
    for message in messages:
        deferred_sequenced_numbers.append(message.sequence_number)
        print(message)
        await message.defer()
    received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
        sequence_numbers=deferred_sequenced_numbers
    )
    for msg in received_deferred_msg:
        await msg.complete()
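For a partitioned entity, the sequence numbers passed in one call must all belong to the same partition. The sketch below groups sequence numbers so that receive_deferred_messages could be called once per group. It assumes that the topmost 16 bits of the 64-bit sequence number identify the partition; treat that as an assumption to verify against the Service Bus documentation for your namespace. `group_by_partition` is a hypothetical helper, not an SDK function.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def group_by_partition(sequence_numbers: Iterable[int]) -> Dict[int, List[int]]:
    """Group sequence numbers by assumed partition id (top 16 of 64 bits)."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for number in sequence_numbers:
        # Assumption: bits 48..63 carry the partition identifier.
        partition_id = (number >> 48) & 0xFFFF
        groups[partition_id].append(number)
    return dict(groups)
```

Each value of the returned dictionary could then be passed as sequence_numbers in a separate call.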
-
class
azure.servicebus.aio.
ServiceBusSender
(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]¶ The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic.
- Variables
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided.
topic_name (str) – The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
Example:
import os
from azure.servicebus.aio import ServiceBusSender, ServiceBusSharedKeyCredential

fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_sender = ServiceBusSender(
    fully_qualified_namespace=fully_qualified_namespace,
    credential=ServiceBusSharedKeyCredential(
        shared_access_policy,
        shared_access_key
    ),
    queue_name=queue_name
)
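The http_proxy keyword argument described above expects a dictionary with two required keys and two optional ones. A minimal sketch of a helper that builds and checks such a dictionary is shown below. `make_http_proxy` is a hypothetical name and is not part of the SDK.

```python
from typing import Any, Dict, Optional


def make_http_proxy(hostname: str, port: int,
                    username: Optional[str] = None,
                    password: Optional[str] = None) -> Dict[str, Any]:
    """Build a dict with the keys described for the http_proxy argument."""
    if not hostname:
        raise ValueError("proxy_hostname must be a non-empty string")
    if not isinstance(port, int) or isinstance(port, bool):
        raise ValueError("proxy_port must be an int")
    proxy: Dict[str, Any] = {"proxy_hostname": hostname, "proxy_port": port}
    # The credentials are optional and only included when given.
    if username is not None:
        proxy["username"] = username
    if password is not None:
        proxy["password"] = password
    return proxy
```

The result would be passed as http_proxy=make_http_proxy(...) when constructing the sender.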
-
async
cancel_scheduled_messages
(sequence_numbers: Union[int, List[int]]) → None[source]¶ Cancel one or more messages that have previously been scheduled and are still pending.
- Parameters
sequence_numbers (int or list[int]) – The sequence numbers of the scheduled messages.
- Return type
- Raises
~azure.servicebus.exceptions.ServiceBusError if cancellation fails because a message has already been cancelled or enqueued.
Example:
async with servicebus_sender:
    await servicebus_sender.cancel_scheduled_messages(sequence_nums)
-
async
close
() → None¶ Close down the handler connection.
If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shut down due to error.
- Return type
-
async
create_batch
(max_size_in_bytes: Optional[int] = None) → azure.servicebus._common.message.BatchMessage[source]¶ Create a BatchMessage object with the max size of all content being constrained by max_size_in_bytes. The max_size_in_bytes should be no greater than the max allowed message size defined by the service.
- Parameters
max_size_in_bytes (int) – The maximum size, in bytes, of the data that a BatchMessage object can hold. By default, the value is determined by your Service Bus tier.
- Return type
Example:
async with servicebus_sender:
    batch_message = await servicebus_sender.create_batch()
    batch_message.add(Message("Single message inside batch"))
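Since a BatchMessage is capped at max_size_in_bytes, sending many messages usually means filling one batch until it refuses another message and then starting a new one. The sketch below illustrates that packing loop. `FakeBatch` is a hypothetical stand-in that measures size in UTF-8 bytes, and it assumes that add raises ValueError when a message does not fit; check the behaviour of BatchMessage.add in the SDK version you use.

```python
from typing import Iterable, List


class FakeBatch:
    """Hypothetical stand-in for BatchMessage; not part of azure.servicebus."""

    def __init__(self, max_size_in_bytes: int) -> None:
        self.max_size_in_bytes = max_size_in_bytes
        self.size = 0
        self.items: List[str] = []

    def add(self, message: str) -> None:
        needed = len(message.encode("utf-8"))
        # Assumption: a full batch rejects the message with ValueError.
        if self.size + needed > self.max_size_in_bytes:
            raise ValueError("message does not fit in this batch")
        self.size += needed
        self.items.append(message)


def pack(messages: Iterable[str], max_size_in_bytes: int) -> List[FakeBatch]:
    """Fill batches in order, opening a new batch whenever one is full."""
    batches: List[FakeBatch] = []
    current = FakeBatch(max_size_in_bytes)
    for message in messages:
        try:
            current.add(message)
        except ValueError:
            if not current.items:
                # The message alone exceeds the limit, so it can never fit.
                raise
            batches.append(current)
            current = FakeBatch(max_size_in_bytes)
            current.add(message)
    if current.items:
        batches.append(current)
    return batches
```

With the real sender, each completed batch would be obtained from create_batch and handed to send in turn.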
-
classmethod
from_connection_string
(conn_str: str, **kwargs: Any) → azure.servicebus.aio._servicebus_sender_async.ServiceBusSender[source]¶ Create a ServiceBusSender from a connection string.
- Parameters
conn_str – The connection string of a Service Bus.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic the client connects to.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
- Return type
Example:
import os
from azure.servicebus.aio import ServiceBusSender

servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_sender = ServiceBusSender.from_connection_string(
    conn_str=servicebus_connection_str,
    queue_name=queue_name
)
-
async
schedule
(messages: Union[Message, List[Message]], schedule_time_utc: datetime.datetime) → List[int][source]¶ Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages.
- Parameters
messages (Message or list[Message]) – The message or list of messages to schedule.
schedule_time_utc (datetime) – The UTC date and time to enqueue the messages.
- Return type
list[int]
Example:
async with servicebus_sender:
    scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
    scheduled_messages = [Message("Scheduled message") for _ in range(10)]
    sequence_nums = await servicebus_sender.schedule(scheduled_messages, scheduled_time_utc)
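schedule expects the enqueue time as a UTC datetime, and the example above builds it as a naive datetime. A small sketch of a helper that produces such a value from an offset in seconds follows. `utc_in` is a hypothetical name, and returning a naive (tzinfo-free) value is an assumption based on the example rather than a documented requirement.

```python
import datetime


def utc_in(seconds: float) -> datetime.datetime:
    """Return a naive datetime in UTC that lies `seconds` from now."""
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    # Drop tzinfo to match the naive UTC value used in the example above.
    return (now_utc + datetime.timedelta(seconds=seconds)).replace(tzinfo=None)
```

The returned value could be passed as schedule_time_utc, for instance utc_in(30) for a delay of thirty seconds.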
-
async
send
(message: Union[azure.servicebus._common.message.Message, azure.servicebus._common.message.BatchMessage, List[azure.servicebus._common.message.Message]]) → None[source]¶ Sends message and blocks until acknowledgement is received or operation times out.
If a list of messages was provided, attempts to send them as a single batch, throwing a ValueError if they cannot fit in a single batch.
- Parameters
message (Message or BatchMessage or list[Message]) – The ServiceBus message to be sent.
- Return type
- Raises
~azure.servicebus.exceptions.OperationTimeoutError if sending times out.
~azure.servicebus.exceptions.MessageContentTooLarge if the size of the message is over the Service Bus frame size limit.
~azure.servicebus.exceptions.MessageSendFailed if the message fails to send.
~azure.servicebus.exceptions.ServiceBusError when other errors happen such as connection error, authentication error, and any unexpected errors. It’s also the top-level root class of the above errors.
Example:
async with servicebus_sender:
    message = Message("Hello World")
    await servicebus_sender.send(message)
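As noted above, a list that cannot fit in one batch causes send to raise ValueError. One way to cope is to split the list and retry each half, as in the sketch below. `FakeSender` is a hypothetical stand-in that limits a batch by message count rather than by bytes, and `send_all` is an illustrative helper, not an SDK function.

```python
import asyncio
from typing import List


class FakeSender:
    """Hypothetical stand-in for a sender; not part of azure.servicebus."""

    def __init__(self, max_per_batch: int) -> None:
        self.max_per_batch = max_per_batch
        self.sent_batches: List[List[str]] = []

    async def send(self, messages: List[str]) -> None:
        # Mimics the documented ValueError for a list too large for one batch.
        if len(messages) > self.max_per_batch:
            raise ValueError("messages do not fit in a single batch")
        self.sent_batches.append(list(messages))


async def send_all(sender, messages: List[str]) -> None:
    """Send a list, halving it whenever it is too large for one batch."""
    if not messages:
        return
    try:
        await sender.send(messages)
    except ValueError:
        if len(messages) == 1:
            raise  # a single message that is too large cannot be split
        middle = len(messages) // 2
        await send_all(sender, messages[:middle])
        await send_all(sender, messages[middle:])
```

Message order is preserved because the first half is always sent before the second.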
-
class
azure.servicebus.aio.
ServiceBusSession
(session_id: str, receiver: Union[ServiceBusSessionReceiver, ServiceBusSessionReceiverAsync], encoding: str = 'UTF-8')[source]¶ The ServiceBusSession is used to manage session state and lock renewal.
Please use the instance variable `session` on the ServiceBusReceiver to get the corresponding ServiceBusSession object linked with the receiver instead of instantiating a ServiceBusSession object directly.
Example:
async with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
-
async
get_session_state
() → str[source]¶ Get the session state.
Returns None if no state has been set.
- Return type
Example:
async with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    session_state = await session.get_session_state()
-
async
renew_lock
() → None[source]¶ Renew the session lock.
This operation must be performed periodically in order to retain a lock on the session to continue message processing. Once the lock is lost the connection will be closed. This operation can also be performed as a threaded background task by registering the session with an azure.servicebus.aio.AutoLockRenew instance.
Example:
async with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    # renew_lock returns None, so there is no result to keep
    await session.renew_lock()
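The periodic renewal described above can also be written by hand as a background asyncio task, which is roughly the job that AutoLockRenew takes over. The sketch below is not the AutoLockRenew implementation. `FakeSession`, `keep_renewing` and `process_with_renewal` are hypothetical names, and in practice the interval would be chosen well below the session lock duration.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for ServiceBusSession; counts renewals only."""

    def __init__(self) -> None:
        self.renewals = 0

    async def renew_lock(self) -> None:
        self.renewals += 1


async def keep_renewing(session, interval: float) -> None:
    """Renew the lock every `interval` seconds until the task is cancelled."""
    while True:
        await asyncio.sleep(interval)
        await session.renew_lock()


async def process_with_renewal(session, work, interval: float):
    """Run `work()` while a background task keeps the session lock alive."""
    renewer = asyncio.create_task(keep_renewing(session, interval))
    try:
        return await work()
    finally:
        renewer.cancel()
        # Wait for the cancellation to finish so no task is left pending.
        await asyncio.gather(renewer, return_exceptions=True)
```

Here `work` is any zero-argument callable returning an awaitable, such as a function that receives and settles the messages of the session.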
-
async
set_session_state
(state: Union[str, bytes, bytearray]) → None[source]¶ Set the session state.
Example:
async with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    # set_session_state returns None, so there is no result to keep
    await session.set_session_state("START")
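Session state is an opaque str, bytes or bytearray value, and get_session_state returns None when nothing has been stored. One common way to keep structured progress in it is to serialize to JSON, as sketched below. `encode_state` and `decode_state` are hypothetical helpers; the sketch also assumes that state read back may arrive as bytes, which should be checked against the SDK version in use.

```python
import json
from typing import Any, Dict, Optional, Union


def encode_state(state: Dict[str, Any]) -> str:
    """Serialize a dict to a JSON string suitable for set_session_state."""
    return json.dumps(state, sort_keys=True)


def decode_state(raw: Optional[Union[str, bytes, bytearray]]) -> Dict[str, Any]:
    """Parse stored state; None (no state set) becomes an empty dict."""
    if raw is None:
        return {}
    if isinstance(raw, (bytes, bytearray)):
        raw = bytes(raw).decode("utf-8")
    return json.loads(raw)
```

A receiver could then call set_session_state(encode_state({...})) after each processing step and decode_state(await session.get_session_state()) when resuming.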
-
property
locked_until_utc
¶ The time at which this session’s lock will expire.
- Return type
-
class
azure.servicebus.aio.
ServiceBusSessionReceiver
(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]¶ The ServiceBusSessionReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.
- Variables
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.
subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
Example:
import os
from azure.servicebus.aio import ServiceBusReceiver, ServiceBusSharedKeyCredential

fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_receiver = ServiceBusReceiver(
    fully_qualified_namespace=fully_qualified_namespace,
    credential=ServiceBusSharedKeyCredential(
        shared_access_policy,
        shared_access_key
    ),
    queue_name=queue_name
)
-
async
close
() → None¶ Close down the handler connection.
If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shut down due to error.
- Return type
-
classmethod
from_connection_string
(conn_str: str, **kwargs: Any) → azure.servicebus.aio._servicebus_session_receiver_async.ServiceBusSessionReceiver[source]¶ Create a ServiceBusSessionReceiver from a connection string.
- Parameters
conn_str – The connection string of a Service Bus.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.
subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.
mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.
session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.
prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.
idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
- Return type
Example:
import os
from azure.servicebus.aio import ServiceBusReceiver

servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_receiver = ServiceBusReceiver.from_connection_string(
    conn_str=servicebus_connection_str,
    queue_name=queue_name
)
-
async
peek
(message_count=1, sequence_number=0)¶ Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.
- Parameters
- Return type
Example:
async with servicebus_receiver:
    messages = await servicebus_receiver.peek()
    for message in messages:
        print(message)
-
async
receive
(max_batch_size: Optional[int] = None, max_wait_time: Optional[float] = None) → List[azure.servicebus.aio._async_message.ReceivedMessage]¶ Receive a batch of messages at once.
This approach is optimal if you wish to process multiple messages simultaneously. Note that the number of messages retrieved in a single batch depends on whether prefetch was set for the receiver. This call prioritizes returning quickly over meeting a specified batch size, so it returns as soon as at least one message has been received and there is a gap in incoming messages, regardless of the specified batch size.
- Parameters
max_batch_size (int) – Maximum number of messages in the batch. Actual number returned will depend on prefetch size and incoming stream rate.
max_wait_time (float) – Maximum time to wait in seconds for the first message to arrive. If no messages arrive and no timeout is specified, this call will not return until the connection is closed. If a timeout is specified and no messages arrive within that period, an empty list will be returned.
- Return type
Example:
async with servicebus_receiver:
    messages = await servicebus_receiver.receive(max_wait_time=5)
    for message in messages:
        print(message)
        await message.complete()
-
async
receive_deferred_messages
(sequence_numbers: List[int]) → List[azure.servicebus.aio._async_message.ReceivedMessage]¶ Receive messages that have previously been deferred.
When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.
- Parameters
sequence_numbers (list[int]) – A list of the sequence numbers of messages that have been deferred.
- Return type
Example:
async with servicebus_receiver:
    deferred_sequenced_numbers = []
    messages = await servicebus_receiver.receive(max_wait_time=5)
    for message in messages:
        deferred_sequenced_numbers.append(message.sequence_number)
        print(message)
        await message.defer()
    received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
        sequence_numbers=deferred_sequenced_numbers
    )
    for msg in received_deferred_msg:
        await msg.complete()
-
property
session
¶ Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled entities.
- Return type
Example:
async with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session