# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import logging
from uamqp import SendClient
from uamqp import authentication
from uamqp import constants, types
from azure.servicebus.base_handler import BaseHandler
from azure.servicebus.common.errors import MessageSendFailed
from azure.servicebus.common import mgmt_handlers, mixins
from azure.servicebus.common.message import Message
from azure.servicebus.common.constants import (
REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION,
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION)
_log = logging.getLogger(__name__)
[docs]class Sender(BaseHandler, mixins.SenderMixin):
"""A message sender.
This handler is for sending messages to a Service Bus entity.
It operates a single connection that must be opened and closed on completion.
The Sender can be run within a context manager to ensure that the connection is closed on exit.
The Sender should not be instantiated directly, and should be accessed from a `QueueClient` or
`TopicClient` using the `get_sender()` method.
.. note:: This object is not thread-safe.
:param handler_id: The ID used as the connection name for the Sender.
:type handler_id: str
:param target: The endpoint to send messages to.
:type target: ~uamqp.Target
:param auth_config: The SASL auth credentials.
:type auth_config: dict[str, str]
:param session: An optional session ID. If supplied, all outgoing messages will have this
session ID added (unless they already have one specified).
:type session: str
:param connection: A shared connection [not yet supported].
:type connection: ~uamqp.Connection
:param encoding: The encoding used for string properties. Default is 'UTF-8'.
:type encoding: str
:param debug: Whether to enable network trace debug logs.
:type debug: bool
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START create_sender_client]
:end-before: [END create_sender_client]
:language: python
:dedent: 4
:caption: Create a new instance of the Sender
"""
def __init__(
self, handler_id, target, auth_config, session=None,
connection=None, encoding='UTF-8', debug=False, **kwargs):
self.name = "SBSender-{}".format(handler_id)
self.session_id = session
super(Sender, self).__init__(
target, auth_config, connection=connection, encoding=encoding, debug=debug, **kwargs)
def _build_handler(self):
auth = None if self.connection else authentication.SASTokenAuth.from_shared_access_key(**self.auth_config)
self._handler = SendClient(
self.endpoint,
auth=auth,
debug=self.debug,
properties=self.properties,
client_name=self.name,
error_policy=self.error_policy,
encoding=self.encoding,
**self.handler_kwargs)
[docs] def send(self, message):
"""Send a message and blocks until acknowledgement is received or the operation fails.
:param message: The message to be sent.
:type message: ~azure.servicebus.common.message.Message
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to send.
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START send_message]
:end-before: [END send_message]
:language: python
:dedent: 4
:caption: Send a message and block
"""
if not isinstance(message, Message):
raise TypeError("Value of message must be of type 'Message'.")
if not self.running:
self.open()
if self.session_id and not message.properties.group_id:
message.properties.group_id = self.session_id
try:
self._handler.send_message(message.message)
except Exception as e:
raise MessageSendFailed(e)
[docs] def schedule(self, schedule_time, *messages):
"""Send one or more messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
:param schedule_time: The date and time to enqueue the messages.
:type schedule_time: ~datetime.datetime
:param messages: The messages to schedule.
:type messages: ~azure.servicebus.common.message.Message
:rtype: list[int]
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START scheduling_messages]
:end-before: [END scheduling_messages]
:language: python
:dedent: 4
:caption: Schedule a message to be sent in future
"""
if not self.running:
self.open()
request_body = self._build_schedule_request(schedule_time, *messages)
return self._mgmt_request_response(
REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION,
request_body,
mgmt_handlers.schedule_op)
[docs] def cancel_scheduled_messages(self, *sequence_numbers):
"""Cancel one or more messages that have previsouly been scheduled and are still pending.
:param sequence_numbers: The seqeuence numbers of the scheduled messages.
:type sequence_numbers: int
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START cancel_scheduled_messages]
:end-before: [END cancel_scheduled_messages]
:language: python
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
if not self.running:
self.open()
numbers = [types.AMQPLong(s) for s in sequence_numbers]
request_body = {'sequence-numbers': types.AMQPArray(numbers)}
return self._mgmt_request_response(
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
request_body,
mgmt_handlers.default)
[docs] def send_pending_messages(self):
"""Wait until all transferred events have been sent.
:returns: A list of the send results of all the pending messages. Each
send result is a tuple with two values. The first is a boolean, indicating `True`
if the message sent, or `False` if it failed. The second is an error if the message
failed, otherwise it will be `None`.
:rtype: list[tuple[bool, ~azure.servicebus.common.errors.MessageSendFailed]]
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START queue_and_send_messages]
:end-before: [END queue_and_send_messages]
:language: python
:dedent: 4
:caption: Send the queued messages
"""
if not self.running:
self.open()
try:
pending = self._handler._pending_messages[:] # pylint: disable=protected-access
_log.debug("Sending %r pending messages", len(pending))
self._handler.wait()
results = []
for m in pending:
if m.state == constants.MessageState.SendFailed:
results.append((False, MessageSendFailed(m._response))) # pylint: disable=protected-access
else:
results.append((True, None))
return results
except Exception as e:
raise MessageSendFailed(e)
[docs] def reconnect(self):
"""Reconnect the handler.
If the handler was disconnected from the service with
a retryable error - attempt to reconnect.
This method will be called automatically for most retryable errors.
Also attempts to re-queue any messages that were pending before the reconnect.
"""
unsent_events = self._handler.pending_messages
super(Sender, self).reconnect()
try:
self._handler.queue_message(*unsent_events)
self._handler.wait()
except Exception as e: # pylint: disable=broad-except
self._handle_exception(e)
[docs]class SessionSender(Sender):
"""A session message sender.
This handler is for sending messages to a sessionful Service Bus entity.
It operates a single connection that must be opened and closed on completion.
The Sender can be run within a context manager to ensure that the connection is closed on exit.
The Sender should not be instantiated directly, and should be accessed from a `QueueClient` or
`TopicClient` using the `get_sender()` method.
An attempt to send a message without a session ID specified either on the Sender or the message
will raise a `ValueError`.
.. note:: This object is not thread-safe.
:param handler_id: The ID used as the connection name for the Sender.
:type handler_id: str
:param target: The endpoint to send messages to.
:type target: ~uamqp.Target
:param auth_config: The SASL auth credentials.
:type auth_config: dict[str, str]
:param session: An optional session ID. If supplied, all outgoing messages will have this
session ID added (unless they already have one specified).
:type session: str
:param connection: A shared connection [not yet supported].
:type connection: ~uamqp.Connection
:param encoding: The encoding used for string properties. Default is 'UTF-8'.
:type encoding: str
:param debug: Whether to enable network trace debug logs.
:type debug: bool
"""
[docs] def send(self, message):
"""Send a message and blocks until acknowledgement is received or the operation fails.
If neither the Sender nor the message has a session ID, a `ValueError` will be raised.
:param message: The message to be sent.
:type message: ~azure.servicebus.common.message.Message
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send.
:raises: ValueError if there is no session ID specified.
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START send_message]
:end-before: [END send_message]
:language: python
:dedent: 4
:caption: Send a message and block
"""
if not isinstance(message, Message):
raise TypeError("Value of message must be of type 'Message'.")
if not self.session_id and not message.properties.group_id:
raise ValueError("Message must have Session ID.")
super(SessionSender, self).send(message)
[docs] def queue_message(self, message):
"""Queue a message to be sent later.
This operation should be followed up with send_pending_messages. If neither the Sender nor the message
has a session ID, a `ValueError` will be raised.
:param message: The message to be sent.
:type message: ~azure.servicebus.Message
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START queue_and_send_session_messages]
:end-before: [END queue_and_send_session_messages]
:language: python
:dedent: 4
:caption: Put the message to be sent later in the queue
"""
if not self.session_id and not message.properties.group_id:
raise ValueError("Message must have Session ID.")
super(SessionSender, self).queue_message(message)
[docs] def schedule(self, schedule_time, *messages):
"""Send one or more messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
:param schedule_time: The date and time to enqueue the messages.
:type schedule_time: ~datetime.datetime
:param messages: The messages to schedule.
:type messages: ~azure.servicebus.common.message.Message
:rtype: list[int]
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START scheduling_messages]
:end-before: [END scheduling_messages]
:language: python
:dedent: 4
:caption: Schedule a message to be sent in future
"""
for message in messages:
if not self.session_id and not message.properties.group_id:
raise ValueError("Message must have Session ID.")
return super(SessionSender, self).schedule(schedule_time, *messages)