azure.servicebus package
-
class
azure.servicebus.
AutoLockRenewer
(max_lock_renewal_duration: float = 300, on_lock_renew_failure: Optional[LockRenewFailureCallback] = None, executor: Optional[ThreadPoolExecutor] = None, max_workers: Optional[int] = None)[source]¶ Auto renew locks for messages and sessions using a background thread pool.
- Parameters
max_lock_renewal_duration (float) – A time in seconds that locks registered to this renewer should be maintained for. Default value is 300 (5 minutes).
on_lock_renew_failure (Optional[LockRenewFailureCallback]) – A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback).
executor (Optional[ThreadPoolExecutor]) – A user-specified thread pool. This cannot be combined with setting max_workers.
max_workers (Optional[int]) – Specify the maximum workers in the thread pool. If not specified the number used will be derived from the core count of the environment. This cannot be combined with executor.
Example:
Automatically renew a message lock
    from azure.servicebus import AutoLockRenewer

    lock_renewal = AutoLockRenewer(max_workers=4)
    with servicebus_receiver:
        for message in servicebus_receiver:
            # Auto renew message for 1 minute.
            lock_renewal.register(servicebus_receiver, message, max_lock_renewal_duration=60)
            process_message(message)
            servicebus_receiver.complete_message(message)
Automatically renew a session lock
    from azure.servicebus import AutoLockRenewer

    lock_renewal = AutoLockRenewer(max_workers=4)
    with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        # Auto renew session lock for 2 minutes
        lock_renewal.register(receiver, session, max_lock_renewal_duration=120)
        for message in receiver:
            process_message(message)
            receiver.complete_message(message)
When the AutoLockRenewer is expected to handle multiple messages or sessions simultaneously, it is recommended to set max_workers to a large number or to pass a ThreadPoolExecutor with a large max_workers value.
-
close
(wait=True)[source]¶ Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads.
-
register
(receiver: ServiceBusReceiver, renewable: Renewable, max_lock_renewal_duration: Optional[float] = None, on_lock_renew_failure: Optional[LockRenewFailureCallback] = None) → None[source]¶ Register a renewable entity for automatic lock renewal.
- Parameters
receiver (ServiceBusReceiver) – The ServiceBusReceiver instance that is associated with the message or the session to be auto-lock-renewed.
renewable (Union[ServiceBusReceivedMessage, ServiceBusSession]) – A locked entity that needs to be renewed.
max_lock_renewal_duration (Optional[float]) – A time in seconds that the lock should be maintained for. Default value is None. If specified, this value will override the default value specified at the constructor.
on_lock_renew_failure (Optional[LockRenewFailureCallback]) – A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback).
- Return type
None
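As a rough sketch of how register and on_lock_renew_failure might be combined, the helper below registers every received message before handling it. The names on_failure, lost_locks, process_with_renewal and handler are hypothetical, and the callback is assumed to be called with the renewable and the error (or None); receiver and renewer stand for an existing ServiceBusReceiver and AutoLockRenewer.

```python
import logging

logger = logging.getLogger(__name__)

# Hypothetical record of renewables whose lock could not be kept alive.
lost_locks = []


def on_failure(renewable, error):
    """Assumed callback shape: called with the renewable and the error (or None)."""
    lost_locks.append((renewable, error))
    logger.warning("Lock lost on %r: %r", renewable, error)


def process_with_renewal(receiver, renewer, handler, duration=60):
    """Register each received message for renewal before handling it.

    `receiver` and `renewer` are expected to behave like a ServiceBusReceiver
    and an AutoLockRenewer; `handler` is a hypothetical processing function.
    Returns the number of messages completed.
    """
    completed = 0
    for message in receiver:
        renewer.register(
            receiver,
            message,
            max_lock_renewal_duration=duration,
            on_lock_renew_failure=on_failure,
        )
        handler(message)
        receiver.complete_message(message)
        completed += 1
    return completed
```

Passing the callback per registration, rather than at construction time, lets different renewables report failures differently.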
-
class
azure.servicebus.
ServiceBusClient
(fully_qualified_namespace: str, credential: Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)[source]¶ The ServiceBusClient class defines a high level interface for getting ServiceBusSender and ServiceBusReceiver.
- Variables
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method, or alternatively, an AzureSasCredential can be provided too.
- Keyword Arguments
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.
http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
user_agent (str) – If specified, this will be added in front of the built-in user agent string.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.
retry_mode (str) – The delay behavior between retry attempts. Supported values are “fixed” or “exponential”, where default is “exponential”.
custom_endpoint_address (str) – The custom endpoint address to use for establishing a connection to the Service Bus service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If port is not specified in the custom_endpoint_address, by default port 443 will be used.
connection_verify (str) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.
uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
Example:
Create a new instance of the ServiceBusClient.
    import os
    from azure.identity import DefaultAzureCredential
    from azure.servicebus import ServiceBusClient

    fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
    servicebus_client = ServiceBusClient(
        fully_qualified_namespace=fully_qualified_namespace,
        credential=DefaultAzureCredential()
    )
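To make the interaction of retry_total, retry_backoff_factor, retry_backoff_max and retry_mode more concrete, here is a small stand-alone sketch of a delay schedule. The function retry_delays is hypothetical, and the doubling rule is an assumption about the general shape of exponential back-off; the SDK's exact formula may differ.

```python
def retry_delays(retry_total=3, retry_backoff_factor=0.8,
                 retry_backoff_max=120.0, retry_mode="exponential"):
    """Return an illustrative list of delays (seconds), one per retry attempt.

    Assumption: "fixed" waits retry_backoff_factor every time, while
    "exponential" doubles the wait on each attempt; both are capped at
    retry_backoff_max. The SDK's exact formula may differ.
    """
    if retry_mode not in ("fixed", "exponential"):
        raise ValueError("retry_mode must be 'fixed' or 'exponential'")
    delays = []
    for attempt in range(retry_total):
        if retry_mode == "fixed":
            delay = retry_backoff_factor
        else:
            delay = retry_backoff_factor * (2 ** attempt)
        # Never wait longer than the configured maximum.
        delays.append(min(delay, retry_backoff_max))
    return delays
```

Under these assumptions the defaults give waits of 0.8, 1.6 and 3.2 seconds, so retry_backoff_max only matters for larger factors or retry counts.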
-
close
() → None[source]¶ Close down the ServiceBus client. All spawned senders, receivers and underlying connection will be shutdown.
- Returns
None
-
classmethod
from_connection_string
(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) → azure.servicebus._servicebus_client.ServiceBusClient[source]¶ Create a ServiceBusClient from a connection string.
- Parameters
conn_str (str) – The connection string of a Service Bus.
- Keyword Arguments
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.
http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
user_agent (str) – If specified, this will be added in front of the built-in user agent string.
retry_total (int) – The total number of attempts to redo a failed operation when an error occurs. Default value is 3.
retry_backoff_factor (float) – Delta back-off interval, in seconds, between retries. Default value is 0.8.
retry_backoff_max (float) – Maximum back-off interval, in seconds. Default value is 120.
retry_mode (str) – The delay behavior between retry attempts. Supported values are ‘fixed’ or ‘exponential’, where default is ‘exponential’.
custom_endpoint_address (str) – The custom endpoint address to use for establishing a connection to the Service Bus service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like “sb://<custom_endpoint_hostname>:<custom_endpoint_port>”. If port is not specified in the custom_endpoint_address, by default port 443 will be used.
connection_verify (str) – Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.
uamqp_transport (bool) – Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
- Return type
ServiceBusClient
Example:
Create a new instance of the ServiceBusClient from connection string.
    import os
    from azure.servicebus import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
-
get_queue_receiver
(queue_name: str, *, session_id: Optional[Union[str, typing_extensions.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>]]] = None, sub_queue: Optional[Union[azure.servicebus._common.constants.ServiceBusSubQueue, str]] = None, receive_mode: Union[azure.servicebus._common.constants.ServiceBusReceiveMode, str] = <ServiceBusReceiveMode.PEEK_LOCK: 'peeklock'>, max_wait_time: Optional[float] = None, auto_lock_renewer: Optional[azure.servicebus._common.auto_lock_renewer.AutoLockRenewer] = None, prefetch_count: int = 0, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the specific queue.
- Parameters
queue_name (str) – The path of specific Service Bus Queue the client connects to.
- Keyword Arguments
session_id (str or NEXT_AVAILABLE_SESSION) – A specific session from which to receive. This must be specified for a sessionful queue, otherwise it must be None. In order to receive messages from the next available session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
sub_queue (str or ServiceBusSubQueue or None) – If specified, the subqueue this receiver will connect to. This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, which hold messages that can’t be delivered to any receiver or messages that can’t be processed. The default is None, meaning connect to the primary queue. Can be assigned values from the ServiceBusSubQueue enum or the equivalent string values “deadletter” and “transferdeadletter”.
receive_mode (Union[ServiceBusReceiveMode, str]) – The receive_mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default receive_mode is PEEK_LOCK.
max_wait_time (Optional[float]) – The timeout in seconds between received messages after which the receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.
auto_lock_renewer (Optional[AutoLockRenewer]) – An ~azure.servicebus.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.
prefetch_count (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service.
client_identifier (str) – A string-based identifier to uniquely identify the receiver instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
- Return type
ServiceBusReceiver
Example:
Create a new instance of the ServiceBusReceiver from ServiceBusClient.
    import os
    from azure.servicebus import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
    queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    with servicebus_client:
        queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
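The PEEK_LOCK behaviour described above (messages must be settled before they are removed) can be sketched as a small receive-and-settle helper. The function drain and its handler argument are hypothetical; receiver is assumed to be a ServiceBusReceiver opened in PEEK_LOCK mode, and only its receive_messages, complete_message and abandon_message methods are used.

```python
def drain(receiver, handler, max_message_count=10, max_wait_time=5):
    """Receive one batch, completing messages that succeed and abandoning the rest.

    Returns a (completed, abandoned) tuple of counts.
    """
    completed = abandoned = 0
    messages = receiver.receive_messages(
        max_message_count=max_message_count, max_wait_time=max_wait_time
    )
    for message in messages:
        try:
            handler(message)
        except Exception:
            # Release the lock so the message becomes available again.
            receiver.abandon_message(message)
            abandoned += 1
        else:
            # Settle the message so it is removed from the queue.
            receiver.complete_message(message)
            completed += 1
    return completed, abandoned
```

In RECEIVE_AND_DELETE mode no settlement calls would be needed, at the cost of losing any message whose handler fails.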
-
get_queue_sender
(queue_name: str, **kwargs: Any) → azure.servicebus._servicebus_sender.ServiceBusSender[source]¶ Get ServiceBusSender for the specific queue.
- Parameters
queue_name (str) – The path of specific Service Bus Queue the client connects to.
- Keyword Arguments
client_identifier (str) – A string-based identifier to uniquely identify the sender instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
- Returns
A queue Sender.
- Return type
ServiceBusSender
Example:
Create a new instance of the ServiceBusSender from ServiceBusClient.
    import os
    from azure.servicebus import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
    queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    with servicebus_client:
        queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
-
get_subscription_receiver
(topic_name: str, subscription_name: str, *, session_id: Optional[Union[str, typing_extensions.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>]]] = None, sub_queue: Optional[Union[azure.servicebus._common.constants.ServiceBusSubQueue, str]] = None, receive_mode: Union[azure.servicebus._common.constants.ServiceBusReceiveMode, str] = <ServiceBusReceiveMode.PEEK_LOCK: 'peeklock'>, max_wait_time: Optional[float] = None, auto_lock_renewer: Optional[azure.servicebus._common.auto_lock_renewer.AutoLockRenewer] = None, prefetch_count: int = 0, **kwargs: Any) → azure.servicebus._servicebus_receiver.ServiceBusReceiver[source]¶ Get ServiceBusReceiver for the specific subscription under the topic.
- Parameters
- Keyword Arguments
session_id (str or NEXT_AVAILABLE_SESSION) – A specific session from which to receive. This must be specified for a sessionful subscription, otherwise it must be None. In order to receive messages from the next available session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
sub_queue (str or ServiceBusSubQueue or None) – If specified, the subqueue this receiver will connect to. This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, which hold messages that can’t be delivered to any receiver or messages that can’t be processed. The default is None, meaning connect to the primary queue. Can be assigned values from the ServiceBusSubQueue enum or the equivalent string values “deadletter” and “transferdeadletter”.
receive_mode (Union[ServiceBusReceiveMode, str]) – The receive_mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default receive_mode is PEEK_LOCK.
max_wait_time (Optional[float]) – The timeout in seconds between received messages after which the receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.
auto_lock_renewer (Optional[AutoLockRenewer]) – An ~azure.servicebus.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.
prefetch_count (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service.
client_identifier (str) – A string-based identifier to uniquely identify the receiver instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
- Return type
ServiceBusReceiver
Example:
Create a new instance of the ServiceBusReceiver from ServiceBusClient.
    import os
    from azure.servicebus import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
    topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
    subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    with servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
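The sub_queue keyword described above can be used to read a subscription's dead-letter queue. The following is a minimal sketch; open_dead_letter_receiver is a hypothetical helper, and client is assumed to be an existing ServiceBusClient.

```python
def open_dead_letter_receiver(client, topic_name, subscription_name, max_wait_time=5):
    """Return a receiver attached to the subscription's dead-letter subqueue."""
    return client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name,
        # String equivalent of the DEAD_LETTER value of ServiceBusSubQueue.
        sub_queue="deadletter",
        # Stop iterating once no message has arrived for this many seconds.
        max_wait_time=max_wait_time,
    )
```

Setting max_wait_time keeps an inspection script from blocking indefinitely once the dead-letter queue is empty.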
-
get_topic_sender
(topic_name: str, **kwargs: Any) → azure.servicebus._servicebus_sender.ServiceBusSender[source]¶ Get ServiceBusSender for the specific topic.
- Parameters
topic_name (str) – The path of specific Service Bus Topic the client connects to.
- Keyword Arguments
client_identifier (str) – A string-based identifier to uniquely identify the sender instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
- Returns
A topic sender.
- Return type
ServiceBusSender
Example:
Create a new instance of the ServiceBusSender from ServiceBusClient.
    import os
    from azure.servicebus import ServiceBusClient

    servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
    topic_name = os.environ['SERVICEBUS_TOPIC_NAME']
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
    with servicebus_client:
        topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)
-
class
azure.servicebus.
ServiceBusConnectionStringProperties
(**kwargs)[source]¶ Properties of a connection string.
-
values
() → List¶
-
property
endpoint
¶ The endpoint for the Service Bus resource. In the format sb://<FQDN>/ :rtype: str
-
property
entity_path
¶ Optional. Represents the name of the queue/topic. :rtype: str
-
property
fully_qualified_namespace
¶ The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net. :rtype: str
-
property
shared_access_key
¶ The shared_access_key can be used along with the shared_access_key_name as a credential. :rtype: str
-
property
shared_access_key_name
¶ The name of the shared_access_key. This must be used along with the shared_access_key. :rtype: str
-
property
shared_access_signature
¶ This can be provided instead of the shared_access_key_name and the shared_access_key. :rtype: str
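To show how the properties above relate to the text of a connection string, here is a simplified stand-alone parser. It is a sketch for illustration only, not the library's own parsing logic; the function name parse_connection_string_sketch and the returned dictionary layout are hypothetical, and the key names (Endpoint, SharedAccessKeyName, SharedAccessKey, SharedAccessSignature, EntityPath) are those commonly found in Service Bus connection strings.

```python
def parse_connection_string_sketch(conn_str):
    """Split a 'Key=Value;Key=Value' connection string into a dict.

    Illustrative only: validation is minimal compared with a real parser.
    """
    parts = {}
    for segment in conn_str.strip().rstrip(";").split(";"):
        # Split on the first '=' only, since key values may themselves end in '='.
        key, sep, value = segment.partition("=")
        if not sep or not key:
            raise ValueError("Malformed segment: {!r}".format(segment))
        parts[key.strip().lower()] = value.strip()
    endpoint = parts.get("endpoint", "")
    # Derive the host name from an endpoint of the form sb://<FQDN>/
    fqdn = endpoint.split("://", 1)[-1].strip("/")
    return {
        "endpoint": endpoint,
        "fully_qualified_namespace": fqdn,
        "entity_path": parts.get("entitypath"),
        "shared_access_key_name": parts.get("sharedaccesskeyname"),
        "shared_access_key": parts.get("sharedaccesskey"),
        "shared_access_signature": parts.get("sharedaccesssignature"),
    }
```

Either the key name and key pair or the signature would normally be present, which is why the sketch returns None for whichever is missing.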
-
-
class
azure.servicebus.
ServiceBusMessage
(body: Optional[Union[str, bytes]], *, application_properties: Optional[Dict[Union[str, bytes], Union[int, float, bytes, bool, str, uuid.UUID]]] = None, session_id: Optional[str] = None, message_id: Optional[str] = None, scheduled_enqueue_time_utc: Optional[datetime.datetime] = None, time_to_live: Optional[datetime.timedelta] = None, content_type: Optional[str] = None, correlation_id: Optional[str] = None, subject: Optional[str] = None, partition_key: Optional[str] = None, to: Optional[str] = None, reply_to: Optional[str] = None, reply_to_session_id: Optional[str] = None, **kwargs: Any)[source]¶ A Service Bus Message.
- Parameters
body (Optional[Union[str, bytes]]) – The data to send in a single message.
- Keyword Arguments
application_properties (Dict[str, Union[int or float or bool or bytes or str or uuid.UUID or datetime or None]]) – The user defined properties on the message.
session_id (Optional[str]) – The session identifier of the message for a sessionful entity.
message_id (Optional[str]) – The id to identify the message.
scheduled_enqueue_time_utc (Optional[datetime.datetime]) – The UTC time at which the message is scheduled to be enqueued.
time_to_live (Optional[datetime.timedelta]) – The life duration of a message.
content_type (Optional[str]) – The content type descriptor.
correlation_id (Optional[str]) – The correlation identifier.
subject (Optional[str]) – The application specific subject, sometimes referred to as label.
partition_key (Optional[str]) – The partition key for sending a message to a partitioned entity.
to (Optional[str]) – The to address used for auto_forward chaining scenarios.
reply_to (Optional[str]) – The address of an entity to send replies to.
reply_to_session_id (Optional[str]) – The session identifier augmenting the reply_to address.
Example:
Sending a message with additional properties
    import datetime
    from azure.servicebus import ServiceBusMessage

    message = ServiceBusMessage(
        "Hello World!!",
        session_id="MySessionID",
        application_properties={'data': 'custom_data'},
        time_to_live=datetime.timedelta(seconds=30),
        subject='MyLabel'
    )
-
property
application_properties
¶ The user defined properties on the message.
-
property
body
¶ The body of the Message. The format may vary depending on the body type: For azure.servicebus.amqp.AmqpMessageBodyType.DATA, the body could be bytes or Iterable[bytes]. For azure.servicebus.amqp.AmqpMessageBodyType.SEQUENCE, the body could be List or Iterable[List]. For azure.servicebus.amqp.AmqpMessageBodyType.VALUE, the body could be any type.
- Return type
Any
-
property
body_type
¶ The body type of the underlying AMQP message.
- Return type
-
property
content_type
¶ The content type descriptor.
Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5, for example “application/json”.
-
property
correlation_id
¶ The correlation identifier.
Allows an application to specify a context for the message for the purposes of correlation, for example reflecting the MessageId of a message that is being replied to.
See Message Routing and Correlation in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation.
-
property
message
¶ DEPRECATED: Get the underlying uamqp.Message or LegacyMessage. This is deprecated and will be removed in a later release.
- Return type
uamqp.Message or LegacyMessage
-
property
message_id
¶ The id to identify the message.
The message identifier is an application-defined value that uniquely identifies the message and its payload. The identifier is a free-form string and can reflect a GUID or an identifier derived from the application context. If enabled, the duplicate detection (see https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection) feature identifies and removes second and further submissions of messages with the same message id.
-
property
partition_key
¶ The partition key for sending a message to a partitioned entity.
Setting this value enables assigning related messages to the same internal partition, so that submission sequence order is correctly recorded. The partition is chosen by a hash function over this value and cannot be chosen directly.
See Partitioned queues and topics in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning.
-
property
raw_amqp_message
¶ Advanced usage only. The internal AMQP message payload that is sent or received. :rtype: ~azure.servicebus.amqp.AmqpAnnotatedMessage
-
property
reply_to
¶ The address of an entity to send replies to.
This optional and application-defined value is a standard way to express a reply path to the receiver of the message. When a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic it expects the reply to be sent to.
See Message Routing and Correlation in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation.
-
property
reply_to_session_id
¶ The session identifier augmenting the reply_to address.
This value augments the reply_to information and specifies which session id should be set for the reply when sent to the reply entity.
See Message Routing and Correlation in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation.
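The correlation_id, reply_to and reply_to_session_id properties described above are typically used together in a request/reply exchange. The sketch below derives the addressing fields for a reply from a received request; build_reply_fields is a hypothetical helper, and request is any object exposing message_id, reply_to and reply_to_session_id attributes.

```python
def build_reply_fields(request):
    """Derive addressing fields for a reply from a received request.

    Returns (destination, fields): the entity to send the reply to, and
    keyword arguments that could be passed when constructing the reply.
    """
    if not request.reply_to:
        raise ValueError("request does not name a reply_to entity")
    fields = {
        # Echo the request's message_id so the requester can match the reply.
        "correlation_id": request.message_id,
    }
    if request.reply_to_session_id:
        # Route the reply to the session the requester is listening on.
        fields["session_id"] = request.reply_to_session_id
    return request.reply_to, fields
```

The returned fields could then be supplied as keyword arguments to ServiceBusMessage, with the destination used to pick the sender.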
-
property
scheduled_enqueue_time_utc
¶ The UTC time at which the message is scheduled to be enqueued.
This property can be used for scheduling when sending a message through the ServiceBusSender.send_messages method. If cancelling scheduled messages is required, you should use the ServiceBusSender.schedule_messages method, which returns sequence numbers that can be used for future cancellation. scheduled_enqueue_time_utc is None if not set.
- Return type
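When later cancellation matters, scheduling through the sender keeps hold of the sequence numbers. The sketch below assumes the sender exposes schedule_messages and cancel_scheduled_messages, as in current releases of the SDK; the helper names schedule_with_cancel_handle and cancel are hypothetical.

```python
import datetime


def schedule_with_cancel_handle(sender, message, delay_seconds):
    """Schedule a message and return the sequence numbers needed to cancel it."""
    enqueue_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
        seconds=delay_seconds
    )
    # schedule_messages is expected to return one sequence number per message.
    return sender.schedule_messages(message, enqueue_at)


def cancel(sender, sequence_numbers):
    """Cancel previously scheduled messages by sequence number."""
    sender.cancel_scheduled_messages(sequence_numbers)
```

Setting scheduled_enqueue_time_utc on the message and sending it normally would also delay delivery, but would not hand back a sequence number to cancel with.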
-
property
session_id
¶ The session identifier of the message for a sessionful entity.
For sessionful entities, this application-defined value specifies the session affiliation of the message. Messages with the same session identifier are subject to summary locking and enable exact in-order processing and demultiplexing. For non-sessionful entities, this value is ignored.
See Message Sessions in https://docs.microsoft.com/azure/service-bus-messaging/message-sessions.
-
property
subject
¶ The application specific subject, sometimes referred to as a label.
This property enables the application to indicate the purpose of the message to the receiver in a standardized fashion, similar to an email subject line.
- Return type
-
property
time_to_live
¶ The life duration of a message.
This value is the relative duration after which the message expires, starting from the instant the message has been accepted and stored by the broker, as captured in enqueued_time_utc. When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. A message-level time-to-live value cannot be longer than the entity’s time-to-live setting and it is silently adjusted if it is.
See Expiration in https://docs.microsoft.com/azure/service-bus-messaging/message-expiration
- Return type
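The rule described above (fall back to the entity default when unset, and never exceed it) can be written out as a short calculation. This is a sketch of the documented behaviour rather than broker code; effective_time_to_live and its arguments are hypothetical names.

```python
import datetime


def effective_time_to_live(message_ttl, entity_default_ttl):
    """Return the TTL the broker would be expected to apply to a message.

    message_ttl: the message-level timedelta, or None if not set.
    entity_default_ttl: the queue or topic DefaultTimeToLive as a timedelta.
    """
    if message_ttl is None:
        # No explicit value: the entity default is assumed.
        return entity_default_ttl
    # A longer message-level value is silently reduced to the entity setting.
    return min(message_ttl, entity_default_ttl)
```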
-
property
to
¶ The to address.
This property is reserved for future use in routing scenarios and presently ignored by the broker itself. Applications can use this value in rule-driven auto-forward chaining scenarios to indicate the intended logical destination of the message.
See https://docs.microsoft.com/azure/service-bus-messaging/service-bus-auto-forwarding for more details.
-
class
azure.servicebus.
ServiceBusMessageBatch
(max_size_in_bytes: Optional[int] = None, **kwargs: Any)[source]¶ A batch of messages.
Sending messages in a batch is more performant than sending individual messages. ServiceBusMessageBatch helps you create a batch of messages up to the maximum allowed size to improve sending performance.
Use the add_message method to add messages until the maximum batch size limit in bytes has been reached, at which point a MessageSizeExceededError will be raised.
Please use the create_message_batch method of ServiceBusSender to create a ServiceBusMessageBatch object instead of instantiating a ServiceBusMessageBatch object directly.
- Parameters
max_size_in_bytes (Optional[int]) – The maximum size of bytes data that a ServiceBusMessageBatch object can hold.
-
add_message
(message: Union[azure.servicebus._common.message.ServiceBusMessage, azure.servicebus.amqp._amqp_message.AmqpAnnotatedMessage, Mapping[str, Any]]) → None[source]¶ Try to add a single Message to the batch.
The total size of an added message is the sum of its body, properties, etc. If this added size results in the batch exceeding the maximum batch size, a MessageSizeExceededError will be raised.
- Parameters
message (Union[ServiceBusMessage, AmqpAnnotatedMessage]) – The Message to be added to the batch.
- Raises
~azure.servicebus.exceptions.MessageSizeExceededError – when the batch size limit is exceeded.
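A common way to use add_message is to keep filling a batch until it reports that it is full, send it, and start a new one. The sketch below follows that pattern; send_in_batches is a hypothetical helper, sender is assumed to be a ServiceBusSender, and catching ValueError relies on the assumption that MessageSizeExceededError derives from ValueError.

```python
def send_in_batches(sender, messages):
    """Pack messages into as few batches as possible and send each one.

    Returns the number of batches sent.
    """
    batches_sent = 0
    batch = sender.create_message_batch()
    pending = 0  # messages currently held in `batch`
    for message in messages:
        try:
            batch.add_message(message)
        except ValueError:
            # Assumption: MessageSizeExceededError is a ValueError subclass.
            if pending == 0:
                raise  # A single message is too large for an empty batch.
            sender.send_messages(batch)
            batches_sent += 1
            # Start a fresh batch with the message that did not fit.
            batch = sender.create_message_batch()
            batch.add_message(message)
            pending = 0
        pending += 1
    if pending:
        sender.send_messages(batch)
        batches_sent += 1
    return batches_sent
```

Re-raising when the batch is empty avoids an endless loop on a message that can never fit.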
-
property
max_size_in_bytes
¶ The maximum size of bytes data that a ServiceBusMessageBatch object can hold.
- Return type
-
property
message
¶ DEPRECATED: Get the underlying uamqp.BatchMessage or LegacyBatchMessage. This is deprecated and will be removed in a later release.
- Return type
BatchMessage or LegacyBatchMessage
-
class
azure.servicebus.
ServiceBusMessageState
(value)[source]¶ An enumeration.
-
ACTIVE
= 0¶
-
DEFERRED
= 1¶
-
SCHEDULED
= 2¶
-
-
class
azure.servicebus.
ServiceBusReceiveMode
(value)[source]¶ An enumeration.
-
PEEK_LOCK
= 'peeklock'¶
-
RECEIVE_AND_DELETE
= 'receiveanddelete'¶
-
-
class
azure.servicebus.
ServiceBusReceivedMessage
(message: Union['Message', 'pyamqp_Message'], receive_mode: Union[ServiceBusReceiveMode, str] = <ServiceBusReceiveMode.PEEK_LOCK: 'peeklock'>, frame: Optional['TransferFrame'] = None, **kwargs)[source]¶ A Service Bus Message received from service side.
- Variables
auto_renew_error (AutoLockRenewTimeout or AutoLockRenewFailed) – Error when AutoLockRenewer is used and it fails to renew the message lock.
Example:
Checking the properties on a received message.¶
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        print("Receiving: {}".format(message))
        print("Time to live: {}".format(message.time_to_live))
        print("Sequence number: {}".format(message.sequence_number))
        print("Enqueued Sequence number: {}".format(message.enqueued_sequence_number))
        print("Partition Key: {}".format(message.partition_key))
        print("Application Properties: {}".format(message.application_properties))
        print("Delivery count: {}".format(message.delivery_count))
        print("Message ID: {}".format(message.message_id))
        print("Locked until: {}".format(message.locked_until_utc))
        print("Lock Token: {}".format(message.lock_token))
        print("Enqueued time: {}".format(message.enqueued_time_utc))
-
property
application_properties
¶ The user defined properties on the message.
-
property
body
¶ The body of the Message. The format may vary depending on the body type:
For azure.servicebus.amqp.AmqpMessageBodyType.DATA, the body could be bytes or Iterable[bytes].
For azure.servicebus.amqp.AmqpMessageBodyType.SEQUENCE, the body could be List or Iterable[List].
For azure.servicebus.amqp.AmqpMessageBodyType.VALUE, the body could be any type.
- Return type
Any
-
property
body_type
¶ The body type of the underlying AMQP message.
- Return type
-
property
content_type
¶ The content type descriptor.
Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5, for example “application/json”.
-
property
correlation_id
¶ The correlation identifier.
Allows an application to specify a context for the message for the purposes of correlation, for example reflecting the MessageId of a message that is being replied to.
See Message Routing and Correlation in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation.
-
property
dead_letter_error_description
¶ Dead letter error description, when the message is received from a deadletter subqueue of an entity.
- Return type
-
property
dead_letter_reason
¶ Dead letter reason, when the message is received from a deadletter subqueue of an entity.
- Return type
-
property
dead_letter_source
¶ The name of the queue or subscription that this message was enqueued on, before it was deadlettered. This property is only set in messages that have been dead-lettered and subsequently auto-forwarded from the dead-letter queue to another entity. Indicates the entity in which the message was dead-lettered.
- Return type
-
property
delivery_count
¶ Number of deliveries that have been attempted for this message. The count is incremented when a message lock expires or the message is explicitly abandoned by the receiver.
- Return type
-
property
enqueued_sequence_number
¶ For messages that have been auto-forwarded, this property reflects the sequence number that had first been assigned to the message at its original point of submission.
- Return type
-
property
enqueued_time_utc
¶ The UTC datetime at which the message has been accepted and stored in the entity.
- Return type
-
property
expires_at_utc
¶ The UTC datetime at which the message is marked for removal and no longer available for retrieval from the entity due to expiration. Expiry is controlled by the Message.time_to_live property. This property is computed from Message.enqueued_time_utc + Message.time_to_live.
- Return type
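The relationship stated above (expiry equals enqueue time plus time-to-live) can be illustrated with plain datetime arithmetic. This is a sketch, not the library's implementation; `compute_expiry` and `remaining_lifetime` are hypothetical helper names, and the timestamps are assumed to be timezone-aware UTC values.

```python
from datetime import datetime, timedelta, timezone


def compute_expiry(enqueued_time_utc: datetime, time_to_live: timedelta) -> datetime:
    """Mirror the documented formula: enqueued_time_utc + time_to_live."""
    return enqueued_time_utc + time_to_live


def remaining_lifetime(expires_at_utc: datetime, now: datetime) -> timedelta:
    """Time left before expiry, clamped at zero for already-expired messages."""
    return max(expires_at_utc - now, timedelta(0))


# Illustrative values only.
enqueued = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
expiry = compute_expiry(enqueued, timedelta(minutes=5))
left = remaining_lifetime(expiry, enqueued + timedelta(minutes=2))
```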
-
property
lock_token
¶ The lock token for the current message serving as a reference to the lock that is being held by the broker in PEEK_LOCK mode.
-
property
locked_until_utc
¶ The UTC datetime until which the message will be locked in the queue/subscription. When the lock expires, delivery count of the message is incremented and the message is again available for retrieval.
- Return type
-
property
message
¶ - Get the underlying LegacyMessage.
This is deprecated and will be removed in a later release.
- Return type
LegacyMessage
- Type
DEPRECATED
-
property
message_id
¶ The id to identify the message.
The message identifier is an application-defined value that uniquely identifies the message and its payload. The identifier is a free-form string and can reflect a GUID or an identifier derived from the application context. If enabled, the duplicate detection (see https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection) feature identifies and removes second and further submissions of messages with the same message id.
-
property
partition_key
¶ The partition key for sending a message to a partitioned entity.
Setting this value enables assigning related messages to the same internal partition, so that submission sequence order is correctly recorded. The partition is chosen by a hash function over this value and cannot be chosen directly.
See Partitioned queues and topics in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning.
-
property
raw_amqp_message
¶ Advanced usage only. The internal AMQP message payload that is sent or received.
- Return type
~azure.servicebus.amqp.AmqpAnnotatedMessage
-
property
reply_to
¶ The address of an entity to send replies to.
This optional and application-defined value is a standard way to express a reply path to the receiver of the message. When a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic it expects the reply to be sent to.
See Message Routing and Correlation in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation.
-
property
reply_to_session_id
¶ The session identifier augmenting the reply_to address.
This value augments the reply_to information and specifies which session id should be set for the reply when sent to the reply entity.
See Message Routing and Correlation in https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation.
-
property
scheduled_enqueue_time_utc
¶ The UTC scheduled enqueue time of the message.
This property can be used for scheduling when sending a message through the ServiceBusSender.send_messages method. If cancelling scheduled messages is required, you should use the ServiceBusSender.schedule_messages method, which returns sequence numbers that can be used for future cancellation. scheduled_enqueue_time_utc is None if not set.
- Return type
-
property
sequence_number
¶ The unique number assigned to a message by Service Bus. The sequence number is a unique 64-bit integer assigned to a message as it is accepted and stored by the broker and functions as its true identifier. For partitioned entities, the topmost 16 bits reflect the partition identifier. Sequence numbers monotonically increase. They roll over to 0 when the 48-64 bit range is exhausted.
- Return type
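The bit layout described above (topmost 16 bits for the partition identifier, the remaining 48 bits for the counter) can be illustrated with two shifts and a mask. This is a sketch assuming the sequence number is handled as an unsigned 64-bit value; `split_sequence_number` is a hypothetical helper, not a library function.

```python
from typing import Tuple

COUNTER_BITS = 48
COUNTER_MASK = (1 << COUNTER_BITS) - 1


def split_sequence_number(sequence_number: int) -> Tuple[int, int]:
    """Return (partition_id, counter) for a 64-bit sequence number."""
    if not 0 <= sequence_number < (1 << 64):
        raise ValueError("expected an unsigned 64-bit integer")
    partition_id = sequence_number >> COUNTER_BITS  # topmost 16 bits
    counter = sequence_number & COUNTER_MASK        # lower 48 bits
    return partition_id, counter
```

For a non-partitioned entity the partition part would simply be 0 under this reading.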
-
property
session_id
¶ The session identifier of the message for a sessionful entity.
For sessionful entities, this application-defined value specifies the session affiliation of the message. Messages with the same session identifier are subject to summary locking and enable exact in-order processing and demultiplexing. For non-sessionful entities, this value is ignored.
See Message Sessions in https://docs.microsoft.com/azure/service-bus-messaging/message-sessions.
-
property
state
¶ Represents the state of the message. Can be Active, Deferred, or Scheduled. Defaults to Active.
- Return type
-
property
subject
¶ The application specific subject, sometimes referred to as a label.
This property enables the application to indicate the purpose of the message to the receiver in a standardized fashion, similar to an email subject line.
- Return type
-
property
time_to_live
¶ The life duration of a message.
This value is the relative duration after which the message expires, starting from the instant the message has been accepted and stored by the broker, as captured in enqueued_time_utc. When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. A message-level time-to-live value cannot be longer than the entity’s time-to-live setting; if it is, it is silently adjusted.
See Expiration in https://docs.microsoft.com/azure/service-bus-messaging/message-expiration
- Return type
-
property
to
¶ The to address.
This property is reserved for future use in routing scenarios and presently ignored by the broker itself. Applications can use this value in rule-driven auto-forward chaining scenarios to indicate the intended logical destination of the message.
See https://docs.microsoft.com/azure/service-bus-messaging/service-bus-auto-forwarding for more details.
-
class
azure.servicebus.
ServiceBusReceiver
(fully_qualified_namespace: str, credential: Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], *, queue_name: Optional[str] = None, topic_name: Optional[str] = None, subscription_name: Optional[str] = None, receive_mode: Union[azure.servicebus._common.constants.ServiceBusReceiveMode, str] = <ServiceBusReceiveMode.PEEK_LOCK: 'peeklock'>, max_wait_time: Optional[float] = None, auto_lock_renewer: Optional[AutoLockRenewer] = None, prefetch_count: int = 0, **kwargs: Any)[source]¶ The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.
The two primary channels for message receipt are receive_messages() to make a single request for messages, and for message in receiver: to continuously receive incoming messages in an ongoing fashion.
Please use the `get_<queue/subscription>_receiver` method of ~azure.servicebus.ServiceBusClient to create a ServiceBusReceiver instance.
- Variables
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
entity_path (str) – The path of the entity that the client connects to.
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method, or alternatively, an AzureSasCredential can be provided too.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic which contains the Subscription the client connects to.
subscription_name (str) – The path of specific Service Bus Subscription under the specified Topic the client connects to.
max_wait_time (Optional[float]) – The timeout in seconds between received messages after which the receiver will automatically stop receiving. The default value is None, meaning no timeout.
receive_mode (Union[ServiceBusReceiveMode, str]) – The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PEEK_LOCK.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
user_agent (str) – If specified, this will be added in front of the built-in user agent string.
auto_lock_renewer (Optional[AutoLockRenewer]) – An ~azure.servicebus.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.
prefetch_count (int) – The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they’re not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. When prefetch_count is 0, ServiceBusReceiver.receive_messages will try to cache max_message_count (if provided) within its request to the service.
client_identifier (str) – A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
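As a small illustration of the http_proxy dictionary shape listed above, the following sketch builds and sanity-checks such a dictionary. `build_http_proxy` is a hypothetical helper and the host and port values are placeholders; only the key names come from the parameter description.

```python
from typing import Dict, Optional, Union


def build_http_proxy(
    hostname: str,
    port: int,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Union[str, int]]:
    """Build a dict with the documented http_proxy keys (hypothetical helper)."""
    if isinstance(port, bool) or not isinstance(port, int) or not 0 < port < 65536:
        raise ValueError("proxy_port must be an int between 1 and 65535")
    proxy: Dict[str, Union[str, int]] = {
        "proxy_hostname": hostname,  # required, str
        "proxy_port": port,          # required, int
    }
    # Optional credentials are only added when supplied.
    if username is not None:
        proxy["username"] = username
    if password is not None:
        proxy["password"] = password
    return proxy


# Placeholder values for illustration.
proxy_settings = build_http_proxy("proxy.example.com", 8080)
```

The resulting dictionary would be passed as the http_proxy keyword argument when creating the client, receiver, or sender.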
-
abandon_message
(message: azure.servicebus._common.message.ServiceBusReceivedMessage) → None[source]¶ Abandon the message.
This message will be returned to the queue and made available to be received again.
- Parameters
message (ServiceBusReceivedMessage) – The received message to be abandoned.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
- Raises
~azure.servicebus.exceptions.ServiceBusError when errors happen.
Example:
Abandon a received message.¶
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        servicebus_receiver.abandon_message(message)
-
close
() → None[source]¶ Close down the handler links (and connection if the handler uses a separate connection).
If the handler has already closed, this operation will do nothing.
- Return type
-
complete_message
(message: azure.servicebus._common.message.ServiceBusReceivedMessage) → None[source]¶ Complete the message.
This removes the message from the queue.
- Parameters
message (ServiceBusReceivedMessage) – The received message to be completed.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
- Raises
~azure.servicebus.exceptions.ServiceBusError when errors happen.
Example:
Complete a received message.¶
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        servicebus_receiver.complete_message(message)
-
dead_letter_message
(message: azure.servicebus._common.message.ServiceBusReceivedMessage, reason: Optional[str] = None, error_description: Optional[str] = None) → None[source]¶ Move the message to the Dead Letter queue.
The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.
- Parameters
message (ServiceBusReceivedMessage) – The received message to be dead-lettered.
reason (Optional[str]) – The reason for dead-lettering the message.
error_description (Optional[str]) – The detailed error description for dead-lettering the message.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
- Raises
~azure.servicebus.exceptions.ServiceBusError when errors happen.
Example:
Dead letter a received message.¶
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        servicebus_receiver.dead_letter_message(message)
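One way the delivery_count property and the settlement methods above might be combined is a simple retry cap: abandon a failed message until it has been delivered too often, then dead-letter it with a reason. This is a sketch under assumptions; `abandon_or_dead_letter`, the threshold of 5, and the reason string are all illustrative choices, and the function only relies on the `abandon_message` and `dead_letter_message` signatures documented here.

```python
def abandon_or_dead_letter(receiver, message, max_deliveries: int = 5) -> str:
    """Settle a message that failed processing (hypothetical helper).

    receiver: object exposing abandon_message() and dead_letter_message().
    message: object exposing a delivery_count attribute.
    """
    if message.delivery_count >= max_deliveries:
        # Too many attempts: park the message for later inspection.
        receiver.dead_letter_message(
            message,
            reason="MaxDeliveriesExceeded",  # illustrative reason string
            error_description="Processing failed after {} deliveries".format(
                message.delivery_count
            ),
        )
        return "dead-lettered"
    # Otherwise return it to the queue so it can be received again.
    receiver.abandon_message(message)
    return "abandoned"
```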
-
defer_message
(message: azure.servicebus._common.message.ServiceBusReceivedMessage) → None[source]¶ Defers the message.
This message will remain in the queue but must be requested specifically by its sequence number in order to be received.
- Parameters
message (ServiceBusReceivedMessage) – The received message to be deferred.
- Return type
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
- Raises
~azure.servicebus.exceptions.ServiceBusError when errors happen.
Example:
Defer a received message.¶
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        servicebus_receiver.defer_message(message)
-
next
()¶
-
peek_messages
(max_message_count: int = 1, *, sequence_number: int = 0, timeout: Optional[float] = None, **kwargs: Any) → List[azure.servicebus._common.message.ServiceBusReceivedMessage][source]¶ Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.
- Parameters
max_message_count (int) – The maximum number of messages to try and peek. The default value is 1.
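- Keyword Arguments
sequence_number (int) – A message sequence number from which to start browsing messages.
timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.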
- Returns
A list of ~azure.servicebus.ServiceBusReceivedMessage.
- Return type
Example:
Look at pending messages in the queue.¶
    with servicebus_receiver:
        messages = servicebus_receiver.peek_messages()
        for message in messages:
            print(str(message))
-
receive_deferred_messages
(sequence_numbers: Union[int, List[int]], *, timeout: Optional[float] = None, **kwargs: Any) → List[azure.servicebus._common.message.ServiceBusReceivedMessage][source]¶ Receive messages that have previously been deferred.
When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.
- Parameters
sequence_numbers (Union[int,List[int]]) – A list of the sequence numbers of messages that have been deferred.
- Keyword Arguments
timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Returns
A list of the requested ~azure.servicebus.ServiceBusReceivedMessage instances.
- Return type
Example:
Receive deferred messages from ServiceBus.¶
    with servicebus_receiver:
        deferred_sequenced_numbers = []
        messages = servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            deferred_sequenced_numbers.append(message.sequence_number)
            print(str(message))
            servicebus_receiver.defer_message(message)
        received_deferred_msg = servicebus_receiver.receive_deferred_messages(
            sequence_numbers=deferred_sequenced_numbers
        )
        for msg in received_deferred_msg:
            servicebus_receiver.complete_message(msg)
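Because all sequence numbers in one call must come from the same partition, a caller holding a mixed list could group them first. The sketch below assumes the partition identifier sits in the topmost 16 bits of the 64-bit sequence number, as described for the sequence_number property; `group_by_partition` and `receive_deferred_by_partition` are hypothetical helpers.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List

PARTITION_SHIFT = 48  # lower 48 bits hold the counter, upper 16 the partition


def group_by_partition(sequence_numbers: Iterable[int]) -> Dict[int, List[int]]:
    """Group sequence numbers by the partition id encoded in their top bits."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for number in sequence_numbers:
        groups[number >> PARTITION_SHIFT].append(number)
    return dict(groups)


def receive_deferred_by_partition(receiver: Any, sequence_numbers: Iterable[int]) -> List[Any]:
    """Issue one receive_deferred_messages call per partition."""
    received: List[Any] = []
    for numbers in group_by_partition(sequence_numbers).values():
        received.extend(receiver.receive_deferred_messages(sequence_numbers=numbers))
    return received
```

For a non-partitioned entity every number falls into group 0, so this reduces to a single call.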
-
receive_messages
(max_message_count: Optional[int] = 1, max_wait_time: Optional[float] = None) → List[azure.servicebus._common.message.ServiceBusReceivedMessage][source]¶ Receive a batch of messages at once.
This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.
Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service.
This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.
- Parameters
max_message_count (Optional[int]) – Maximum number of messages in the batch. Actual number returned will depend on prefetch_count and incoming stream rate. Setting to None will fully depend on the prefetch config. The default value is 1.
max_wait_time (Optional[float]) – Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned.
- Returns
A list of messages received. If no messages are available, this will be an empty list.
- Return type
Example:
Receive messages from ServiceBus.¶
    with servicebus_receiver:
        messages = servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            print(str(message))
            servicebus_receiver.complete_message(message)
-
renew_message_lock
(message: azure.servicebus._common.message.ServiceBusReceivedMessage, *, timeout: Optional[float] = None, **kwargs: Any) → datetime.datetime[source]¶ Renew the message lock.
This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.
In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.
Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is also only available for non-sessionful messages.
- Parameters
message (ServiceBusReceivedMessage) – The message to renew the lock for.
- Keyword Arguments
timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Returns
The utc datetime the lock is set to expire at.
- Return type
- Raises
TypeError if the message is sessionful.
- Raises
~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
- Raises
~azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.
Example:
Renew the lock on a received message.¶
    messages = servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        servicebus_receiver.renew_message_lock(message)
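During long-running processing, a caller might renew only when the lock is close to expiring rather than on every iteration. The following is a sketch of that idea; `renew_if_expiring` and the 10-second threshold are illustrative, and it assumes `message.locked_until_utc` is a timezone-aware UTC datetime (that is, the message was received in PEEK_LOCK mode).

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def renew_if_expiring(
    receiver: Any,
    message: Any,
    threshold: timedelta = timedelta(seconds=10),
    now: Optional[datetime] = None,
) -> datetime:
    """Renew the message lock when it expires within `threshold`.

    Returns the lock expiry time that applies after the call.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    if message.locked_until_utc - now <= threshold:
        # renew_message_lock returns the new expiry time.
        return receiver.renew_message_lock(message)
    return message.locked_until_utc
```

If the lock has already expired, the renew call is expected to fail as described above, so this helper does not attempt to hide that error.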
-
property
client_identifier
¶ Get the ServiceBusReceiver client_identifier associated with the receiver instance.
- Return type
-
property
session
¶ Get the ServiceBusSession object linked with the receiver. A session is only available for session-enabled entities; this property returns None if called on a non-sessionful receiver.
- Return type
Example:
Get session from a receiver¶
    with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
-
class
azure.servicebus.
ServiceBusSender
(fully_qualified_namespace: str, credential: Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], *, queue_name: Optional[str] = None, topic_name: Optional[str] = None, **kwargs: Any)[source]¶ The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic.
Please use the `get_<queue/topic>_sender` method of ~azure.servicebus.ServiceBusClient to create a ServiceBusSender instance.
- Variables
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
entity_name (str) – The name of the entity that the client connects to.
- Parameters
fully_qualified_namespace (str) – The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespace>.servicebus.windows.net.
credential (TokenCredential or AzureSasCredential or AzureNamedKeyCredential) – The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, *scopes) method, or alternatively, an AzureSasCredential can be provided too.
- Keyword Arguments
queue_name (str) – The path of specific Service Bus Queue the client connects to.
topic_name (str) – The path of specific Service Bus Topic the client connects to.
logging_enable (bool) – Whether to output network trace logs to the logger. Default is False.
transport_type (TransportType) – The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
http_proxy (Dict) – HTTP proxy settings. This must be a dictionary with the following keys: ‘proxy_hostname’ (str value) and ‘proxy_port’ (int value). Additionally the following keys may also be present: ‘username’, ‘password’.
user_agent (str) – If specified, this will be added in front of the built-in user agent string.
client_identifier (str) – A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.
socket_timeout (float) – The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.
-
cancel_scheduled_messages
(sequence_numbers: Union[int, List[int]], *, timeout: Optional[float] = None, **kwargs: Any) → None[source]¶ Cancel one or more messages that have previously been scheduled and are still pending.
- Parameters
sequence_numbers (int or list[int]) – The sequence numbers of the scheduled messages.
- Keyword Arguments
timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Return type
- Raises
~azure.servicebus.exceptions.ServiceBusError if message cancellation failed because the message was already cancelled or enqueued.
Example:
Cancelling messages scheduled to be sent in future¶
    with servicebus_sender:
        servicebus_sender.cancel_scheduled_messages(sequence_nums)
-
close
() → None¶ Close down the handler links (and connection if the handler uses a separate connection).
If the handler has already closed, this operation will do nothing.
- Return type
-
create_message_batch
(max_size_in_bytes: Optional[int] = None) → azure.servicebus._common.message.ServiceBusMessageBatch[source]¶ Create a ServiceBusMessageBatch object with the max size of all content being constrained by max_size_in_bytes. The max_size_in_bytes value should be no greater than the max allowed message size defined by the service.
- Parameters
max_size_in_bytes (Optional[int]) – The maximum size of bytes data that a ServiceBusMessageBatch object can hold. By default, the value is determined by your Service Bus tier.
- Returns
A ServiceBusMessageBatch object
- Return type
Example:
Create ServiceBusMessageBatch object within limited size¶
    with servicebus_sender:
        batch_message = servicebus_sender.create_message_batch()
        batch_message.add_message(ServiceBusMessage("Single message inside batch"))
-
schedule_messages
(messages: MessageTypes, schedule_time_utc: datetime.datetime, *, timeout: Optional[float] = None, **kwargs: Any) → List[int][source]¶ Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages.
- Parameters
messages (Union[ServiceBusMessage, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]) – The message or list of messages to schedule.
schedule_time_utc (datetime) – The utc date and time to enqueue the messages.
- Keyword Arguments
timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Returns
A list of the sequence numbers of the enqueued messages.
- Return type
Example:
Schedule a message to be sent in future¶
    with servicebus_sender:
        scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
        scheduled_messages = [ServiceBusMessage("Scheduled message") for _ in range(10)]
        sequence_nums = servicebus_sender.schedule_messages(scheduled_messages, scheduled_time_utc)
-
send_messages
(message: Union[MessageTypes, azure.servicebus._common.message.ServiceBusMessageBatch], *, timeout: Optional[float] = None, **kwargs: Any) → None[source]¶ Sends message and blocks until acknowledgement is received or operation times out.
If a list of messages was provided, attempts to send them as a single batch, throwing a ValueError if they cannot fit in a single batch.
- Parameters
message (Union[ServiceBusMessage, ServiceBusMessageBatch, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]) – The ServiceBus message to be sent.
- Keyword Arguments
timeout (Optional[float]) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Return type
- Raises
~azure.servicebus.exceptions.OperationTimeoutError if sending times out.
~azure.servicebus.exceptions.MessageSizeExceededError if the size of the message is over the Service Bus frame size limit.
~azure.servicebus.exceptions.ServiceBusError when other errors happen, such as connection errors, authentication errors, and any unexpected errors. It is also the top-level root class of the above errors.
Example:
Send message.¶
    with servicebus_sender:
        message = ServiceBusMessage("Hello World")
        servicebus_sender.send_messages(message)
-
class
azure.servicebus.
ServiceBusSession
(session_id: str, receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync])[source]¶ The ServiceBusSession is used to manage session state and lock renewal.
Please use the property `session` on the ServiceBusReceiver to get the corresponding ServiceBusSession object linked with the receiver instead of instantiating a ServiceBusSession object directly.
- Variables
auto_renew_error (AutoLockRenewTimeout or AutoLockRenewFailed) – Error when AutoLockRenewer is used and it fails to renew the session lock.
Example:
Get session from a receiver¶
    with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
-
get_state
(*, timeout: Optional[float] = None, **kwargs: Any) → bytes[source]¶ Get the session state.
Returns None if no state has been set.
- Keyword Arguments
timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Return type
- Returns
The session state.
Example:
Get the session state¶
    with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        session_state = session.get_state()
-
renew_lock
(*, timeout: Optional[float] = None, **kwargs: Any) → datetime.datetime[source]¶ Renew the session lock.
This operation must be performed periodically in order to retain a lock on the session to continue message processing.
Once the lock is lost the connection will be closed; an expired lock cannot be renewed.
This operation can also be performed as a threaded background task by registering the session with an azure.servicebus.AutoLockRenewer instance.
- Keyword Arguments
timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Returns
The UTC datetime at which the lock is set to expire.
- Return type
Example:
Renew the session lock before it expires¶
    with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        session.renew_lock()
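One way to decide when a manual renewal is due is to compare the session's locked_until_utc value against the current time. The following is a minimal sketch, assuming locked_until_utc is a datetime expressed in UTC; the helper names and the 10-second safety margin are hypothetical and not part of azure.servicebus.

```python
from datetime import datetime, timezone
from typing import Optional


def seconds_until_expiry(locked_until_utc: datetime, now: Optional[datetime] = None) -> float:
    """Return the seconds left before the lock expires (negative if already expired)."""
    now = now or datetime.now(timezone.utc)
    # Assumption: a naive datetime is treated as UTC so the subtraction is well defined.
    if locked_until_utc.tzinfo is None:
        locked_until_utc = locked_until_utc.replace(tzinfo=timezone.utc)
    return (locked_until_utc - now).total_seconds()


def should_renew(locked_until_utc: datetime, margin_seconds: float = 10.0,
                 now: Optional[datetime] = None) -> bool:
    """Return True when the lock is still held but expires within the margin."""
    remaining = seconds_until_expiry(locked_until_utc, now)
    # An expired lock (remaining <= 0) cannot be renewed, so it is excluded here.
    return 0 < remaining <= margin_seconds
```

In a receive loop this could be used as `if should_renew(session.locked_until_utc): session.renew_lock()`.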
-
set_state
(state: Optional[Union[str, bytes, bytearray]], *, timeout: Optional[float] = None, **kwargs: Any) → None[source]¶ Set the session state.
- Parameters
state (Union[str, bytes, bytearray, None]) – The state value. Setting state to None will clear the current session state.
- Keyword Arguments
timeout (float) – The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
- Returns
None
- Return type
Example:
Set the session state¶
    with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session
        session.set_state("START")
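Because set_state accepts str, bytes or bytearray and get_state returns bytes (or None when no state has been set), structured state has to be serialized by the caller. A minimal sketch using JSON follows; the helper names encode_state and decode_state are hypothetical and not part of azure.servicebus.

```python
import json
from typing import Any, Dict, Optional


def encode_state(state: Dict[str, Any]) -> bytes:
    """Serialize a dictionary to UTF-8 JSON bytes suitable for set_state()."""
    return json.dumps(state, sort_keys=True).encode("utf-8")


def decode_state(raw: Optional[bytes]) -> Dict[str, Any]:
    """Decode bytes returned by get_state(); None (no state set) becomes {}."""
    if raw is None:
        return {}
    return json.loads(raw.decode("utf-8"))
```

With a session in hand, this could be used as `session.set_state(encode_state({"step": 2}))` and later `decode_state(session.get_state())`.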
-
property
locked_until_utc
¶ The time at which this session’s lock will expire.
- Return type
-
class
azure.servicebus.
ServiceBusSubQueue
(value)[source]¶ An enumeration.
-
DEAD_LETTER
= 'deadletter'¶
-
TRANSFER_DEAD_LETTER
= 'transferdeadletter'¶
-
-
class
azure.servicebus.
TransportType
(value)[source]¶ Transport type. The underlying transport protocol type:
Amqp: AMQP over the default TCP transport protocol; it uses port 5671.
AmqpOverWebsocket: AMQP over the WebSocket transport protocol; it uses port 443.
-
Amqp
= 1¶
-
AmqpOverWebsocket
= 2¶
-
-
azure.servicebus.
parse_connection_string
(conn_str)[source]¶ Parse the connection string into a properties bag containing its component parts.
- Parameters
conn_str (str) – The connection string that has to be parsed.
- Returns
A properties model containing the parsed connection string.
- Return type
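To illustrate what "component parts" means, the sketch below splits a connection string of the usual `Key=Value;Key=Value` form into a dictionary. It is a simplified stand-in for illustration only, not the library's implementation; the function name split_connection_string and the sample values are hypothetical.

```python
from typing import Dict


def split_connection_string(conn_str: str) -> Dict[str, str]:
    """Split 'Key=Value;Key=Value' into a dict, keeping '=' inside values."""
    parts: Dict[str, str] = {}
    for segment in conn_str.strip().rstrip(";").split(";"):
        # Partition on the first '=' only, since keys may end with '=' padding.
        key, sep, value = segment.partition("=")
        if not sep or not key or not value:
            raise ValueError("Malformed connection string segment: %r" % segment)
        parts[key.strip()] = value.strip()
    return parts


# Hypothetical sample values, not real credentials.
sample = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123=;"
    "EntityPath=myqueue"
)
parsed = split_connection_string(sample)
```

In application code, prefer parse_connection_string itself, which returns a properties model rather than a plain dictionary.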
Subpackages¶
Submodules¶
azure.servicebus.exceptions module¶
-
exception
azure.servicebus.exceptions.
AutoLockRenewFailed
(message, *args, **kwargs)[source]¶ An attempt to renew a lock on a message or session in the background has failed.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
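The deprecation note points to Python's `raise ... from` syntax. A minimal sketch of that pattern is shown here, using a hypothetical ProcessingError wrapper rather than any Service Bus class:

```python
class ProcessingError(Exception):
    """Hypothetical application-level error used only for this illustration."""


def parse_payload(payload: str) -> int:
    """Convert a payload to int, chaining the original error on failure."""
    try:
        return int(payload)
    except ValueError as original:
        # 'from' stores the original exception on __cause__, traceback included.
        raise ProcessingError("could not parse payload %r" % payload) from original
```

The caller can then inspect `err.__cause__` to reach the original exception and its traceback.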
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
AutoLockRenewTimeout
(message, *args, **kwargs)[source]¶ The time allocated to renew the message or session lock has elapsed.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessageAlreadySettled
(**kwargs: Any)[source]¶ Failed to settle the message.
An attempt was made to complete an operation on a message that has already been settled (completed, abandoned, dead-lettered or deferred).
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessageLockLostError
(**kwargs: Any)[source]¶ The lock on the message is lost. Callers should attempt to receive and process the message again.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessageNotFoundError
(**kwargs)[source]¶ The requested message was not found.
Raised when an attempt to receive a message with a particular sequence number fails because the message cannot be found. Make sure the message hasn’t been received already, and check the dead-letter queue to see if the message has been dead-lettered.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessageSizeExceededError
(**kwargs)[source]¶ Message content is larger than the Service Bus frame size.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessagingEntityAlreadyExistsError
(**kwargs)[source]¶ An entity with the same name exists under the same namespace.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessagingEntityDisabledError
(**kwargs)[source]¶ The messaging entity is disabled. Enable the entity again using the Azure portal.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
MessagingEntityNotFoundError
(**kwargs)[source]¶ A Service Bus resource cannot be found by the Service Bus service.
The entity associated with the operation doesn’t exist or has been deleted. Make sure the entity exists.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
OperationTimeoutError
(**kwargs)[source]¶ Operation timed out.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
ServiceBusAuthenticationError
(**kwargs)[source]¶ An error occurred when authenticating the connection.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
ServiceBusAuthorizationError
(**kwargs)[source]¶ An error occurred when authorizing the connection.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
ServiceBusCommunicationError
(**kwargs)[source]¶ There was a general communications error encountered when interacting with the Azure Service Bus service.
The client isn’t able to establish a connection to Service Bus. Make sure the supplied host name is correct and the host is reachable. If your code runs in an environment with a firewall or proxy, ensure that traffic to the Service Bus domain/IP address and ports isn’t blocked.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
ServiceBusConnectionError
(**kwargs)[source]¶ An error occurred in the connection.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
-
exception
azure.servicebus.exceptions.
ServiceBusError
(message, *args, **kwargs)[source]¶ Base exception for all Service Bus errors which can be used for default error handling.
- Parameters
message (str) – The message object, stringified and stored as the ‘message’ attribute.
- Keyword Arguments
error (Exception) – The original exception if any
- Variables
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
exception
azure.servicebus.exceptions.
ServiceBusQuotaExceededError
(**kwargs)[source]¶ The quota applied to a Service Bus resource has been exceeded while interacting with the Azure Service Bus service.
The messaging entity has reached its maximum allowable size, or the maximum number of connections to a namespace has been exceeded. Create space in the entity by receiving messages from the entity or its subqueues.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
exc_traceback
: Optional[TracebackType]¶
-
exc_type
: Optional[Type[Any]]¶
-
exc_value
: Optional[BaseException]¶
-
inner_exception
: Optional[BaseException]¶
-
-
exception
azure.servicebus.exceptions.
ServiceBusServerBusyError
(**kwargs)[source]¶ The Azure Service Bus service reports that it is busy in response to a client request to perform an operation.
The service isn’t able to process the request at this time. The client can wait for a period of time, then retry the operation.
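The wait-then-retry advice above can be sketched as a small helper with exponential backoff. This is a generic illustration, not library code: the helper name retry_with_backoff, its parameters and the delay values are assumptions, and in real use `retryable` would be a tuple such as `(ServiceBusServerBusyError,)`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(), waiting base_delay * 2**n between retryable failures."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except retryable:
            # Re-raise once the final attempt has failed.
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
    # Only reached when the loop never runs, i.e. max_attempts < 1.
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the waiting behaviour can be replaced in tests; exceptions outside `retryable` propagate immediately.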
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
exc_traceback
: Optional[TracebackType]¶
-
exc_type
: Optional[Type[Any]]¶
-
exc_value
: Optional[BaseException]¶
-
inner_exception
: Optional[BaseException]¶
-
-
exception
azure.servicebus.exceptions.
SessionCannotBeLockedError
(**kwargs)[source]¶ The requested session cannot be locked.
An attempt was made to connect to a session with a specific session ID, but the session is currently locked by another client. Make sure the session is unlocked by other clients.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
exc_traceback
: Optional[TracebackType]¶
-
exc_type
: Optional[Type[Any]]¶
-
exc_value
: Optional[BaseException]¶
-
inner_exception
: Optional[BaseException]¶
-
-
exception
azure.servicebus.exceptions.
SessionLockLostError
(**kwargs: Any)[source]¶ The lock on the session has expired. Callers should request the session again.
All unsettled messages that have been received can no longer be settled.
-
raise_with_traceback
() → None¶ Raise the exception with the existing traceback.
Deprecated since version 1.22.0: This method is deprecated as we don’t support Python 2 anymore. Use raise/from instead.
-
with_traceback
()¶ Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
args
¶
-
exc_traceback
: Optional[TracebackType]¶
-
exc_type
: Optional[Type[Any]]¶
-
exc_value
: Optional[BaseException]¶
-
inner_exception
: Optional[BaseException]¶
-