Azure Service Bus client library for Python¶
Azure Service Bus is a high performance cloud-managed messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers.
Service Bus provides multiple mechanisms for asynchronous highly reliable communication, such as structured first-in-first-out messaging, publish/subscribe capabilities, and the ability to easily scale as your needs grow.
Use the Service Bus client library for Python to communicate between applications and services and implement asynchronous messaging patterns.
Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings.
Send and receive messages within your Service Bus channels.
Utilize message locks, sessions, and dead letter functionality to implement complex messaging patterns.
Source code | Package (PyPi) | API reference documentation | Product documentation | Samples | Changelog
NOTE: This document has instructions, links and code snippets for the preview of the next version of the
azure-servicebus
package which has different APIs than the current version (0.50). Please view the resources below for references on the existing library.
V0.50 Source code | V0.50 Package (PyPi) | V0.50 API reference documentation | V0.50 Product documentation | V0.50 Samples | V0.50 Changelog
We also provide a migration guide for users familiar with the existing package that would like to try the preview: migration guide to move from Service Bus V0.50 to Service Bus V7 Preview
Getting started¶
Install the package¶
Install the Azure Service Bus client library for Python with pip:
pip install azure-servicebus --pre
Prerequisites:¶
To use this package, you must have:
Azure subscription - Create a free account
Azure Service Bus - Namespace and management credentials
Python 2.7, 3.5 or later - Install Python
If you need an Azure service bus namespace, you can create it via the Azure Portal. If you do not wish to use the graphical portal UI, you can use the Azure CLI via Cloud Shell, or Azure CLI run locally, to create one with this Azure CLI command:
az servicebus namespace create --resource-group <resource-group-name> --name <servicebus-namespace-name> --location <servicebus-namespace-location>
Authenticate the client¶
Interaction with Service Bus starts with an instance of the ServiceBusClient
class. You either need a connection string with SAS key, or a namespace and one of its account keys to instantiate the client object.
Create client from connection string¶
Get credentials: Use the Azure CLI snippet below to populate an environment variable with the service bus connection string (you can also find these values in the Azure Portal by following the step-by-step guide to Get a service bus connection string). The snippet is formatted for the Bash shell.
RES_GROUP=<resource-group-name>
NAMESPACE_NAME=<servicebus-namespace-name>
export SERVICE_BUS_CONN_STR=$(az servicebus namespace authorization-rule keys list --resource-group $RES_GROUP --namespace-name $NAMESPACE_NAME --name RootManageSharedAccessKey --query primaryConnectionString --output tsv)
Once you’ve populated the SERVICE_BUS_CONN_STR
environment variable, you can create the ServiceBusClient
.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
with ServiceBusClient.from_connection_string(connstr) as client:
...
Create client using the azure-identity library:¶
import os
from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
FULLY_QUALIFIED_NAMESPACE = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
with ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential):
...
This constructor takes the fully qualified namespace of your Service Bus instance and a credential that implements the TokenCredential protocol. There are implementations of the
TokenCredential
protocol available in the azure-identity package. The fully qualified namespace is of the format<yournamespace.servicebus.windows.net>
.When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such as the Azure Service Bus Data Owner role. For more information about using Azure Active Directory authorization with Service Bus, please refer to the associated documentation.
Note: client can be initialized without a context manager, but must be manually closed via client.close() to not leak resources.
Key concepts¶
Once you’ve initialized a ServiceBusClient
, you can interact with the primary resource types within a Service Bus Namespace, of which multiple can exist and on which actual message transmission takes place, the namespace often serving as an application container:
Queue: Allows for Sending and Receiving of message. Often used for point-to-point communication.
Topic: As opposed to Queues, Topics are better suited to publish/subscribe scenarios. A topic can be sent to, but requires a subscription, of which there can be multiple in parallel, to consume from.
Subscription: The mechanism to consume from a Topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and Filters can be used to tailor which messages are received by a specific subscription.
For more information about these resources, see What is Azure Service Bus?.
To interact with these resources, one should be familiar with the following SDK concepts:
ServiceBusClient: This is the object a user should first initialize to connect to a Service Bus Namespace. To interact with a queue, topic, or subscription, one would spawn a sender or receiver off of this client.
Sender: To send messages to a Queue or Topic, one would use the corresponding
get_queue_sender
orget_topic_sender
method off of aServiceBusClient
instance as seen here.Receiver: To receive messages from a Queue or Subscription, one would use the corresponding
get_queue_receiver
orget_subscription_receiver
method off of aServiceBusClient
instance as seen here.Message: When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload and control how the message is “settled” (completed, dead-lettered, etc); these functions are only available on a received message.
Examples¶
The following sections provide several code snippets covering some of the most common Service Bus tasks, including:
To perform management tasks such as creating and deleting queues/topics/subscriptions, please utilize the azure-mgmt-servicebus library, available here.
Please find further examples in the samples directory demonstrating common Service Bus scenarios such as sending, receiving, session management and message handling.
Send messages to a queue¶
This example sends single message and array of messages to a queue that is assumed to already exist, created via the Azure portal or az commands.
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
# Sending a single message
single_message = Message("Single message")
sender.send_messages(single_message)
# Sending a list of messages
messages = [Message("First message"), Message("Second message")]
sender.send_messages(messages)
NOTE: A message may be scheduled for delayed delivery using the
ServiceBusSender.schedule_messages()
method, or by specifyingMessage.scheduled_enqueue_time_utc
before callingServiceBusSender.send_messages()
For more detail on scheduling and schedule cancellation please see a sample here.
Receive messages from a queue¶
To receive from a queue, you can either perform an ad-hoc receive via “receiver.receive_messages()” or receive persistently through the receiver itself.
Receive messages from a queue through iterating over ServiceBusReceiver¶
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
# idle_timeout specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, idle_timeout=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator
print(str(msg))
# If it is desired to halt receiving early, one can break out of the loop here safely.
NOTE: Any message received with
mode=PeekLock
(this is the default, with the alternative ReceiveAndDelete removing the message from the queue immediately on receipt) has a lock that must be renewed viamessage.renew_lock()
before it expires if processing would take longer than the lock duration.
See AutoLockRenewer for a helper to perform this in the background automatically. Lock duration is set in Azure on the queue or topic itself.
Receive messages from a queue through ``ServiceBusReceiver.receive_messages()` <https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive>`_¶
NOTE:
ServiceBusReceiver.receive_messages()
receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
received_message_array = receiver.receive_messages(max_wait_time=10) # try to receive a single message within 10 seconds
if received_message_array:
print(str(received_message_array[0]))
with client.get_queue_receiver(queue_name, prefetch=5) as receiver:
received_message_array = receiver.receive_messages(max_batch_size=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
for message in received_message_array:
print(str(message))
In this example, max_batch_size (and prefetch, as required by max_batch_size) declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.
NOTE: It should also be noted that
ServiceBusReceiver.peek_messages()
is subtly different than receiving, as it does not lock the messages being peeked, and thus they cannot be settled.
Send and receive a message from a session enabled queue¶
Sessions provide first-in-first-out and single-receiver semantics on top of a queue or subscription. While the actual receive syntax is the same, initialization differs slightly.
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
sender.send_messages(Message("Session Enabled Message", session_id=session_id))
# If session_id is null here, will receive from the first available session.
with client.get_queue_session_receiver(queue_name, session_id) as receiver:
for msg in receiver:
print(str(msg))
NOTE: Messages received from a session do not need their locks renewed like a non-session receiver; instead the lock management occurs at the session level with a session lock that may be renewed with
receiver.session.renew_lock()
Working with topics and subscriptions¶
Topics and subscriptions give an alternative to queues for sending and receiving messages. See documents here for more overarching detail, and of how these differ from queues.
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_topic_sender(topic_name) as sender:
sender.send_messages(Message("Data"))
# If session_id is null here, will receive from the first available session.
with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
for msg in receiver:
print(str(msg))
Settle a message after receipt¶
When receiving from a queue, you have multiple actions you can take on the messages you receive.
NOTE: You can only settle
ReceivedMessage
objects which are received inReceiveSettleMode.PeekLock
mode (this is the default).ReceiveSettleMode.ReceiveAndDelete
mode removes the message from the queue on receipt.PeekMessage
messages returned frompeek()
cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. Sessionful messages have a similar limitation.
If the message has a lock as mentioned above, settlement will fail if the message lock has expired.
If processing would take longer than the lock duration, it must be maintained via message.renew_lock()
before it expires.
Lock duration is set in Azure on the queue or topic itself.
See AutoLockRenewer for a helper to perform this in the background automatically.
Complete¶
Declares the message processing to be successfully completed, removing the message from the queue.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.complete()
Abandon¶
Abandon processing of the message for the time being, returning the message immediately back to the queue to be picked up by another (or the same) receiver.
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.abandon()
DeadLetter¶
Transfer the message from the primary queue into a special “dead-letter sub-queue” where it can be accessed using the ServiceBusClient.get_<queue|subscription>_deadletter_receiver
function and consumed from like any other receiver. (see sample here)
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.dead_letter()
Defer¶
Defer is subtly different from the prior settlement methods. It prevents the message from being directly received from the queue
by setting it aside such that it must be received by sequence number in a call to ServiceBusReceiver.receive_deferred_messages
(see sample here)
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.defer()
Troubleshooting¶
Logging¶
Enable
azure.servicebus
logger to collect traces from the library.Enable
uamqp
logger to collect traces from the underlying uAMQP library.Enable AMQP frame level trace by setting
logging_enable=True
when creating the client.
Timeouts¶
There are various timeouts a user should be aware of within the library.
10 minute service side link closure: A link, once opened, will be closed after 10 minutes idle to protect the service against resource leakage. This should largely be transparent to a user, but if you notice a reconnect occurring after such a duration, this is why. Performing any operations, including management operations, on the link will extend this timeout.
idle_timeout: Provided on creation of a receiver, the time after which the underlying UAMQP link will be closed after no traffic. This primarily dictates the length a generator-style receive will run for before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken.
max_wait_time: Provided when calling receive() to fetch a list of messages. Dictates an upper bound for how long the receive() will wait for more messages before returning, similarly up to the aformentioned limits. The “receive()” will return as soon as at least one message is received within the max_wait_time.
NOTE: If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling
renew_lock()
manually, one can leverage theAutoLockRenew
functionality detailed below.
AutoLockRenew¶
AutoLockRenew
is a simple method for ensuring your message or session remains locked even over long periods of time, if calling renew_lock()
is impractical or undesired.
Internally, it is not much more than shorthand for creating a concurrent watchdog to call renew_lock()
if the object is nearing expiry.
It should be used as follows:
from azure.servicebus import ServiceBusClient, AutoLockRenew
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']
# Can also be called via "with AutoLockRenew() as renewer" to automate shutdown.
renewer = AutoLockRenew()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver:
renewer.register(receiver.session, timeout=300) # Timeout for how long to maintain the lock for, in seconds.
for msg in receiver.receive_messages():
renewer.register(msg, timeout=60)
# Do your application logic here
msg.complete()
renewer.shutdown()
If for any reason auto-renewal has been interrupted or failed, this can be observed via the auto_renew_error
property on the object being renewed.
It would also manifest when trying to take action (such as completing a message) on the specified object.
Common Exceptions¶
Please view the exceptions reference docs for detailed descriptions of our common Exception types.
Next steps¶
More sample code¶
Please find further examples in the samples directory demonstrating common Service Bus scenarios such as sending, receiving, session management and message handling.
Additional documentation¶
For more extensive documentation on the Service Bus service, see the Service Bus documentation on docs.microsoft.com.
Management capabilities and documentation¶
For users seeking to perform management operations against ServiceBus (Creating a queue/topic/etc, altering filter rules, enumerating entities) please see the azure-mgmt-servicebus documentation for API documentation. Terse usage examples can be found here as well.
Contributing¶
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.