azure.servicebus package

class azure.servicebus.Message(body, **kwargs)[source]

A Service Bus Message.

Variables
  • properties (MessageProperties) – Properties of the internal AMQP message object.

  • header (MessageHeader) – Header of the internal AMQP message object.

  • message (Message) – Internal AMQP message object.

Parameters

body (str or bytes) – The data to send in a single message.

Keyword Arguments
  • encoding (str) – The encoding for string data. Default is UTF-8.

  • session_id (str) – An optional session ID for the message to be sent.

Example:

Sending a message with additional properties
import datetime
from azure.servicebus import Message

message = Message("Hello World!!")
message.session_id = "MySessionID"
message.partition_key = "UsingSpecificPartition"
message.user_properties = {'data': 'custom_data'}
message.time_to_live = datetime.timedelta(seconds=30)
property annotations

The annotations of the message.

Return type

dict

property body

The body of the Message.

Return type

bytes or Iterable[bytes]

property enqueue_sequence_number

Return type

int

property partition_key

Return type

str

property scheduled_enqueue_time_utc

Get or set the scheduled enqueue time of the message, in UTC. This property can be used to schedule a message when sending it through the ServiceBusSender.send method. If scheduled messages may need to be cancelled, use the ServiceBusSender.schedule method instead, which returns sequence numbers that can be used for later cancellation. scheduled_enqueue_time_utc is None if not set.

Return type

datetime
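As a small illustration of preparing a value for scheduled_enqueue_time_utc, the sketch below builds a UTC datetime a fixed delay in the future. The helper name schedule_after is hypothetical, and the choice of a timezone-aware datetime is an assumption; this reference does not state whether a naive or an aware datetime is expected.

```python
import datetime


def schedule_after(delay, now=None):
    """Return a timezone-aware UTC datetime that lies `delay` after `now`.

    Hypothetical helper for illustration; it is not part of azure.servicebus.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    # Normalise to UTC first so the result is unambiguous.
    return now.astimezone(datetime.timezone.utc) + delay


fixed_now = datetime.datetime(2020, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
scheduled = schedule_after(datetime.timedelta(minutes=5), now=fixed_now)
print(scheduled)  # 2020-01-01 12:05:00+00:00
```

The resulting value could then be assigned to the property on a Message before sending it.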

property session_id

The session ID of the message.

Return type

str

property time_to_live

Return type

datetime.timedelta

property user_properties

User defined properties on the message.

Return type

dict

property via_partition_key

Return type

str

class azure.servicebus.BatchMessage(max_size_in_bytes: Optional[int] = None)[source]

A batch of messages.

Sending messages in a batch is more performant than sending individual messages. BatchMessage helps you create a batch of Message objects up to the maximum allowed size to improve sending performance.

Use the add method to add messages until the maximum batch size limit in bytes has been reached - at which point a ValueError will be raised.

Please use the create_batch method of ServiceBusSender to create a BatchMessage object instead of instantiating a BatchMessage object directly.

Variables
  • max_size_in_bytes (int) – The maximum size of bytes data that a BatchMessage object can hold.

  • message (BatchMessage) – Internal AMQP BatchMessage object.

Parameters

max_size_in_bytes (int) – The maximum size of bytes data that a BatchMessage object can hold.

add(message: azure.servicebus._common.message.Message) → None[source]

Try to add a single Message to the batch.

The total size of an added message is the sum of its body, properties, etc. If this added size results in the batch exceeding the maximum batch size, a ValueError will be raised.

Parameters

message (Message) – The Message to be added to the batch.

Return type

None

Raises

azure.servicebus.exceptions.MessageContentTooLarge when exceeding the size limit.
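The add-until-full pattern described above can be sketched without the SDK. ToyBatch and pack_into_batches below are hypothetical stand-ins, not azure.servicebus classes; the sketch counts only the payload length, whereas the real size of a message also includes its properties, and it assumes the size error can be caught as a ValueError, as the class description states.

```python
class ToyBatch:
    """Hypothetical stand-in for a size-limited batch; not the SDK class."""

    def __init__(self, max_size_in_bytes):
        self.max_size_in_bytes = max_size_in_bytes
        self.size_in_bytes = 0
        self.items = []

    def add(self, payload):
        # Reject the payload if it would push the batch past its limit.
        new_size = self.size_in_bytes + len(payload)
        if new_size > self.max_size_in_bytes:
            raise ValueError(
                "batch would exceed {} bytes".format(self.max_size_in_bytes)
            )
        self.items.append(payload)
        self.size_in_bytes = new_size


def pack_into_batches(payloads, max_size_in_bytes):
    """Fill a batch until add() fails, then start a new batch."""
    batches = []
    current = ToyBatch(max_size_in_bytes)
    for payload in payloads:
        try:
            current.add(payload)
        except ValueError:
            if not current.items:
                # A single payload larger than the limit can never fit.
                raise
            batches.append(current)
            current = ToyBatch(max_size_in_bytes)
            current.add(payload)
    if current.items:
        batches.append(current)
    return batches


batches = pack_into_batches([b"a" * 40, b"b" * 40, b"c" * 40], max_size_in_bytes=100)
print([batch.size_in_bytes for batch in batches])  # [80, 40]
```

With the SDK, the same loop shape would apply to a batch obtained from ServiceBusSender.create_batch, sending each full batch before starting the next.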

property size_in_bytes

The combined size of the messages in the batch, in bytes.

Return type

int

class azure.servicebus.PeekMessage(message)[source]

A preview message.

This message is still on the queue, and unlocked. A peeked message cannot be completed, abandoned, dead-lettered or deferred. It has no lock token or expiry.

Variables

received_timestamp_utc (datetime.datetime) – The UTC timestamp of when the message was received.

property annotations

The annotations of the message.

Return type

dict

property body

The body of the Message.

Return type

bytes or Iterable[bytes]

property enqueue_sequence_number

Return type

int

property enqueued_time_utc

Return type

datetime.datetime

property partition_id

Return type

int

property partition_key

Return type

str

property scheduled_enqueue_time_utc

Get or set the scheduled enqueue time of the message, in UTC. This property can be used to schedule a message when sending it through the ServiceBusSender.send method. If scheduled messages may need to be cancelled, use the ServiceBusSender.schedule method instead, which returns sequence numbers that can be used for later cancellation. scheduled_enqueue_time_utc is None if not set.

Return type

datetime

property sequence_number

Return type

int

property session_id

The session ID of the message.

Return type

str

property settled

Whether the message has been settled.

This will always be True for a message received using ReceiveAndDelete mode; otherwise it will be False until the message is completed or otherwise settled.

Return type

bool

property time_to_live

Return type

datetime.timedelta

property user_properties

User defined properties on the message.

Return type

dict

property via_partition_key

Return type

str

class azure.servicebus.ReceivedMessage(message, mode=<ReceiveSettleMode.PeekLock: <ReceiverSettleMode.PeekLock: 1>>, **kwargs)[source]

A Service Bus Message received from service side.

Variables

auto_renew_error (AutoLockRenewTimeout or AutoLockRenewFailed) – Error when AutoLockRenew is used and it fails to renew the message lock.

Example:

Checking the properties on a received message.
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        print("Receiving: {}".format(message))
        print("Time to live: {}".format(message.time_to_live))
        print("Sequence number: {}".format(message.sequence_number))
        print("Enqueue Sequence number: {}".format(message.enqueue_sequence_number))
        print("Partition ID: {}".format(message.partition_id))
        print("Partition Key: {}".format(message.partition_key))
        print("User Properties: {}".format(message.user_properties))
        print("Annotations: {}".format(message.annotations))
        print("Delivery count: {}".format(message.header.delivery_count))
        print("Message ID: {}".format(message.properties.message_id))
        print("Locked until: {}".format(message.locked_until_utc))
        print("Lock Token: {}".format(message.lock_token))
        print("Enqueued time: {}".format(message.enqueued_time_utc))
abandon() → None[source]

Abandon the message.

This message will be returned to the queue and made available to be received again.

Return type

None

Raises
  • azure.servicebus.exceptions.MessageAlreadySettled – if the message has been settled.

  • azure.servicebus.exceptions.MessageLockExpired – if message lock has already expired.

  • azure.servicebus.exceptions.SessionLockExpired – if session lock has already expired.

  • azure.servicebus.exceptions.MessageSettleFailed – if message settle operation fails.

Example:

Abandoning a received message to return it immediately to the queue.
    messages = servicebus_receiver.receive(max_wait_time=5)
    for message in messages:
        message.abandon()
complete() → None[source]

Complete the message.

This removes the message from the queue.

Return type

None

Raises
  • azure.servicebus.exceptions.MessageAlreadySettled – if the message has been settled.

  • azure.servicebus.exceptions.MessageLockExpired – if message lock has already expired.

  • azure.servicebus.exceptions.SessionLockExpired – if session lock has already expired.

  • azure.servicebus.exceptions.MessageSettleFailed – if message settle operation fails.

Example:

Completing a received message to remove it from the queue.
with servicebus_receiver:
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        print(message)
        message.complete()
dead_letter(reason: Optional[str] = None, description: Optional[str] = None) → None[source]

Move the message to the Dead Letter queue.

The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.

Parameters
  • reason (str) – The reason for dead-lettering the message.

  • description (str) – The detailed description for dead-lettering the message.
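To make the relationship between the main queue and its dead-letter sub-queue concrete, here is a toy in-memory model of completing, abandoning and dead-lettering. ToyQueue and all of its members are hypothetical and exist only for illustration; the sketch does not model lock expiry or the service's actual storage.

```python
from collections import deque


class ToyQueue:
    """Hypothetical in-memory queue with a dead-letter sub-queue."""

    def __init__(self, messages):
        self.active = deque(messages)   # messages available to receivers
        self.locked = []                # received but not yet settled
        self.dead_letter_queue = []     # (message, reason, description)

    def receive(self):
        # Peek-lock style: the message is locked rather than deleted.
        message = self.active.popleft()
        self.locked.append(message)
        return message

    def complete(self, message):
        # Completing removes the message for good.
        self.locked.remove(message)

    def abandon(self, message):
        # Abandoning makes the message available to be received again.
        self.locked.remove(message)
        self.active.appendleft(message)

    def dead_letter(self, message, reason=None, description=None):
        # Dead-lettering moves the message to the sub-queue for inspection.
        self.locked.remove(message)
        self.dead_letter_queue.append((message, reason, description))


queue = ToyQueue(["ok", "poison"])
queue.complete(queue.receive())                       # "ok" is removed
queue.abandon(queue.receive())                        # "poison" goes back
queue.dead_letter(queue.receive(), reason="bad payload")

print(list(queue.active))        # []
print(queue.dead_letter_queue)   # [('poison', 'bad payload', None)]
```

In the toy model, as in the description above, a dead-lettered message leaves the main queue but is kept, together with its reason and description, until something reads it from the sub-queue.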

Return type

None

Raises
  • azure.servicebus.exceptions.MessageAlreadySettled – if the message has been settled.

  • azure.servicebus.exceptions.MessageLockExpired – if message lock has already expired.

  • azure.servicebus.exceptions.SessionLockExpired – if session lock has already expired.

  • azure.servicebus.exceptions.MessageSettleFailed – if message settle operation fails.

Example:

Dead letter a message to remove it from the queue by sending it to the dead letter subqueue, and receiving it from there.
    with servicebus_client.get_queue_receiver(queue_name) as servicebus_receiver:
        messages = servicebus_receiver.receive(max_wait_time=5)
        for message in messages:
            message.dead_letter(reason='reason for dead lettering', description='description for dead lettering')

    with servicebus_client.get_queue_deadletter_receiver(queue_name) as servicebus_deadletter_receiver:
        messages = servicebus_deadletter_receiver.receive(max_wait_time=5)
        for message in messages:
            message.complete()
defer() → None[source]

Defer the message.

This message will remain in the queue but must be requested specifically by its sequence number in order to be received.
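The behaviour described above, where a deferred message stays stored but is skipped by ordinary receives, can be modelled with a small toy class. ToyDeferringQueue is hypothetical, and numbering messages from 1 is an assumption made for the example, not a statement about how the service assigns sequence numbers.

```python
class ToyDeferringQueue:
    """Hypothetical model: deferred messages are reachable only by sequence number."""

    def __init__(self, bodies):
        # Assign sequence numbers in enqueue order (starting at 1 by assumption).
        self.active = [(seq, body) for seq, body in enumerate(bodies, start=1)]
        self.deferred = {}

    def receive_messages(self):
        # Ordinary receives only see messages that have not been deferred.
        return list(self.active)

    def defer(self, sequence_number):
        for entry in self.active:
            if entry[0] == sequence_number:
                self.active.remove(entry)
                self.deferred[sequence_number] = entry[1]
                return
        raise KeyError(sequence_number)

    def receive_deferred_messages(self, sequence_numbers):
        # Deferred messages must be requested explicitly, one number each.
        return [(seq, self.deferred.pop(seq)) for seq in sequence_numbers]


queue = ToyDeferringQueue(["a", "b", "c"])
queue.defer(2)
remaining = queue.receive_messages()
recovered = queue.receive_deferred_messages([2])
print(remaining)   # [(1, 'a'), (3, 'c')]
print(recovered)   # [(2, 'b')]
```

This is why callers are expected to record the sequence number before deferring: without it, the toy model (like the real queue) offers no way to get the message back.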

Return type

None

Raises
  • azure.servicebus.exceptions.MessageAlreadySettled – if the message has been settled.

  • azure.servicebus.exceptions.MessageLockExpired – if message lock has already expired.

  • azure.servicebus.exceptions.SessionLockExpired – if session lock has already expired.

  • azure.servicebus.exceptions.MessageSettleFailed – if message settle operation fails.

Example:

Deferring a received message sets it aside such that it can only be received by calling receive_deferred_messages with its sequence number.
with servicebus_receiver:
    deferred_sequenced_numbers = []
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        deferred_sequenced_numbers.append(message.sequence_number)
        print(message)
        message.defer()

    received_deferred_msg = servicebus_receiver.receive_deferred_messages(
        sequence_numbers=deferred_sequenced_numbers
    )

    for msg in received_deferred_msg:
        msg.complete()
renew_lock() → None[source]

Renew the message lock.

This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.

In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.

Messages received via ReceiveAndDelete mode are not locked, and therefore cannot be renewed. This operation is also only available for non-sessionful messages.

Lock renewal can be performed as a background task by registering the message with an azure.servicebus.AutoLockRenew instance.
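The rule that a live lock can be extended but an expired lock cannot be renewed can be sketched with plain datetime arithmetic. ToyLock is a hypothetical class, and the 30-second lock duration and the RuntimeError it raises are assumptions chosen for the example; the SDK raises its own exception types, listed under Raises.

```python
import datetime


class ToyLock:
    """Hypothetical message lock with a fixed duration; not the SDK object."""

    def __init__(self, now, duration):
        self.duration = duration
        self.locked_until_utc = now + duration

    def expired(self, now):
        return now >= self.locked_until_utc

    def renew(self, now):
        # An expired lock cannot be renewed.
        if self.expired(now):
            raise RuntimeError("lock already expired")
        # Renewal restarts the lock period from the time of the call.
        self.locked_until_utc = now + self.duration


start = datetime.datetime(2020, 1, 1, 12, 0, 0)
lock = ToyLock(start, datetime.timedelta(seconds=30))

lock.renew(start + datetime.timedelta(seconds=20))
print(lock.locked_until_utc)  # 2020-01-01 12:00:50
```

Calling renew again after 12:00:50 would fail in this model, which mirrors why long-running processing needs renewal to happen before the lock lapses.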

Return type

None

Raises
  • TypeError – if the message is sessionful.

  • azure.servicebus.exceptions.MessageLockExpired – if the message lock has already expired.

  • azure.servicebus.exceptions.MessageAlreadySettled – if the message has already been settled.

property annotations

The annotations of the message.

Return type

dict

property body

The body of the Message.

Return type

bytes or Iterable[bytes]

property enqueue_sequence_number

Return type

int

property enqueued_time_utc

Return type

datetime.datetime

property expired

Return type

bool

property lock_token

Return type

uuid.UUID or str

property locked_until_utc

Return type

datetime.datetime

property partition_id

Return type

int

property partition_key

Return type

str

property scheduled_enqueue_time_utc

Get or set the scheduled enqueue time of the message, in UTC. This property can be used to schedule a message when sending it through the ServiceBusSender.send method. If scheduled messages may need to be cancelled, use the ServiceBusSender.schedule method instead, which returns sequence numbers that can be used for later cancellation. scheduled_enqueue_time_utc is None if not set.

Return type

datetime

property sequence_number

Return type

int

property session_id

The session ID of the message.

Return type

str

property settled

Whether the message has been settled.

This will always be True for a message received using ReceiveAndDelete mode; otherwise it will be False until the message is completed or otherwise settled.

Return type

bool

property time_to_live

Return type

datetime.timedelta

property user_properties

User defined properties on the message.

Return type

dict

property via_partition_key

Return type

str

class azure.servicebus.ReceiveSettleMode[source]

An enumeration.

PeekLock = <ReceiverSettleMode.PeekLock: 1>
ReceiveAndDelete = <ReceiverSettleMode.ReceiveAndDelete: 0>
class azure.servicebus.ServiceBusClient(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]

The ServiceBusClient class defines a high-level interface for getting ServiceBusSender and ServiceBusReceiver instances.

Variables

fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

Parameters
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • entity_name (str) – Optional entity name; this can be the name of a Queue or Topic. It must be specified if the credential is for a specific Queue or Topic.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
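The shape of the http_proxy dictionary described above can be sketched as follows. The helper check_http_proxy is hypothetical and only checks the two required keys and their types; the host, port and credentials are placeholder values, not defaults of the library.

```python
# Required keys and their expected value types, as described above.
REQUIRED_KEYS = {"proxy_hostname": str, "proxy_port": int}


def check_http_proxy(settings):
    """Hypothetical helper: verify the required keys of an http_proxy dict."""
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in settings:
            raise KeyError("missing required key: {}".format(key))
        if not isinstance(settings[key], expected_type):
            raise TypeError(
                "{} must be of type {}".format(key, expected_type.__name__)
            )
    return settings


http_proxy = check_http_proxy({
    "proxy_hostname": "127.0.0.1",   # placeholder proxy host
    "proxy_port": 3128,              # placeholder proxy port
    "username": "proxy-user",        # optional, placeholder
    "password": "proxy-password",    # optional, placeholder
})
print(sorted(http_proxy))
```

A dictionary of this shape would then be passed as the http_proxy keyword argument when constructing the client.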

Example:

Create a new instance of the ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient, ServiceBusSharedKeyCredential
fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
servicebus_client = ServiceBusClient(
    fully_qualified_namespace=fully_qualified_namespace,
    credential=ServiceBusSharedKeyCredential(
        shared_access_policy,
        shared_access_key
    )
)
close() → None[source]

Close down the ServiceBus client and the underlying connection.

Returns

None

classmethod from_connection_string(conn_str: str, **kwargs: Any) → azure.servicebus._servicebus_client.ServiceBusClient[source]

Create a ServiceBusClient from a connection string.

Parameters

conn_str (str) – The connection string of a Service Bus.

Keyword Arguments
  • entity_name (str) – Optional entity name; this can be the name of a Queue or Topic. It must be specified if the credential is for a specific Queue or Topic.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Return type

ServiceBusClient

Example:

Create a new instance of the ServiceBusClient from connection string.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
get_queue_deadletter_receiver(queue_name: str, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]

Get a ServiceBusReceiver for the dead-letter queue, the secondary sub-queue provided by the specific Queue. It holds messages that can’t be delivered to any receiver or that can’t be processed.

Parameters

queue_name (str) – The path of specific Service Bus Queue the client connects to.

Keyword Arguments
  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.

  • transfer_deadletter (bool) – Whether to connect to the transfer dead-letter queue, or the standard dead-letter queue. The transfer dead-letter queue holds messages that have failed to be transferred in ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch being 0, ServiceBusReceiver.receive would try to cache max_batch_size (if provided) within its request to the service.
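To give a feel for how retry_total, retry_backoff_factor and retry_backoff_max might interact, the sketch below computes a delay schedule under the assumption of exponential back-off, where each delay doubles and is capped at the maximum. This reference does not state the exact formula the library uses, so the doubling rule and the function backoff_schedule are illustrative assumptions only.

```python
def backoff_schedule(retry_total=3, retry_backoff_factor=0.8, retry_backoff_max=120):
    """Delays in seconds before each retry, assuming exponential growth.

    Hypothetical helper; the defaults mirror the documented default values.
    """
    return [
        # Double the delay on every attempt, but never exceed the maximum.
        min(retry_backoff_factor * (2 ** attempt), retry_backoff_max)
        for attempt in range(retry_total)
    ]


delays = backoff_schedule()
print(delays)  # [0.8, 1.6, 3.2]
```

Under this assumption, raising retry_total mostly adds long waits at the end of the schedule, since later delays quickly reach retry_backoff_max.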

Return type

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    queue_receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=queue_name)
get_queue_receiver(queue_name: str, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]

Get ServiceBusReceiver for the specific queue.

Parameters

queue_name (str) – The path of specific Service Bus Queue the client connects to.

Keyword Arguments
  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch being 0, ServiceBusReceiver.receive would try to cache max_batch_size (if provided) within its request to the service.
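The difference between the two receive modes described above can be illustrated with a toy receiver. ToySettleMode and ToyReceiver are hypothetical and only mimic the documented behaviour: in PeekLock mode an unsettled message becomes available again, while in ReceiveAndDelete mode it is gone as soon as it is handed out.

```python
import enum


class ToySettleMode(enum.Enum):
    """Hypothetical mirror of the two documented modes."""
    PeekLock = 1
    ReceiveAndDelete = 0


class ToyReceiver:
    """Hypothetical in-memory receiver; not the SDK class."""

    def __init__(self, messages, mode=ToySettleMode.PeekLock):
        self.queue = list(messages)
        self.mode = mode
        self.locked = []

    def receive(self):
        message = self.queue.pop(0)
        if self.mode is ToySettleMode.PeekLock:
            # Track the message until it is settled or its lock expires.
            self.locked.append(message)
        # In ReceiveAndDelete mode nothing is tracked: the message is gone.
        return message

    def complete(self, message):
        self.locked.remove(message)

    def lock_expired(self, message):
        # An unsettled peek-locked message returns to the queue.
        self.locked.remove(message)
        self.queue.insert(0, message)


peek_lock = ToyReceiver(["m1"])
message = peek_lock.receive()
peek_lock.lock_expired(message)   # never settled, so it comes back
print(peek_lock.queue)            # ['m1']

receive_and_delete = ToyReceiver(["m1"], mode=ToySettleMode.ReceiveAndDelete)
receive_and_delete.receive()
print(receive_and_delete.queue, receive_and_delete.locked)  # [] []
```

The trade-off shown here is the one the mode description names: ReceiveAndDelete avoids settlement calls, but a client that fails mid-processing has no way to get the message again.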

Return type

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
get_queue_sender(queue_name: str, **kwargs: Any) → azure.servicebus._servicebus_sender.ServiceBusSender[source]

Get ServiceBusSender for the specific queue.

Parameters

queue_name (str) – The path of specific Service Bus Queue the client connects to.

Keyword Arguments

retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

Return type

ServiceBusSender

Example:

Create a new instance of the ServiceBusSender from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
get_queue_session_receiver(queue_name: str, session_id: Optional[str] = None, **kwargs: Any) → azure.servicebus._servicebus_session_receiver.ServiceBusSessionReceiver[source]

Get ServiceBusSessionReceiver for the specific queue.

Parameters
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.

Keyword Arguments
  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch being 0, ServiceBusReceiver.receive would try to cache max_batch_size (if provided) within its request to the service.

Return type

ServiceBusSessionReceiver

Example:

Create a new instance of the ServiceBusSessionReceiver from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    # session_id is left at its default (None), which targets the next available session.
    queue_receiver = servicebus_client.get_queue_session_receiver(queue_name=queue_name)
get_subscription_deadletter_receiver(topic_name: str, subscription_name: str, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]

Get a ServiceBusReceiver for the dead-letter queue, the secondary sub-queue provided by the specific topic subscription. It holds messages that can’t be delivered to any receiver or that can’t be processed.

Parameters
  • topic_name (str) – The name of specific Service Bus Topic the client connects to.

  • subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.

Keyword Arguments
  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.

  • transfer_deadletter (bool) – Whether to connect to the transfer dead-letter queue, or the standard dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch being 0, ServiceBusReceiver.receive would try to cache max_batch_size (if provided) within its request to the service.

Return type

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"]
subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    subscription_receiver = servicebus_client.get_subscription_deadletter_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name,
    )
get_subscription_receiver(topic_name: str, subscription_name: str, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]

Get ServiceBusReceiver for the specific subscription under the topic.

Parameters
  • topic_name (str) – The name of specific Service Bus Topic the client connects to.

  • subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.

Keyword Arguments
  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch being 0, ServiceBusReceiver.receive would try to cache max_batch_size (if provided) within its request to the service.

Return type

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"]
subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    subscription_receiver = servicebus_client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name,
    )
get_subscription_session_receiver(topic_name: str, subscription_name: str, session_id: Optional[str] = None, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]

Get ServiceBusSessionReceiver for the specific subscription under the topic.

Parameters
  • topic_name (str) – The name of specific Service Bus Topic the client connects to.

  • subscription_name (str) – The name of specific Service Bus Subscription under the given Service Bus Topic.

  • session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.

Keyword Arguments
  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shutdown. The default value is 0, meaning no timeout.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. When prefetch is 0, ServiceBusReceiver.receive_messages tries to cache max_batch_size (if provided) messages within its request to the service.

Return type

ServiceBusSessionReceiver

Example:

Create a new instance of the ServiceBusSessionReceiver from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ["SERVICE_BUS_TOPIC_NAME"]
subscription_name = os.environ["SERVICE_BUS_SUBSCRIPTION_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    # The subscription is assumed to be session-enabled; session_id is left at its default (None).
    subscription_receiver = servicebus_client.get_subscription_session_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name,
    )
get_topic_sender(topic_name: str, **kwargs: Any) → azure.servicebus._servicebus_sender.ServiceBusSender[source]

Get ServiceBusSender for the specific topic.

Parameters

topic_name (str) – The path of specific Service Bus Topic the client connects to.

Keyword Arguments
  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.

  • retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.

Return type

ServiceBusSender
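
The three retry keyword arguments can be read together as a back-off schedule. The sketch below is an illustration only: it assumes the delay starts at retry_backoff_factor, doubles on each retry, and is capped at retry_backoff_max. The exact formula the library uses is not stated in this reference, and backoff_schedule is a hypothetical helper, not part of azure.servicebus.

```python
def backoff_schedule(retry_total=3, retry_backoff_factor=0.8, retry_backoff_max=120.0):
    """Return the assumed wait time in seconds before each retry attempt.

    Assumption: exponential growth (factor * 2**attempt), capped at the maximum.
    """
    return [
        min(retry_backoff_factor * (2 ** attempt), retry_backoff_max)
        for attempt in range(retry_total)
    ]


# With the documented defaults, the assumed schedule is:
print(backoff_schedule())  # [0.8, 1.6, 3.2]
```

Under this assumption the cap only matters for larger retry_total values; with ten retries the last delay would be limited to 120 seconds.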

Example:

Create a new instance of the ServiceBusSender from ServiceBusClient.
import os
from azure.servicebus import ServiceBusClient
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
with servicebus_client:
    topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
class azure.servicebus.ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]

The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.

The two primary channels for message receipt are receive_messages() to make a single request for messages, and for message in receiver: to continuously receive incoming messages in an ongoing fashion.
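
The continuous channel can be sketched as a small loop. This is a minimal illustration, assuming the receiver is used in PeekLock mode (so each message must be settled) and that iteration ends when the receiver shuts down, for example after idle_timeout elapses. consume and handler are hypothetical names; the function accepts any object with the receiver interface described here.

```python
def consume(receiver, handler):
    """Iterate over `receiver`, pass each message to `handler`, then complete it.

    Returns the number of messages handled. The receiver is closed on exit
    because it is used as a context manager.
    """
    handled = 0
    with receiver:
        for message in receiver:
            handler(message)
            # In PeekLock mode the message is only removed once it is settled.
            message.complete()
            handled += 1
    return handled
```

If handler raises, the message is left unsettled and its lock is allowed to expire, so it can be received again.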

Variables
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • entity_path (str) – The path of the entity that the client connects to.

Parameters
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.

  • subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PeekLock.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. When prefetch is 0, ServiceBusReceiver.receive_messages tries to cache max_batch_size (if provided) messages within its request to the service.
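
The http_proxy requirements above can be checked before the dictionary is passed to the receiver. The helper below is a sketch, not library behavior: validate_http_proxy is a hypothetical name, and rejecting keys other than the four listed is a stricter choice than the text requires.

```python
def validate_http_proxy(http_proxy):
    """Check an http_proxy dict against the documented key requirements."""
    required = {"proxy_hostname": str, "proxy_port": int}
    optional = {"username", "password"}
    for key, expected_type in required.items():
        if key not in http_proxy:
            raise ValueError("missing required key: {}".format(key))
        if not isinstance(http_proxy[key], expected_type):
            raise TypeError("{} must be {}".format(key, expected_type.__name__))
    # Assumption: any key outside the documented set is treated as a mistake.
    unknown = set(http_proxy) - set(required) - optional
    if unknown:
        raise ValueError("unexpected keys: {}".format(sorted(unknown)))
    return http_proxy
```

A validated dictionary such as {"proxy_hostname": "127.0.0.1", "proxy_port": 8080} could then be passed as the http_proxy keyword argument.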

Example:

Create a new instance of the ServiceBusReceiver.
import os
from azure.servicebus import ServiceBusReceiver, ServiceBusSharedKeyCredential
fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_receiver = ServiceBusReceiver(
    fully_qualified_namespace=fully_qualified_namespace,
    credential=ServiceBusSharedKeyCredential(
        shared_access_policy,
        shared_access_key
    ),
    queue_name=queue_name
)
close() → None

Close down the handler links (and connection if the handler uses a separate connection).

If the handler has already closed, this operation will do nothing.

Return type

None

classmethod from_connection_string(conn_str: str, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]

Create a ServiceBusReceiver from a connection string.

Parameters

conn_str (str) – The connection string of a Service Bus.

Keyword Arguments
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.

  • subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.

  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PeekLock.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. When prefetch is 0, ServiceBusReceiver.receive_messages tries to cache max_batch_size (if provided) messages within its request to the service.

Return type

ServiceBusReceiver

Example:

Create a new instance of the ServiceBusReceiver from connection string.
import os
from azure.servicebus import ServiceBusReceiver
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_receiver = ServiceBusReceiver.from_connection_string(
    conn_str=servicebus_connection_str,
    queue_name=queue_name
)
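
To relate the connection-string form to the constructor arguments (fully_qualified_namespace plus a shared key credential), the sketch below splits a connection string into its parts. It assumes the usual layout Endpoint=sb://<namespace>/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>; parse_connection_string is a hypothetical helper and does not reproduce the library's own parsing.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str):
    """Split a Service Bus connection string into namespace, policy and key."""
    fields = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        # Split on the first '=' only: keys are base64 and may end with '='.
        name, _, value = segment.partition("=")
        fields[name] = value
    return {
        "fully_qualified_namespace": urlparse(fields["Endpoint"]).hostname,
        "policy": fields["SharedAccessKeyName"],
        "key": fields["SharedAccessKey"],
    }
```

The returned policy and key correspond to the two arguments of ServiceBusSharedKeyCredential.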
next()
peek_messages(message_count: int = 1, sequence_number: Optional[int] = None) → List[azure.servicebus._common.message.PeekMessage][source]

Browse messages currently pending in the queue.

Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.

Parameters
  • message_count (int) – The maximum number of messages to try and peek. The default value is 1.

  • sequence_number (int) – A message sequence number from which to start browsing messages.

Return type

list[PeekMessage]

Example:

Look at pending messages in the queue.
with servicebus_receiver:
    messages = servicebus_receiver.peek_messages()
    for message in messages:
        print(message)
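
The message_count and sequence_number parameters can be combined to browse a queue in pages. This is a sketch under one assumption: each PeekMessage exposes a sequence_number attribute, as the ReceivedMessage objects in the deferred-message example do. peek_all is a hypothetical helper.

```python
def peek_all(receiver, page_size=10, limit=100):
    """Peek up to `limit` messages, `page_size` at a time, without locking them."""
    peeked = []
    next_sequence_number = None  # None lets the service choose the starting point
    while len(peeked) < limit:
        page = receiver.peek_messages(
            message_count=page_size,
            sequence_number=next_sequence_number,
        )
        if not page:
            break
        peeked.extend(page)
        # Resume browsing just after the last message seen.
        next_sequence_number = page[-1].sequence_number + 1
    return peeked[:limit]
```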
receive_deferred_messages(sequence_numbers: List[int]) → List[azure.servicebus._common.message.ReceivedMessage][source]

Receive messages that have previously been deferred.

When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.

Parameters

sequence_numbers (list[int]) – A list of the sequence numbers of messages that have been deferred.

Return type

list[ReceivedMessage]

Example:

Receive deferred messages from ServiceBus.
with servicebus_receiver:
    deferred_sequenced_numbers = []
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        deferred_sequenced_numbers.append(message.sequence_number)
        print(message)
        message.defer()

    received_deferred_msg = servicebus_receiver.receive_deferred_messages(
        sequence_numbers=deferred_sequenced_numbers
    )

    for msg in received_deferred_msg:
        msg.complete()
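
A common reason to defer is that a message cannot be processed yet. The sketch below separates the two steps, using only the calls shown above (receive_messages, defer, complete, receive_deferred_messages). settle_or_defer, complete_deferred and is_ready are hypothetical names, and the caller is assumed to keep the returned sequence numbers somewhere durable.

```python
def settle_or_defer(receiver, is_ready, max_wait_time=5):
    """Complete messages that are ready; defer the rest.

    Returns the sequence numbers of the deferred messages.
    """
    deferred_sequence_numbers = []
    for message in receiver.receive_messages(max_wait_time=max_wait_time):
        if is_ready(message):
            message.complete()
        else:
            # Record the sequence number first: it is the only way back to the message.
            deferred_sequence_numbers.append(message.sequence_number)
            message.defer()
    return deferred_sequence_numbers


def complete_deferred(receiver, sequence_numbers):
    """Fetch previously deferred messages by sequence number and complete them."""
    if not sequence_numbers:
        return 0
    messages = receiver.receive_deferred_messages(sequence_numbers=sequence_numbers)
    for message in messages:
        message.complete()
    return len(messages)
```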
receive_messages(max_batch_size: Optional[int] = None, max_wait_time: Optional[float] = None) → List[azure.servicebus._common.message.ReceivedMessage][source]

Receive a batch of messages at once.

This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.

Note that the number of messages retrieved in a single batch will be dependent on whether prefetch was set for the receiver. If prefetch is not set for the receiver, the receiver would try to cache max_batch_size (if provided) messages within the request to the service.

This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.

Parameters
  • max_batch_size (int) – Maximum number of messages in the batch. Actual number returned will depend on prefetch size and incoming stream rate.

  • max_wait_time (float) – Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned.

Return type

list[ReceivedMessage]

Example:

Receive messages from ServiceBus.
with servicebus_receiver:
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        print(message)
        message.complete()
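
Because a single call returns as soon as there is a gap in incoming messages, emptying a queue usually takes several calls. The sketch below repeats receive_messages until it returns an empty list, which the description above says happens when max_wait_time elapses with no message. drain and handler are hypothetical names.

```python
def drain(receiver, handler, max_batch_size=10, max_wait_time=5):
    """Receive and complete messages in batches until none arrive in time."""
    processed = 0
    while True:
        batch = receiver.receive_messages(
            max_batch_size=max_batch_size,
            max_wait_time=max_wait_time,
        )
        if not batch:
            # Empty list: nothing arrived within max_wait_time.
            break
        for message in batch:
            handler(message)
            message.complete()
            processed += 1
    return processed
```

A max_wait_time is always passed here, since without one the call would not return until the connection is closed.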
class azure.servicebus.ServiceBusSessionReceiver(fully_qualified_namespace, credential, **kwargs)[source]

The ServiceBusSessionReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription while utilizing a session for FIFO and ownership semantics.

The two primary channels for message receipt are receive_messages() to make a single request for messages, and for message in receiver: to continuously receive incoming messages in an ongoing fashion.

Variables
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • entity_path (str) – The path of the entity that the client connects to.

Parameters
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.

  • subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PeekLock.

  • session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Example:

Create a new instance of the ServiceBusSessionReceiver.
import os
from azure.servicebus import ServiceBusSessionReceiver, ServiceBusSharedKeyCredential
fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
# The queue is assumed to be session-enabled; session_id is left at its default (None).
queue_receiver = ServiceBusSessionReceiver(
    fully_qualified_namespace=fully_qualified_namespace,
    credential=ServiceBusSharedKeyCredential(
        shared_access_policy,
        shared_access_key
    ),
    queue_name=queue_name
)
close() → None

Close down the handler links (and connection if the handler uses a separate connection).

If the handler has already closed, this operation will do nothing.

Return type

None

classmethod from_connection_string(conn_str: str, **kwargs: Any) → azure.servicebus._servicebus_session_receiver.ServiceBusSessionReceiver[source]

Create a ServiceBusSessionReceiver from a connection string.

Parameters

conn_str (str) – The connection string of a Service Bus.

Keyword Arguments
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.

  • subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.

  • mode (ReceiveSettleMode) – The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PeekLock.

  • session_id (str) – A specific session from which to receive. This must be specified for a sessionful entity, otherwise it must be None. In order to receive messages from the next available session, set this to None. The default is None.

  • prefetch (int) – The maximum number of messages to cache with each request to the service. The default value is 0, meaning messages will be received from the service and processed one at a time. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough.

  • idle_timeout (float) – The timeout in seconds between received messages after which the receiver will automatically shut down. The default value is 0, meaning no timeout.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Return type

ServiceBusSessionReceiver

Example:

Create a new instance of the ServiceBusSessionReceiver from connection string.
import os
from azure.servicebus import ServiceBusSessionReceiver
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
# The queue is assumed to be session-enabled; session_id is left at its default (None).
queue_receiver = ServiceBusSessionReceiver.from_connection_string(
    conn_str=servicebus_connection_str,
    queue_name=queue_name
)
next()
peek_messages(message_count: int = 1, sequence_number: Optional[int] = None) → List[azure.servicebus._common.message.PeekMessage]

Browse messages currently pending in the queue.

Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.

Parameters
  • message_count (int) – The maximum number of messages to try and peek. The default value is 1.

  • sequence_number (int) – A message sequence number from which to start browsing messages.

Return type

list[PeekMessage]

Example:

Look at pending messages in the queue.
with servicebus_receiver:
    messages = servicebus_receiver.peek_messages()
    for message in messages:
        print(message)
receive_deferred_messages(sequence_numbers: List[int]) → List[azure.servicebus._common.message.ReceivedMessage]

Receive messages that have previously been deferred.

When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.

Parameters

sequence_numbers (list[int]) – A list of the sequence numbers of messages that have been deferred.

Return type

list[ReceivedMessage]

Example:

Receive deferred messages from ServiceBus.
with servicebus_receiver:
    deferred_sequenced_numbers = []
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        deferred_sequenced_numbers.append(message.sequence_number)
        print(message)
        message.defer()

    received_deferred_msg = servicebus_receiver.receive_deferred_messages(
        sequence_numbers=deferred_sequenced_numbers
    )

    for msg in received_deferred_msg:
        msg.complete()
receive_messages(max_batch_size: Optional[int] = None, max_wait_time: Optional[float] = None) → List[azure.servicebus._common.message.ReceivedMessage]

Receive a batch of messages at once.

This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.

Note that the number of messages retrieved in a single batch will be dependent on whether prefetch was set for the receiver. If prefetch is not set for the receiver, the receiver would try to cache max_batch_size (if provided) messages within the request to the service.

This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.

Parameters
  • max_batch_size (int) – Maximum number of messages in the batch. Actual number returned will depend on prefetch size and incoming stream rate.

  • max_wait_time (float) – Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned.

Return type

list[ReceivedMessage]

Example:

Receive messages from ServiceBus.
with servicebus_receiver:
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        print(message)
        message.complete()
property session

Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled entities.

Return type

ServiceBusSession

Example:

Get session from a receiver
with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
class azure.servicebus.ServiceBusSession(session_id: str, receiver: Union[ServiceBusSessionReceiver, ServiceBusSessionReceiverAsync], encoding: str = 'UTF-8')[source]

The ServiceBusSession is used to manage session state and lock renewal.

Use the `session` property on the ServiceBusSessionReceiver to get the ServiceBusSession object linked with the receiver instead of instantiating a ServiceBusSession object directly.

Variables

auto_renew_error (AutoLockRenewTimeout or AutoLockRenewFailed) – Error when AutoLockRenew is used and it fails to renew the session lock.

Example:

Get session from a receiver
with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
get_session_state() → str[source]

Get the session state.

Returns None if no state has been set.

Return type

str

Example:

Get the session state
with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    session_state = session.get_session_state()
renew_lock() → None[source]

Renew the session lock.

This operation must be performed periodically in order to retain a lock on the session to continue message processing.

Once the lock is lost the connection will be closed; an expired lock cannot be renewed.

This operation can also be performed as a threaded background task by registering the session with an azure.servicebus.AutoLockRenew instance.

Example:

Renew the session lock before it expires
with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    # renew_lock() returns None, so there is no result to keep.
    session.renew_lock()
set_session_state(state: Union[str, bytes, bytearray]) → None[source]

Set the session state.

Parameters

state (str, bytes or bytearray) – The state value.

Example:

Set the session state
with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    # set_session_state() returns None, so there is no result to keep.
    session.set_session_state("START")
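
Session state is an opaque value, so structured progress information has to be serialized by the caller. The sketch below stores a small checkpoint as JSON using get_session_state and set_session_state. load_checkpoint and save_checkpoint are hypothetical helpers, and the bytes handling is a precaution in case the state comes back undecoded.

```python
import json


def load_checkpoint(session):
    """Return the checkpoint dict stored in the session state, or {} if unset."""
    raw = session.get_session_state()
    if raw is None:
        # No state has been set for this session yet.
        return {}
    if isinstance(raw, (bytes, bytearray)):
        raw = bytes(raw).decode("utf-8")
    return json.loads(raw)


def save_checkpoint(session, checkpoint):
    """Serialize `checkpoint` to JSON and store it as the session state."""
    session.set_session_state(json.dumps(checkpoint))
```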
property expired

Whether the receiver's lock on a particular session has expired.

Return type

bool

property locked_until_utc

The time at which this session’s lock will expire.

Return type

datetime.datetime
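
The expired and locked_until_utc properties can drive a manual renewal check between units of work. This is a sketch only: renew_if_needed is a hypothetical helper, and it assumes locked_until_utc is a naive UTC datetime (pass a timezone-aware now if it is not).

```python
import datetime


def renew_if_needed(session, margin_seconds=10, now=None):
    """Renew the session lock when it expires within `margin_seconds`.

    Returns True if renew_lock() was called, False otherwise.
    """
    if session.expired:
        # An expired lock cannot be renewed.
        return False
    if now is None:
        # Naive UTC "now", matching the assumed form of locked_until_utc.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    remaining = (session.locked_until_utc - now).total_seconds()
    if remaining <= margin_seconds:
        session.renew_lock()
        return True
    return False
```

For long-running processing, registering the session with an AutoLockRenew instance, as mentioned under renew_lock, avoids having to call a check like this by hand.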

property session_id

Session id of the current session.

Return type

str

class azure.servicebus.ServiceBusSender(fully_qualified_namespace: str, credential: TokenCredential, **kwargs: Any)[source]

The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic.

Variables
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • entity_name (str) – The name of the entity that the client connects to.

Parameters
  • fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

  • credential (TokenCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts ServiceBusSharedKeyCredential, or credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method.

Keyword Arguments
  • queue_name (str) – The path of specific Service Bus Queue the client connects to.

  • topic_name (str) – The path of specific Service Bus Topic the client connects to.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Example:

Create a new instance of the ServiceBusSender.
import os
from azure.servicebus import ServiceBusSender, ServiceBusSharedKeyCredential
fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY']
shared_access_key = os.environ['SERVICE_BUS_SAS_KEY']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_sender = ServiceBusSender(
    fully_qualified_namespace=fully_qualified_namespace,
    credential=ServiceBusSharedKeyCredential(
        shared_access_policy,
        shared_access_key
    ),
    queue_name=queue_name
)
cancel_scheduled_messages(sequence_numbers: Union[int, List[int]]) → None[source]

Cancel one or more messages that have previously been scheduled and are still pending.

Parameters

sequence_numbers (int or list[int]) – The sequence numbers of the scheduled messages.

Return type

None

Raises

azure.servicebus.exceptions.ServiceBusError if the cancellation fails because a message has already been cancelled or enqueued.

Example:

Cancelling messages scheduled to be sent in future
with servicebus_sender:
    servicebus_sender.cancel_scheduled_messages(sequence_nums)
close() → None

Close down the handler links (and connection if the handler uses a separate connection).

If the handler has already closed, this operation will do nothing.

Return type

None

create_batch(max_size_in_bytes: Optional[int] = None) → azure.servicebus._common.message.BatchMessage[source]

Create a BatchMessage object whose total content size is limited by max_size_in_bytes. max_size_in_bytes should be no greater than the maximum message size allowed by the service.

Parameters

max_size_in_bytes (int) – The maximum size, in bytes, of the data that a BatchMessage object can hold. By default, the value is determined by your Service Bus tier.

Return type

BatchMessage

Example:

Create BatchMessage object within limited size
with servicebus_sender:
    batch_message = servicebus_sender.create_batch()
    batch_message.add(Message("Single message inside batch"))
classmethod from_connection_string(conn_str: str, **kwargs: Any) → azure.servicebus._servicebus_sender.ServiceBusSender[source]

Create a ServiceBusSender from a connection string.

Parameters

conn_str (str) – The connection string of a Service Bus.

Keyword Arguments
  • queue_name (str) – The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided.

  • topic_name (str) – The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided.

  • logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.

  • retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

  • transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

  • http_proxy (dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.

Return type

ServiceBusSender

Example:

Create a new instance of the ServiceBusSender from connection string.
import os
from azure.servicebus import ServiceBusSender
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
queue_sender = ServiceBusSender.from_connection_string(
    conn_str=servicebus_connection_str,
    queue_name=queue_name
)
schedule_messages(messages: Union[Message, List[Message]], schedule_time_utc: datetime.datetime) → List[int][source]

Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages.

Parameters
  • messages (Message or list[Message]) – The message or list of messages to schedule.

  • schedule_time_utc (datetime.datetime) – The UTC date and time to enqueue the messages.

Return type

list[int]

Example:

Schedule messages to be sent in the future
import datetime
from azure.servicebus import Message
with servicebus_sender:
    # Enqueue ten messages 30 seconds from now (UTC).
    scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
    scheduled_messages = [Message("Scheduled message") for _ in range(10)]
    sequence_nums = servicebus_sender.schedule_messages(scheduled_messages, scheduled_time_utc)
send_messages(message: Union[azure.servicebus._common.message.Message, azure.servicebus._common.message.BatchMessage, List[azure.servicebus._common.message.Message]]) → None[source]

Sends a message and blocks until acknowledgement is received or the operation times out.

If a list of messages is provided, attempts to send them as a single batch, raising a ValueError if they cannot fit in a single batch.

Parameters

message (Message or BatchMessage or list[Message]) – The ServiceBus message to be sent.

Return type

None

Raises
  • azure.servicebus.exceptions.OperationTimeoutError – If sending times out.

  • azure.servicebus.exceptions.MessageContentTooLarge – If the size of the message is over the Service Bus frame size limit.

  • azure.servicebus.exceptions.MessageSendFailed – If the message fails to send.

  • azure.servicebus.exceptions.ServiceBusError – When other errors happen, such as a connection error, an authentication error, or any unexpected error. It is also the root class of the errors above.

Example:

Send message.
with servicebus_sender:
    message = Message("Hello World")
    servicebus_sender.send_messages(message)
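
Since a list that does not fit in a single batch is rejected, one option is to split the payloads into smaller groups first. The sketch below illustrates that idea under loud assumptions: chunk_bodies is a hypothetical helper, max_bytes is a budget chosen by the caller, and size is approximated by the UTF-8 length of each body, which ignores any protocol overhead the service adds.

```python
from typing import Iterable, Iterator, List


def chunk_bodies(bodies: Iterable[str], max_bytes: int) -> Iterator[List[str]]:
    """Hypothetical helper: group string bodies so each group's encoded
    size stays within max_bytes (body bytes only, no protocol overhead)."""
    group: List[str] = []
    group_size = 0
    for body in bodies:
        size = len(body.encode("utf-8"))
        if size > max_bytes:
            raise ValueError("a single body exceeds max_bytes")
        # Start a new group when adding this body would exceed the budget.
        if group and group_size + size > max_bytes:
            yield group
            group, group_size = [], 0
        group.append(body)
        group_size += size
    if group:
        yield group


# Tiny budget so the grouping is easy to follow.
groups = list(chunk_bodies(["aaaa", "bbbb", "cc", "dddddd"], max_bytes=8))
```

Each group could then be turned into a list of Message objects and passed to send_messages in its own call.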
class azure.servicebus.ServiceBusSharedKeyCredential(policy: str, key: str)[source]

The shared access key credential used for authentication.

Parameters
  • policy (str) – The name of the shared access policy.

  • key (str) – The shared access key.

get_token(*scopes: str, **kwargs: Any) → azure.servicebus._base_handler.AccessToken[source]
class azure.servicebus.TransportType[source]

Transport type. The underlying transport protocol type:

  • Amqp: AMQP over the default TCP transport protocol; it uses port 5671.

  • AmqpOverWebsocket: AMQP over the WebSockets transport protocol; it uses port 443.

Amqp = 1
AmqpOverWebsocket = 2
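
The two transports differ mainly in the port they need. As a rough sketch of how a caller might choose between them, the block below uses a local stand-in enum that mirrors the names and values listed above so that it runs without the package; pick_transport and TRANSPORT_PORTS are hypothetical and not part of azure.servicebus.

```python
from enum import Enum


class TransportType(Enum):
    """Stand-in mirroring azure.servicebus.TransportType."""

    Amqp = 1
    AmqpOverWebsocket = 2


# Ports as documented for each transport.
TRANSPORT_PORTS = {
    TransportType.Amqp: 5671,
    TransportType.AmqpOverWebsocket: 443,
}


def pick_transport(port_5671_open: bool) -> TransportType:
    """Hypothetical helper: fall back to WebSockets when 5671 is blocked."""
    if port_5671_open:
        return TransportType.Amqp
    return TransportType.AmqpOverWebsocket


chosen = pick_transport(port_5671_open=False)
```

In real code the chosen member of the library's own TransportType would be passed as the transport_type keyword argument.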
class azure.servicebus.AutoLockRenew(executor=None, max_workers=None)[source]

Auto renew locks for messages and sessions using a background thread pool.

Parameters
  • executor (ThreadPoolExecutor) – A user-specified thread pool. This cannot be combined with setting max_workers.

  • max_workers (int) – The maximum number of workers in the thread pool. If not specified, the number used is derived from the core count of the environment. This cannot be combined with executor.

Example:

Automatically renew a message lock
from azure.servicebus import AutoLockRenew
lock_renewal = AutoLockRenew(max_workers=4)
with servicebus_receiver:
    for message in servicebus_receiver:
        # Auto renew message for 1 minute.
        lock_renewal.register(message, timeout=60)
        process_message(message)
        message.complete()
Automatically renew a session lock
from azure.servicebus import AutoLockRenew
lock_renewal = AutoLockRenew(max_workers=4)
with servicebus_client.get_queue_session_receiver(queue_name=queue_name, session_id=session_id) as receiver:
    session = receiver.session
    # Auto renew session lock for 2 minutes
    lock_renewal.register(session, timeout=120)
    for message in receiver:
        process_message(message)
        message.complete()
register(renewable, timeout=300)[source]

Register a renewable entity for automatic lock renewal.

Parameters
  • renewable (ReceivedMessage or Session) – A locked entity that needs to be renewed.

  • timeout (float) – The time in seconds for which the lock should be maintained. Default value is 300 (5 minutes).

shutdown(wait=True)[source]

Shut down the thread pool to clean up any remaining lock renewal threads.

Parameters

wait (bool) – Whether to block until the thread pool has shut down. Default is True.
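
To make the idea of background lock renewal concrete, here is a conceptual sketch of a renewal loop running on a thread pool. It is not the library's implementation: FakeRenewable, its renew_lock method, keep_renewed, and the interval parameter are all assumptions made for illustration, and the very short timings are chosen only so the sketch finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FakeRenewable:
    """Hypothetical stand-in for a locked message or session."""

    def __init__(self) -> None:
        self.renewals = 0

    def renew_lock(self) -> None:
        # A real entity would contact the service here.
        self.renewals += 1


def keep_renewed(renewable, timeout: float, interval: float) -> int:
    """Renew the lock every `interval` seconds until `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        renewable.renew_lock()
        # Stop once another sleep would run past the deadline.
        if time.monotonic() + interval >= deadline:
            break
        time.sleep(interval)
    return renewable.renewals


lock = FakeRenewable()
with ThreadPoolExecutor(max_workers=1) as pool:
    # The renewal loop runs on a worker thread while the caller keeps working.
    future = pool.submit(keep_renewed, lock, 0.05, 0.01)
    renewal_count = future.result()
```

Leaving the with block waits for the worker to finish, which is the same role shutdown(wait=True) plays in the description above.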

Submodules

azure.servicebus.exceptions module

exception azure.servicebus.exceptions.AutoLockRenewFailed(message, inner_exception=None)[source]

An attempt to renew a lock on a message or session in the background has failed.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.AutoLockRenewTimeout(message, inner_exception=None)[source]

The time allocated to renew the message or session lock has elapsed.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.MessageAlreadySettled(action)[source]

Failed to settle the message.

An attempt was made to complete an operation on a message that has already been settled (completed, abandoned, dead-lettered or deferred). This error will also be raised if an attempt is made to settle a message received via ReceiveAndDelete mode.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.MessageContentTooLarge(message, inner_exception=None)[source]

Message content is larger than the Service Bus frame size.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.MessageError(message, inner_exception=None)[source]

A message failed to send because the message is in the wrong state.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.MessageLockExpired(message=None, inner_exception=None)[source]

The lock on the message has expired and it has been released back to the queue.

It will need to be received again in order to settle it.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.MessageSendFailed(inner_exception)[source]

A message failed to send to the Service Bus entity.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.MessageSettleFailed(action, inner_exception)[source]

Attempt to settle a message failed.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.NoActiveSession(message, inner_exception=None)[source]

No active Sessions are available to receive from.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.OperationTimeoutError(message, inner_exception=None)[source]

Operation timed out.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.ServiceBusAuthenticationError(message, inner_exception=None)[source]

An error occurred when authenticating the connection.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.ServiceBusAuthorizationError(message, inner_exception=None)[source]

An error occurred when authorizing the connection.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.ServiceBusConnectionError(message, inner_exception=None)[source]

An error occurred in the connection.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.ServiceBusError(message, inner_exception=None)[source]

An error occurred.

This is the parent of all Service Bus errors and can be used for default error handling.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
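
Because ServiceBusError is the parent of all Service Bus errors, an except clause for a specific error has to come before the clause for ServiceBusError. The sketch below shows that ordering with local stand-in classes that mirror two names from this module, so that it runs without the package; real code would import them from azure.servicebus.exceptions. The functions describe_failure, timed_out, and other_failure are hypothetical.

```python
class ServiceBusError(Exception):
    """Stand-in for azure.servicebus.exceptions.ServiceBusError."""


class OperationTimeoutError(ServiceBusError):
    """Stand-in for azure.servicebus.exceptions.OperationTimeoutError."""


def describe_failure(operation) -> str:
    """Run operation() and report which handler caught its error."""
    try:
        operation()
    except OperationTimeoutError:
        # Specific subclass first, otherwise the parent clause would catch it.
        return "timeout"
    except ServiceBusError:
        # Default handling for every other Service Bus error.
        return "service bus error"
    return "ok"


def timed_out():
    raise OperationTimeoutError("send timed out")


def other_failure():
    raise ServiceBusError("connection dropped")


results = [describe_failure(f) for f in (timed_out, other_failure, lambda: None)]
```

Reversing the two except clauses would route timeouts into the generic handler, since the parent clause matches every subclass.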
exception azure.servicebus.exceptions.ServiceBusResourceNotFound(message, inner_exception=None)[source]

The Service Bus entity could not be reached.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception azure.servicebus.exceptions.SessionLockExpired(message=None, inner_exception=None)[source]

The lock on the session has expired.

All unsettled messages that have been received can no longer be settled.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args