azure.servicebus.aio package

class azure.servicebus.aio.AutoLockRenewer(max_lock_renewal_duration: float = 300, on_lock_renew_failure: Callable[[ServiceBusSession | ServiceBusReceivedMessage, Exception | None], Awaitable[None]] | None = None, loop: AbstractEventLoop | None = None)

Auto lock renew.

An asynchronous AutoLockRenewer handler for renewing the lock tokens of messages and/or sessions in the background.

Parameters:
  • max_lock_renewal_duration (float) – A time in seconds that locks registered to this renewer should be maintained for. Default value is 300 (5 minutes).

  • on_lock_renew_failure (Optional[AsyncLockRenewFailureCallback]) – An async callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback).

Example:

Automatically renew a message lock
from azure.servicebus.aio import AutoLockRenewer

lock_renewal = AutoLockRenewer()
async with servicebus_receiver:
    async for message in servicebus_receiver:
        lock_renewal.register(servicebus_receiver, message, max_lock_renewal_duration=60)
        await process_message(message)
        await servicebus_receiver.complete_message(message)
Automatically renew a session lock
    from azure.servicebus.aio import AutoLockRenewer

    lock_renewal = AutoLockRenewer()
    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        # Auto renew session lock for 2 minutes
        lock_renewal.register(receiver, session, max_lock_renewal_duration=120)
        async for message in receiver:
            await process_message(message)
            await receiver.complete_message(message)
async close() -> None

Cease autorenewal by cancelling any remaining open lock renewal futures.

register(receiver: ServiceBusReceiver, renewable: ServiceBusReceivedMessage | ServiceBusSession, max_lock_renewal_duration: float | None = None, on_lock_renew_failure: Callable[[ServiceBusSession | ServiceBusReceivedMessage, Exception | None], Awaitable[None]] | None = None) -> None

Register a renewable entity for automatic lock renewal.

Parameters:
  • receiver (ServiceBusReceiver) – The ServiceBusReceiver instance that is associated with the message or the session to be auto-lock-renewed.

  • renewable (Union[ServiceBusReceivedMessage,ServiceBusSession]) – A locked entity that needs to be renewed.

  • max_lock_renewal_duration (Optional[float]) – A time in seconds that the lock should be maintained for. Default value is None. If specified, this value will override the default value specified at the constructor.

  • on_lock_renew_failure (Optional[AsyncLockRenewFailureCallback]) – An async callback may be specified to be called when the lock is lost on the renewable being registered. Default value is None (no callback).
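
A minimal sketch of an async on_lock_renew_failure callback, matching the Callable[[renewable, Exception | None], Awaitable[None]] shape shown in the signatures above. The names lost_locks and record_lock_loss are hypothetical, and the SDK wiring appears only in comments so that the block runs without a Service Bus namespace.

```python
import logging
from typing import Any, List, Optional, Tuple

logger = logging.getLogger("lock_renewal")

# Hypothetical in-memory record of renewables whose lock renewal failed.
lost_locks: List[Tuple[Any, Optional[Exception]]] = []


async def record_lock_loss(renewable: Any, error: Optional[Exception]) -> None:
    """Async callback: receives the message or session and an optional error."""
    lost_locks.append((renewable, error))
    if error is None:
        logger.warning("Lock renewal stopped for %r with no error reported", renewable)
    else:
        logger.warning("Lock renewal failed for %r: %s", renewable, error)


# Assumed wiring, following the signatures documented above:
#   renewer = AutoLockRenewer(on_lock_renew_failure=record_lock_loss)
#   renewer.register(receiver, message, on_lock_renew_failure=record_lock_loss)
```

A callback passed to register() applies to that one renewable, so a per-message handler can differ from the one given to the constructor.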

class azure.servicebus.aio.ServiceBusClient(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)

The ServiceBusClient class defines a high level interface for getting ServiceBusSender and ServiceBusReceiver.

Variables:

fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

Parameters:
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (AsyncTokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method, or alternatively, an AzureSasCredential can be provided too.

Keyword Arguments:
  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.

  • http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • user_agent (str) – If specified, this will be added in front of the built-in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – The delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – The maximum back-off interval, in seconds. Default value is 120.

  • retry_mode (str) – The delay behavior between retry attempts. Supported values are “fixed” or “exponential”, where default is “exponential”.

  • custom_endpoint_address (str) – The custom endpoint address to use for establishing a connection to the Service Bus service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The expected format is “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If no port is specified in custom_endpoint_address, port 443 is used by default.

  • connection_verify (str) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.

  • ssl_context (ssl.SSLContext or None) – The SSLContext object to use in the underlying Pure Python AMQP transport. If specified, connection_verify will be ignored.

  • uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
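
To give a feel for how retry_total, retry_backoff_factor, retry_backoff_max and retry_mode interact, here is an illustrative sketch. It assumes the delay doubles with each attempt in "exponential" mode, stays constant in "fixed" mode, and is capped at retry_backoff_max. The helper name backoff_delays is hypothetical, and the SDK's internal formula may differ.

```python
from typing import List


def backoff_delays(
    retry_total: int = 3,
    retry_backoff_factor: float = 0.8,
    retry_backoff_max: float = 120,
    retry_mode: str = "exponential",
) -> List[float]:
    """Return the assumed delay in seconds before each retry attempt."""
    if retry_mode not in ("fixed", "exponential"):
        raise ValueError(f"unsupported retry_mode: {retry_mode!r}")
    delays = []
    for attempt in range(retry_total):
        if retry_mode == "fixed":
            delay = retry_backoff_factor
        else:
            # Assumption: the delay doubles with every attempt.
            delay = retry_backoff_factor * (2 ** attempt)
        # Assumption: delays never exceed retry_backoff_max.
        delays.append(min(delay, retry_backoff_max))
    return delays
```

Under these assumptions the default settings give three delays that start at 0.8 seconds and double each time.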

Example:

Create a new instance of the ServiceBusClient.
import os
from azure.identity.aio import DefaultAzureCredential
from azure.servicebus.aio import ServiceBusClient

fully_qualified_namespace = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
servicebus_client = ServiceBusClient(
    fully_qualified_namespace=fully_qualified_namespace, credential=DefaultAzureCredential()
)
async close() -> None

Close down the ServiceBus client. All spawned senders, receivers and the underlying connection will be shut down.

Returns:

None
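
When the client is not used in an async with block, close() has to be called explicitly. A minimal sketch of doing that in a finally clause follows; run_with_client is a hypothetical helper, and client is any object with an async close() method, such as a ServiceBusClient.

```python
from typing import Any, Awaitable, Callable


async def run_with_client(client: Any, work: Callable[[Any], Awaitable[Any]]) -> Any:
    """Run work(client) and always close the client afterwards."""
    try:
        return await work(client)
    finally:
        # close() shuts down spawned senders, receivers and the connection,
        # even when work() raised an exception.
        await client.close()
```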

classmethod from_connection_string(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) -> ServiceBusClient

Create a ServiceBusClient from a connection string.

Parameters:

conn_str (str) – The connection string of a Service Bus.

Keyword Arguments:
  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.

  • http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • user_agent (str) – If specified, this will be added in front of the built-in user agent string.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – The delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – The maximum back-off interval, in seconds. Default value is 120.

  • retry_mode (str) – The delay behavior between retry attempts. Supported values are ‘fixed’ or ‘exponential’, where default is ‘exponential’.

  • custom_endpoint_address (str) – The custom endpoint address to use for establishing a connection to the Service Bus service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The expected format is “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If no port is specified in custom_endpoint_address, port 443 is used by default.

  • connection_verify (str) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.

  • ssl_context (ssl.SSLContext or None) – The SSLContext object to use in the underlying Pure Python AMQP transport. If specified, connection_verify will be ignored.

  • uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
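
A minimal sketch of building the http_proxy dictionary described above. build_http_proxy is a hypothetical helper; only the key names come from the parameter description, and the usage shown in the trailing comment is an assumption that requires the azure-servicebus package.

```python
from typing import Any, Dict, Optional


def build_http_proxy(
    hostname: str,
    port: int,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a dict in the shape the http_proxy keyword argument expects."""
    if not hostname:
        raise ValueError("proxy_hostname is required")
    if not isinstance(port, int) or not 0 < port < 65536:
        raise ValueError("proxy_port must be an int between 1 and 65535")
    proxy: Dict[str, Any] = {"proxy_hostname": hostname, "proxy_port": port}
    # The credentials are optional keys and are only added when given.
    if username is not None:
        proxy["username"] = username
    if password is not None:
        proxy["password"] = password
    return proxy


# Assumed usage:
#   client = ServiceBusClient.from_connection_string(
#       conn_str,
#       transport_type=TransportType.AmqpOverWebsocket,
#       http_proxy=build_http_proxy("proxy.example.com", 8080),
#   )
```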

Returns:

The ServiceBusClient instance.

Return type:

ServiceBusClient

Example:

Create a new instance of the ServiceBusClient from connection string.
import os
from azure.servicebus.aio import ServiceBusClient

servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
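
For orientation, a Service Bus connection string is a semicolon-separated list of Key=Value pairs, typically including Endpoint, SharedAccessKeyName and SharedAccessKey. The sketch below uses a hypothetical helper, namespace_from_connection_string, to extract the fully qualified namespace from the Endpoint entry. It is an illustration only and not how the SDK parses the string internally.

```python
from typing import Dict
from urllib.parse import urlparse


def namespace_from_connection_string(conn_str: str) -> str:
    """Extract the host name from the Endpoint entry of a connection string."""
    parts: Dict[str, str] = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        # Split on the first "=" only: values may themselves contain "=".
        key, _, value = segment.partition("=")
        parts[key.strip().lower()] = value.strip()
    endpoint = parts.get("endpoint")
    if not endpoint:
        raise ValueError("connection string has no Endpoint entry")
    host = urlparse(endpoint).hostname
    if not host:
        raise ValueError(f"cannot read a host name from {endpoint!r}")
    return host
```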
get_queue_receiver(queue_name: str, *, session_id: str | Literal[ServiceBusSessionFilter.NEXT_AVAILABLE] | None = None, sub_queue: ServiceBusSubQueue | str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any) -> ServiceBusReceiver

Get ServiceBusReceiver for the specific queue.

Parameters:

queue_name (str) – The path of specific Service Bus Queue the client connects to.

Keyword Arguments:
  • session_id (str or NEXT_AVAILABLE_SESSION or None) – A specific session from which to receive. This must be specified for a sessionful queue, otherwise it must be None. In order to receive messages from the next available session, set this to azure.servicebus.NEXT_AVAILABLE_SESSION.

  • sub_queue (str or ServiceBusSubQueue or None) – If specified, the subqueue this receiver will connect to. This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, which hold messages that can’t be delivered to any receiver or messages that can’t be processed. The default is None, meaning connect to the primary queue. Can be assigned values from the ServiceBusSubQueue enum or the equivalent string values “deadletter” and “transferdeadletter”.

  • receive_mode (Union[ServiceBusReceiveMode, str]) – The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PEEK_LOCK.

  • max_wait_time (Optional[float]) – The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. If connection errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.

  • auto_lock_renewer (Optional[AutoLockRenewer]) – An azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.

  • prefetch_count (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they’re received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it’s recommended that PEEK_LOCK mode be used with prefetch.

  • client_identifier (str) – A string-based identifier to uniquely identify the receiver instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

  • socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns:

The ServiceBusReceiver for the queue.

Return type:

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver from ServiceBusClient.
import os
from azure.servicebus.aio import ServiceBusClient

servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
    queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
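
The sub_queue and max_wait_time options described above can be combined to read from the dead-letter sub-queue and stop once it runs dry. A minimal sketch, assuming client behaves like a ServiceBusClient; drain_dead_letters is a hypothetical helper name.

```python
from typing import Any, List


async def drain_dead_letters(client: Any, queue_name: str, max_wait_time: float = 5) -> List[str]:
    """Receive and complete the messages found in the dead-letter sub-queue."""
    receiver = client.get_queue_receiver(
        queue_name=queue_name,
        sub_queue="deadletter",       # string form of the DEAD_LETTER sub-queue
        max_wait_time=max_wait_time,  # iteration ends after this long with no messages
    )
    drained: List[str] = []
    async with receiver:
        async for message in receiver:
            drained.append(str(message))
            # Completing removes the message from the dead-letter sub-queue.
            await receiver.complete_message(message)
    return drained
```

Without max_wait_time the async for loop would keep waiting for new messages, so a timeout is what makes this helper return.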
get_queue_sender(queue_name: str, **kwargs: Any) -> ServiceBusSender

Get ServiceBusSender for the specific queue.

Parameters:

queue_name (str) – The path of specific Service Bus Queue the client connects to.

Keyword Arguments:
  • client_identifier (str) – A string-based identifier to uniquely identify the sender instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

  • socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns:

A queue sender.

Return type:

ServiceBusSender

Example:

Create a new instance of the ServiceBusSender from ServiceBusClient.
import os
from azure.servicebus.aio import ServiceBusClient

servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
    queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
get_subscription_receiver(topic_name: str, subscription_name: str, *, session_id: str | Literal[ServiceBusSessionFilter.NEXT_AVAILABLE] | None = None, sub_queue: ServiceBusSubQueue | str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any) -> ServiceBusReceiver

Get ServiceBusReceiver for the specific subscription under the topic.

Parameters:
  • topic_name (str) – The name of specific Service Bus Topic the client connects to.

  • subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.

Keyword Arguments:
  • session_id (str or NEXT_AVAILABLE_SESSION or None) – A specific session from which to receive. This must be specified for a sessionful subscription, otherwise it must be None. In order to receive messages from the next available session, set this to azure.servicebus.NEXT_AVAILABLE_SESSION.

  • sub_queue (str or ServiceBusSubQueue or None) – If specified, the subqueue this receiver will connect to. This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, which hold messages that can’t be delivered to any receiver or messages that can’t be processed. The default is None, meaning connect to the primary queue. Can be assigned values from the ServiceBusSubQueue enum or the equivalent string values “deadletter” and “transferdeadletter”.

  • receive_mode (Union[ServiceBusReceiveMode, str]) – The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PEEK_LOCK.

  • max_wait_time (Optional[float]) – The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. If connection errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.

  • auto_lock_renewer (Optional[AutoLockRenewer]) – An azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.

  • prefetch_count (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they’re received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it’s recommended that PEEK_LOCK mode be used with prefetch.

  • client_identifier (str) – A string-based identifier to uniquely identify the receiver instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

  • socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns:

The ServiceBusReceiver for the topic subscription.

Return type:

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver from ServiceBusClient.
import os
from azure.servicebus.aio import ServiceBusClient

servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
    subscription_receiver = servicebus_client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name,
    )
get_topic_sender(topic_name: str, **kwargs: Any) -> ServiceBusSender

Get ServiceBusSender for the specific topic.

Parameters:

topic_name (str) – The path of specific Service Bus Topic the client connects to.

Keyword Arguments:
  • client_identifier (str) – A string-based identifier to uniquely identify the sender instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

  • socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns:

A topic sender.

Return type:

ServiceBusSender

Example:

Create a new instance of the ServiceBusSender from ServiceBusClient.
import os
from azure.servicebus.aio import ServiceBusClient

servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
    topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
class azure.servicebus.aio.ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.

The two primary channels for message receipt are receive_messages() to make a single request for messages, and async for message in receiver: to continuously receive incoming messages in an ongoing fashion.

Please use the get_<queue/subscription>_receiver method of azure.servicebus.aio.ServiceBusClient to create a ServiceBusReceiver instance.

Variables:
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • entity_path (str) – The path of the entity that the client connects to.

Parameters:
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (AsyncTokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method, or alternatively, an AzureSasCredential can be provided too.

Keyword Arguments:
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.

  • subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.

  • receive_mode (Union[ServiceBusReceiveMode, str]) – The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PEEK_LOCK.

  • max_wait_time (Optional[float]) – The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting to a session. If connection errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • user_agent (str) – If specified, this will be added in front of the built-in user agent string.

  • auto_lock_renewer (Optional[AutoLockRenewer]) – An azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.

  • prefetch_count (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they’re received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it’s recommended that PEEK_LOCK mode be used with prefetch.

  • client_identifier (str) – A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

  • socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
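
The prefetch_count warning above can be turned into a small pre-flight check. This is a sketch only: receiver_option_warnings is a hypothetical helper, and it assumes that "peeklock" and "receiveanddelete" are the string forms of the two ServiceBusReceiveMode values.

```python
from typing import List


def receiver_option_warnings(receive_mode: str = "peeklock", prefetch_count: int = 0) -> List[str]:
    """Return human-readable warnings for risky receiver option combinations."""
    if prefetch_count < 0:
        raise ValueError("prefetch_count must not be negative")
    warnings: List[str] = []
    # Assumption: "receiveanddelete" is the string form of RECEIVE_AND_DELETE.
    if receive_mode == "receiveanddelete" and prefetch_count > 0:
        warnings.append(
            "prefetched messages are already removed from the entity; "
            "they are lost if the application exits before receiving them"
        )
    return warnings
```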

async abandon_message(message: ServiceBusReceivedMessage) -> None

Abandon the message.

This message will be returned to the queue and made available to be received again.

Parameters:

message (ServiceBusReceivedMessage) – The received message to be abandoned.

Return type:

None

Raises:
  • azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.

  • azure.servicebus.exceptions.SessionLockLostError if the session lock has already expired.

  • azure.servicebus.exceptions.ServiceBusError when other errors occur.

Example:

Abandon a received message.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.abandon_message(message)
async close() -> None

Close down the handler connection.

If the handler has already closed, this operation does nothing.

Return type:

None

async complete_message(message: ServiceBusReceivedMessage) -> None

Complete the message.

This removes the message from the queue.

Parameters:

message (ServiceBusReceivedMessage) – The received message to be completed.

Return type:

None

Raises:
  • azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.

  • azure.servicebus.exceptions.SessionLockLostError if the session lock has already expired.

  • azure.servicebus.exceptions.ServiceBusError when other errors occur.

Example:

Complete a received message.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.complete_message(message)
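
In practice, complete_message and abandon_message are usually paired: complete on success, abandon on failure so the message can be received again. A minimal sketch, assuming receiver behaves like a ServiceBusReceiver in PEEK_LOCK mode; settle_batch and handler are hypothetical names.

```python
from typing import Any, Awaitable, Callable, Dict


async def settle_batch(
    receiver: Any,
    handler: Callable[[Any], Awaitable[None]],
    max_wait_time: float = 5,
) -> Dict[str, int]:
    """Complete messages the handler accepts; abandon those it rejects."""
    counts = {"completed": 0, "abandoned": 0}
    messages = await receiver.receive_messages(max_wait_time=max_wait_time)
    for message in messages:
        try:
            await handler(message)
        except Exception:
            # Return the message to the queue so it can be received again.
            await receiver.abandon_message(message)
            counts["abandoned"] += 1
        else:
            # Processing succeeded: remove the message from the queue.
            await receiver.complete_message(message)
            counts["completed"] += 1
    return counts
```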
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Move the message to the Dead Letter queue.

The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.

Parameters:
  • message (ServiceBusReceivedMessage) – The received message to be dead-lettered.

  • reason (Optional[str]) – The reason for dead-lettering the message.

  • error_description (Optional[str]) – The detailed error description for dead-lettering the message.

Return type:

None

Raises:
  • azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.

  • azure.servicebus.exceptions.SessionLockLostError if the session lock has already expired.

  • azure.servicebus.exceptions.ServiceBusError when other errors occur.

Example:

Dead letter a received message.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.dead_letter_message(message)
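
The optional reason and error_description arguments make dead-lettered messages easier to triage later. A minimal sketch, assuming receiver behaves like a ServiceBusReceiver; dead_letter_invalid, validate and the "ValidationFailed" reason string are hypothetical.

```python
from typing import Any, Callable


async def dead_letter_invalid(
    receiver: Any,
    validate: Callable[[Any], None],
    max_wait_time: float = 5,
) -> int:
    """Dead-letter messages that fail validation; complete the rest."""
    dead_lettered = 0
    messages = await receiver.receive_messages(max_wait_time=max_wait_time)
    for message in messages:
        try:
            validate(message)
        except ValueError as exc:
            await receiver.dead_letter_message(
                message,
                reason="ValidationFailed",   # hypothetical reason string
                error_description=str(exc),  # keeps the failure detail with the message
            )
            dead_lettered += 1
        else:
            await receiver.complete_message(message)
    return dead_lettered
```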
async defer_message(message: ServiceBusReceivedMessage) -> None

Defers the message.

This message will remain in the queue but must be requested specifically by its sequence number in order to be received.

Parameters:

message (ServiceBusReceivedMessage) – The received message to be deferred.

Return type:

None

Raises:
  • azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.

  • azure.servicebus.exceptions.SessionLockLostError if the session lock has already expired.

  • azure.servicebus.exceptions.ServiceBusError when other errors occur.

Example:

Defer a received message.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.defer_message(message)
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Browse messages currently pending in the queue.

Peeked messages are not removed from the queue, nor are they locked. They cannot be completed, deferred, or dead-lettered.

Parameters:

max_message_count (int) – The maximum number of messages to try and peek. The default value is 1.

Keyword Arguments:
  • sequence_number (int) – A message sequence number from which to start browsing messages.

  • timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns:

A list of azure.servicebus.ServiceBusReceivedMessage objects.

Return type:

list[ServiceBusReceivedMessage]

Example:

Peek messages in the queue.
async with servicebus_receiver:
    messages = await servicebus_receiver.peek_messages()
    for message in messages:
        print(str(message))
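
The sequence_number keyword can be used to browse beyond a single call. The following is a minimal sketch under stated assumptions: `peek_all`, `page_size`, and `limit` are hypothetical names, and each peeked message is assumed to expose an integer `sequence_number` attribute.

```python
from typing import Any, List


async def peek_all(receiver: Any, page_size: int = 50, limit: int = 1000) -> List[Any]:
    """Browse up to `limit` pending messages without locking or removing them."""
    peeked: List[Any] = []
    next_sequence_number = 0  # 0 is the documented default starting point
    while len(peeked) < limit:
        page = await receiver.peek_messages(
            max_message_count=min(page_size, limit - len(peeked)),
            sequence_number=next_sequence_number,
        )
        if not page:
            break  # nothing further to browse
        peeked.extend(page)
        # Resume just past the last message seen on this page.
        next_sequence_number = page[-1].sequence_number + 1
    return peeked
```

The `limit` argument bounds memory use when the entity holds many messages.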
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) List[ServiceBusReceivedMessage][source]

Receive messages that have previously been deferred.

When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.

Parameters:

sequence_numbers (Union[int, list[int]]) – A sequence number, or a list of sequence numbers, of messages that have been deferred.

Keyword Arguments:

timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns:

A list of the received messages.

Return type:

list[ServiceBusReceivedMessage]

Example:

Receive deferred messages from ServiceBus.
async with servicebus_receiver:
    deferred_sequenced_numbers = []
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        deferred_sequenced_numbers.append(message.sequence_number)
        print(str(message))
        await servicebus_receiver.defer_message(message)

    received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
        sequence_numbers=deferred_sequenced_numbers
    )

    for message in received_deferred_msg:
        await servicebus_receiver.complete_message(message)
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) List[ServiceBusReceivedMessage][source]

Receive a batch of messages at once.

This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.

Note that the number of messages retrieved in a single batch depends on whether prefetch_count was set for the receiver. If prefetch_count is not set, the receiver tries to cache max_message_count (if provided) messages within the request to the service.

This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.

Parameters:
  • max_message_count (Optional[int]) – Maximum number of messages in the batch. Actual number returned will depend on prefetch_count size and incoming stream rate. Setting to None will fully depend on the prefetch config. The default value is 1.

  • max_wait_time (Optional[float]) – Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session. Please use max_wait_time on the constructor to set the timeout for connecting to a session.

Returns:

A list of messages received. If no messages are available, this will be an empty list.

Return type:

list[ServiceBusReceivedMessage]

Example:

Receive messages from ServiceBus.
async with servicebus_receiver:
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        print(str(message))
        await servicebus_receiver.complete_message(message)
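
Because a call with max_wait_time returns an empty list when nothing arrives in time, an empty result can serve as a stop condition. The following is a minimal sketch of that idea: `drain`, `handler`, `batch_size`, and `idle_timeout` are hypothetical names, and the receiver is assumed to be in PEEK_LOCK mode so that messages need to be completed.

```python
from typing import Any, Awaitable, Callable


async def drain(
    receiver: Any,
    handler: Callable[[Any], Awaitable[None]],
    batch_size: int = 10,
    idle_timeout: float = 5.0,
) -> int:
    """Receive and complete messages until a receive call comes back empty."""
    processed = 0
    while True:
        batch = await receiver.receive_messages(
            max_message_count=batch_size,
            max_wait_time=idle_timeout,
        )
        if not batch:
            # No message arrived within idle_timeout seconds.
            return processed
        for message in batch:
            await handler(message)  # hypothetical application callback
            await receiver.complete_message(message)
            processed += 1
```

A batch may contain fewer than `batch_size` messages, since the call favors returning quickly over filling the batch.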
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) datetime[source]

Renew the message lock.

This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.

In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.

Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is also only available for non-sessionful messages.

Parameters:

message (ServiceBusReceivedMessage) – The message to renew the lock for.

Keyword Arguments:

timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns:

The UTC datetime at which the lock is set to expire.

Return type:

datetime.datetime

Raises:
  • TypeError – if the message is sessionful.

  • azure.servicebus.exceptions.MessageAlreadySettled – if the message has already been settled.

  • azure.servicebus.exceptions.MessageLockLostError – if the message lock has already expired.

Example:

Renew the lock on a received message.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.renew_message_lock(message)
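
Since an expired lock cannot be renewed, a renewal has to happen before expiry. The following is a minimal sketch that renews only when the lock is close to expiring: `renew_if_expiring` and `threshold` are hypothetical names, and the message is assumed to expose a timezone-aware UTC `locked_until_utc` attribute (a naive datetime would make the subtraction below raise TypeError).

```python
import datetime
from typing import Any


async def renew_if_expiring(
    receiver: Any,
    message: Any,
    threshold: datetime.timedelta = datetime.timedelta(seconds=10),
) -> datetime.datetime:
    """Renew the message lock only when it is within `threshold` of expiring.

    Returns the lock expiry time in effect after the call.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    expires_at = message.locked_until_utc  # assumed to be timezone-aware UTC
    if expires_at is not None and expires_at - now > threshold:
        return expires_at  # enough time left, so skip the service call
    # renew_message_lock returns the new expiry time.
    return await receiver.renew_message_lock(message)
```

For long-running processing, registering the message with an AutoLockRenewer is likely the simpler choice.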
property client_identifier: str

Get the ServiceBusReceiver client identifier associated with the receiver instance.

Return type:

str

property session: ServiceBusSession

Get the ServiceBusSession object linked with the receiver. A session is only available for session-enabled entities; the property returns None if accessed on a non-sessionful receiver.

Return type:

ServiceBusSession

Example:

Get session from a receiver
    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
class azure.servicebus.aio.ServiceBusSender(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, **kwargs: Any)[source]

The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic.

Please use the `get_<queue/topic>_sender` method of azure.servicebus.aio.ServiceBusClient to create a ServiceBusSender instance.

Variables:
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • entity_name (str) – The name of the entity that the client connects to.

Parameters:
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (AsyncTokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication, which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method. Alternatively, an AzureSasCredential can be provided.

Keyword Arguments:
  • queue_name (str) – The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided.

  • topic_name (str) – The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally, the following keys may also be present: 'username', 'password'.

  • user_agent (str) – If specified, this will be added in front of the built-in user agent string.

  • client_identifier (str) – A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

  • socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

async cancel_scheduled_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) None[source]

Cancel one or more messages that have previously been scheduled and are still pending.

Parameters:

sequence_numbers (int or list[int]) – The sequence numbers of the scheduled messages.

Keyword Arguments:

timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Return type:

None

Raises:

azure.servicebus.exceptions.ServiceBusError if cancellation fails because a message has already been cancelled or enqueued.

Example:

Cancelling messages scheduled to be sent in future
async with servicebus_sender:
    await servicebus_sender.cancel_scheduled_messages(sequence_nums)
async close() None

Close down the handler connection.

If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shut down due to an error.

Return type:

None

async create_message_batch(max_size_in_bytes: int | None = None) ServiceBusMessageBatch[source]

Create a ServiceBusMessageBatch object with the max size of all content being constrained by max_size_in_bytes. The value of max_size_in_bytes should be no greater than the max allowed message size defined by the service.

Parameters:

max_size_in_bytes (int or None) – The maximum size, in bytes, of the data that a ServiceBusMessageBatch object can hold. By default, the value is determined by your Service Bus tier.

Returns:

ServiceBusMessageBatch object

Return type:

ServiceBusMessageBatch

Example:

Create ServiceBusMessageBatch object within limited size
async with servicebus_sender:
    batch_message = await servicebus_sender.create_message_batch()
    batch_message.add_message(ServiceBusMessage("Single message inside batch"))
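
Because a batch is bounded by max_size_in_bytes, a larger set of messages may need several batches. The following is a minimal sketch of that loop: `send_in_batches` is a hypothetical helper, and it assumes that add_message signals a full batch by raising ValueError (or a subclass of it).

```python
from typing import Any, Iterable


async def send_in_batches(sender: Any, messages: Iterable[Any]) -> int:
    """Send `messages` in as few batches as possible; return the batch count."""
    batches_sent = 0
    pending = 0  # number of messages added to the current batch
    batch = await sender.create_message_batch()
    for message in messages:
        try:
            batch.add_message(message)
        except ValueError:
            # Assumption: a full batch rejects the message with ValueError.
            if pending == 0:
                raise  # the message does not fit even in an empty batch
            await sender.send_messages(batch)
            batches_sent += 1
            batch = await sender.create_message_batch()
            batch.add_message(message)
            pending = 0
        pending += 1
    if pending:
        await sender.send_messages(batch)
        batches_sent += 1
    return batches_sent
```

A message that cannot fit into an empty batch is re-raised rather than dropped, so the caller can decide how to handle it.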
async schedule_messages(messages: Mapping[str, Any] | ServiceBusMessage | AmqpAnnotatedMessage | Iterable[Mapping[str, Any]] | Iterable[ServiceBusMessage] | Iterable[AmqpAnnotatedMessage], schedule_time_utc: datetime, *, timeout: float | None = None, **kwargs: Any) List[int][source]

Send a message or multiple messages to be enqueued at a specific time by the service. Returns a list of the sequence numbers of the enqueued messages.

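Parameters:
  • messages (Union[ServiceBusMessage, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]) – The message or list of messages to schedule.

  • schedule_time_utc (datetime.datetime) – The UTC date and time at which to enqueue the messages.

Keyword Arguments: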

timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns:

The sequence numbers of the enqueued messages.

Return type:

list[int]

Example:

Schedule a message to be sent in future
async with servicebus_sender:
    scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
    scheduled_messages = [ServiceBusMessage("Scheduled message") for _ in range(10)]
    sequence_nums = await servicebus_sender.schedule_messages(scheduled_messages, scheduled_time_utc)
async send_messages(message: Mapping[str, Any] | ServiceBusMessage | AmqpAnnotatedMessage | Iterable[Mapping[str, Any]] | Iterable[ServiceBusMessage] | Iterable[AmqpAnnotatedMessage] | ServiceBusMessageBatch, *, timeout: float | None = None, **kwargs: Any) None[source]

Sends a message and waits until an acknowledgement is received or the operation times out.

If a list of messages was provided, attempts to send them as a single batch, throwing a ValueError if they cannot fit in a single batch.

Parameters:

message (Union[ServiceBusMessage, ServiceBusMessageBatch, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]) – The ServiceBus message to be sent.

Keyword Arguments:

timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Return type:

None

Raises:
  • azure.servicebus.exceptions.OperationTimeoutError – if sending times out.

  • azure.servicebus.exceptions.MessageSizeExceededError – if the size of the message exceeds the Service Bus frame size limit.

  • azure.servicebus.exceptions.ServiceBusError – when other errors happen, such as connection errors, authentication errors, and any unexpected errors. It is also the base class of the errors above.

Example:

Send message.
async with servicebus_sender:
    message_send = ServiceBusMessage("Hello World")
    await servicebus_sender.send_messages(message_send)
property client_identifier: str

Get the ServiceBusSender client identifier associated with the sender instance.

Return type:

str

class azure.servicebus.aio.ServiceBusSession(session_id: str, receiver: ServiceBusReceiver | ServiceBusReceiverAsync)[source]

The ServiceBusSession is used to manage session state and lock renewal.

Please use the property `session` on the ServiceBusReceiver to get the corresponding ServiceBusSession object linked with the receiver instead of instantiating a ServiceBusSession object directly.

Example:

Get session from a receiver
    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
async get_state(*, timeout: float | None = None, **kwargs: Any) bytes[source]

Get the session state.

Returns None if no state has been set.

Keyword Arguments:

timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns:

The session state.

Return type:

bytes

Example:

Get the session state
    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        session_state = await session.get_state()
async renew_lock(*, timeout: float | None = None, **kwargs: Any) datetime[source]

Renew the session lock.

This operation must be performed periodically in order to retain a lock on the session to continue message processing.

Once the lock is lost the connection will be closed; an expired lock cannot be renewed.

This operation can also be performed as a background task by registering the session with an azure.servicebus.aio.AutoLockRenewer instance.

Keyword Arguments:

timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns:

The UTC datetime at which the lock is set to expire.

Return type:

datetime

Example:

Renew the session lock before it expires
    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        await session.renew_lock()
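
The periodic renewal described above can be sketched by hand with an asyncio task. This is an illustration only and not how AutoLockRenewer is implemented: `keep_session_locked`, `run_with_session_lock`, `work`, and `interval` are hypothetical names, and `interval` is assumed to be shorter than the session lock duration.

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable


async def keep_session_locked(session: Any, interval: float) -> None:
    """Renew the session lock every `interval` seconds until cancelled."""
    while True:
        await asyncio.sleep(interval)
        await session.renew_lock()


async def run_with_session_lock(
    session: Any,
    work: Callable[[], Awaitable[Any]],
    interval: float = 15.0,
) -> Any:
    """Run `work()` while the session lock is renewed in the background."""
    renewer = asyncio.create_task(keep_session_locked(session, interval))
    try:
        return await work()
    finally:
        renewer.cancel()
        # Cancellation is expected here; a renewal error would still surface.
        with contextlib.suppress(asyncio.CancelledError):
            await renewer
```

In most cases registering the session with an AutoLockRenewer is the simpler option, since it also handles the maximum renewal duration and failure callbacks.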
async set_state(state: str | bytes | bytearray | None, *, timeout: float | None = None, **kwargs: Any) None[source]

Set the session state.

Parameters:

state (str or bytes or bytearray or None) – The state value.

Keyword Arguments:

timeout (float or None) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Return type:

None

Example:

Set the session state
    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        await session.set_state("START")
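
Session state is often used to record progress so that another receiver can pick up where the last one stopped. The following is a minimal sketch that stores a JSON checkpoint: `load_checkpoint` and `save_checkpoint` are hypothetical helpers, and the choice of JSON as the encoding is an assumption rather than anything the service requires.

```python
import json
from typing import Any, Dict


async def load_checkpoint(session: Any) -> Dict[str, Any]:
    """Read the session state as a JSON object; return {} when it is unset."""
    raw = await session.get_state()
    if not raw:
        return {}  # get_state returns None when no state has been set
    return json.loads(raw)


async def save_checkpoint(session: Any, checkpoint: Dict[str, Any]) -> None:
    """Store `checkpoint` as UTF-8 encoded JSON in the session state."""
    await session.set_state(json.dumps(checkpoint).encode("utf-8"))
```

A receiver could call `load_checkpoint` after accepting the session and `save_checkpoint` after each message it completes.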
property locked_until_utc: datetime | None

The time at which this session’s lock will expire.

Return type:

datetime.datetime

property session_id: str

Session id of the current session.

Return type:

str

Subpackages