Source code for azure.servicebus.common.message

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import datetime
import uuid

import uamqp
from uamqp import types

from azure.servicebus.common.constants import (
    DEADLETTERNAME,
    RECEIVER_LINK_DEAD_LETTER_REASON,
    RECEIVER_LINK_DEAD_LETTER_DESCRIPTION
)
from azure.servicebus.common.errors import (
    MessageAlreadySettled,
    MessageSettleFailed,
    MessageLockExpired,
    SessionLockExpired)


[docs]class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes """A Service Bus Message. :param body: The data to send in a single message. The maximum size per message is 256 kB. :type body: str or bytes :param encoding: The encoding for string data. Default is UTF-8. :type encoding: str .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/test_examples.py :start-after: [START send_complex_message] :end-before: [END send_complex_message] :language: python :dedent: 4 :caption: Sending a message with additional properties .. literalinclude:: ../samples/sync_samples/test_examples.py :start-after: [START receive_complex_message] :end-before: [END receive_complex_message] :language: python :dedent: 4 :caption: Checking the properties on a received message """ _X_OPT_ENQUEUED_TIME = b'x-opt-enqueued-time' _X_OPT_SEQUENCE_NUMBER = b'x-opt-sequence-number' _X_OPT_ENQUEUE_SEQUENCE_NUMBER = b'x-opt-enqueue-sequence-number' _X_OPT_PARTITION_ID = b'x-opt-partition-id' _X_OPT_PARTITION_KEY = b'x-opt-partition-key' _X_OPT_VIA_PARTITION_KEY = b'x-opt-via-partition-key' _X_OPT_LOCKED_UNTIL = b'x-opt-locked-until' _x_OPT_LOCK_TOKEN = b'x-opt-lock-token' _x_OPT_SCHEDULED_ENQUEUE_TIME = b'x-opt-scheduled-enqueue-time' def __init__(self, body, encoding='UTF-8', **kwargs): subject = kwargs.pop('subject', None) # Although we might normally thread through **kwargs this causes problems as MessageProperties won't absorb spurious args. self.properties = uamqp.message.MessageProperties(encoding=encoding, subject=subject) self.header = uamqp.message.MessageHeader() self.received_timestamp = None self.auto_renew_error = None self._annotations = {} self._app_properties = {} self._encoding = encoding self._expiry = None self._receiver = None if 'message' in kwargs: self.message = kwargs['message'] self._annotations = self.message.annotations self._app_properties = self.message.application_properties self.properties = self.message.properties self.header = self.message.header self.received_timestamp = datetime.datetime.now() else: self._build_message(body) def __str__(self): return str(self.message) def _build_message(self, body): if isinstance(body, list) and body: # TODO: This only works for a list of bytes/strings self.message = uamqp.Message(body[0], properties=self.properties, header=self.header) for more in body[1:]: self.message._body.append(more) # pylint: disable=protected-access elif body is None: raise ValueError("Message body cannot be None.") else: self.message = uamqp.Message(body, properties=self.properties, header=self.header) def _is_live(self, action): # pylint: disable=no-member if self.settled: raise MessageAlreadySettled(action) try: if self.expired: raise MessageLockExpired(inner_exception=self.auto_renew_error) except TypeError: pass if hasattr(self._receiver, 'expired') and self._receiver.expired: raise SessionLockExpired(inner_exception=self._receiver.auto_renew_error) @property def settled(self): """Whether the message has been settled. This will aways be `True` for a message received using ReceiveAndDelete mode, otherwise it will be `False` until the message is completed or otherwise settled. :rtype: bool """ return self.message.settled @property def annotations(self): """The annotations of the message. :rtype: dict """ return self.message.annotations @annotations.setter def annotations(self, value): """Set the annotations on the message. :param value: The annotations for the Message. :type value: dict """ self.message.annotations = value @property def user_properties(self): """User defined properties on the message. :rtype: dict """ return self.message.application_properties @user_properties.setter def user_properties(self, value): """User defined properties on the message. :param value: The application properties for the Message. :type value: dict """ self.message.application_properties = value @property def enqueued_time(self): if self.message.annotations: timestamp = self.message.annotations.get(self._X_OPT_ENQUEUED_TIME) if timestamp: in_seconds = timestamp/1000.0 return datetime.datetime.utcfromtimestamp(in_seconds) return None @property def scheduled_enqueue_time(self): if self.message.annotations: timestamp = self.message.annotations.get(self._x_OPT_SCHEDULED_ENQUEUE_TIME) if timestamp: in_seconds = timestamp/1000.0 return datetime.datetime.utcfromtimestamp(in_seconds) return None @property def sequence_number(self): if self.message.annotations: return self.message.annotations.get(self._X_OPT_SEQUENCE_NUMBER) return None @property def enqueue_sequence_number(self): if self.message.annotations: return self.message.annotations.get(self._X_OPT_ENQUEUE_SEQUENCE_NUMBER) return None @enqueue_sequence_number.setter def enqueue_sequence_number(self, value): if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(self._X_OPT_ENQUEUE_SEQUENCE_NUMBER)] = value @property def partition_id(self): if self.message.annotations: return self.message.annotations.get(self._X_OPT_PARTITION_ID) return None @property def partition_key(self): if self.message.annotations: return self.message.annotations.get(self._X_OPT_PARTITION_KEY) return None @partition_key.setter def partition_key(self, value): if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(self._X_OPT_PARTITION_KEY)] = value @property def via_partition_key(self): if self.message.annotations: return self.message.annotations.get(self._X_OPT_VIA_PARTITION_KEY) return None @via_partition_key.setter def via_partition_key(self, value): if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(self._X_OPT_VIA_PARTITION_KEY)] = value @property def locked_until(self): if hasattr(self._receiver, 'locked_until') or self.settled: return None if self._expiry: return self._expiry if self.message.annotations and self._X_OPT_LOCKED_UNTIL in self.message.annotations: expiry_in_seconds = self.message.annotations[self._X_OPT_LOCKED_UNTIL]/1000 self._expiry = datetime.datetime.fromtimestamp(expiry_in_seconds) return self._expiry @property def expired(self): if hasattr(self._receiver, 'locked_until'): raise TypeError("Session messages do not expire. Please use the Session expiry instead.") if self.locked_until and self.locked_until <= datetime.datetime.now(): return True return False @property def lock_token(self): if hasattr(self._receiver, 'locked_until') or self.settled: return None if hasattr(self.message, 'delivery_tag') and self.message.delivery_tag: return uuid.UUID(bytes_le=self.message.delivery_tag) delivery_annotations = self.message.delivery_annotations if delivery_annotations: return delivery_annotations.get(self._x_OPT_LOCK_TOKEN) return None @property def session_id(self): try: return self.properties.group_id.decode('UTF-8') except (AttributeError, UnicodeDecodeError): return self.properties.group_id @session_id.setter def session_id(self, value): self.properties.group_id = value @property def time_to_live(self): if self.header and self.header.time_to_live: return datetime.timedelta(milliseconds=self.header.time_to_live) return None @time_to_live.setter def time_to_live(self, value): if not self.header: self.header = uamqp.message.MessageHeader() if isinstance(value, datetime.timedelta): self.header.time_to_live = value.seconds * 1000 else: self.header.time_to_live = int(value) * 1000 @property def body(self): """The body of the Message. :rtype: bytes or generator[bytes] """ return self.message.get_data()
[docs] def schedule(self, schedule_time): """Add a specific enqueue time to the message. :param schedule_time: The scheduled time to enqueue the message. :type schedule_time: ~datetime.datetime """ if not self.properties.message_id: self.properties.message_id = str(uuid.uuid4()) if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(self._x_OPT_SCHEDULED_ENQUEUE_TIME)] = schedule_time
[docs] def renew_lock(self): """Renew the message lock. This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed. In order to complete (or otherwise settle) the message, the lock must be maintained. Messages received via ReceiveAndDelete mode are not locked, and therefore cannot be renewed. This operation can also be performed as a threaded background task by registering the message with an `azure.servicebus.AutoLockRenew` instance. This operation is only available for non-sessionful messages. :raises: TypeError if the message is sessionful. :raises: ~azure.servicebus.common.errors.MessageLockExpired is message lock has already expired. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled is message has already been settled. """ if hasattr(self._receiver, 'locked_until'): raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.") self._is_live('renew') token = self.lock_token if not token: raise ValueError("Unable to renew lock - no lock token found.") expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access self._expiry = datetime.datetime.fromtimestamp(expiry[b'expirations'][0]/1000.0)
[docs] def complete(self): """Complete the message. This removes the message from the queue. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('complete') try: self.message.accept() except Exception as e: raise MessageSettleFailed("complete", e)
[docs] def dead_letter(self, description=None, reason=None): """Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue. To receive dead-lettered messages, use `QueueClient.get_deadletter_receiver()` or `SubscriptionClient.get_deadletter_receiver()`. :param str description: The error description for dead-lettering the message. :param str reason: The reason for dead-lettering the message. If `reason` is not set while `description` is set, then `reason` would be set the same as `description`. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('reject') info = None if description: info = { RECEIVER_LINK_DEAD_LETTER_REASON: reason or description, RECEIVER_LINK_DEAD_LETTER_DESCRIPTION: description } elif reason: info = { RECEIVER_LINK_DEAD_LETTER_REASON: reason } try: self.message.reject(condition=DEADLETTERNAME, description=description, info=info) except Exception as e: raise MessageSettleFailed("reject", e)
[docs] def abandon(self): """Abandon the message. This message will be returned to the queue to be reprocessed. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('abandon') try: self.message.modify(True, False) except Exception as e: raise MessageSettleFailed("abandon", e)
[docs] def defer(self): """Defer the message. This message will remain in the queue but must be received specifically by its sequence number in order to be processed. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('defer') try: self.message.modify(True, True) except Exception as e: raise MessageSettleFailed("defer", e)
[docs]class BatchMessage(Message): """A batch of messages combined into a single message body. The body of the messages in the batch should be supplied by an iterable, such as a generator. If the contents of the iterable exceeds the maximum size of a single message (256 kB), the data will be broken up across multiple messages. :param body: The data to send in each message in the batch. The maximum size per message is 256 kB. If data is supplied in excess of this limit, multiple messages will be sent. :type body: Iterable :param encoding: The encoding for string data. Default is UTF-8. :type encoding: str .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/test_examples.py :start-after: [START send_batch_message] :end-before: [END send_batch_message] :language: python :dedent: 4 :caption: Send a batched message. """ def _build_message(self, body): if body is None: raise ValueError("Message body cannot be None.") else: self.message = uamqp.BatchMessage( data=body, multi_messages=True, properties=self.properties, header=self.header)
[docs]class PeekMessage(Message): """A preview message. This message is still on the queue, and unlocked. A peeked message cannot be completed, abandoned, dead-lettered or deferred. It has no lock token or expiry. """ def __init__(self, message): super(PeekMessage, self).__init__(None, message=message) @property def locked_until(self): raise TypeError("Peeked message is not locked.") @property def lock_token(self): raise TypeError("Peeked message is not locked.")
[docs] def renew_lock(self): """A PeekMessage cannot be renewed. Raises `TypeError`.""" raise TypeError("Peeked message is not locked.")
[docs] def complete(self): """A PeekMessage cannot be completed Raises `TypeError`.""" raise TypeError("Peeked message cannot be completed.")
[docs] def dead_letter(self, description=None, reason=None): """A PeekMessage cannot be dead-lettered. Raises `TypeError`.""" raise TypeError("Peeked message cannot be dead-lettered.")
[docs] def abandon(self): """A PeekMessage cannot be abandoned. Raises `TypeError`.""" raise TypeError("Peeked message cannot be abandoned.")
[docs] def defer(self): """A PeekMessage cannot be deferred. Raises `TypeError`.""" raise TypeError("Peeked message cannot be deferred.")
[docs]class DeferredMessage(Message): """A message that has been deferred. A deferred message can be completed, abandoned, or dead-lettered, however it cannot be deferred again. """ def __init__(self, message, mode): self._settled = mode == 0 super(DeferredMessage, self).__init__(None, message=message) def _is_live(self, action): if not self._receiver: raise ValueError("Orphan message had no open connection.") super(DeferredMessage, self)._is_live(action) @property def lock_token(self): if self.settled: return None delivery_annotations = self.message.delivery_annotations if delivery_annotations: return delivery_annotations.get(self._x_OPT_LOCK_TOKEN) return None @property def settled(self): return self._settled
[docs] def complete(self): """Complete the message. This removes the message from the queue. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('complete') self._receiver._settle_deferred('completed', [self.lock_token]) # pylint: disable=protected-access self._settled = True
[docs] def dead_letter(self, description=None, reason=None): """Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue. To receive dead-lettered messages, use `QueueClient.get_deadletter_receiver()` or `SubscriptionClient.get_deadletter_receiver()`. :param str description: The error description for dead-lettering the message. :param str reason: The reason for dead-lettering the message. If `reason` is not set while `description` is set, then `reason` would be set the same as `description`. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('dead-letter') details = { 'deadletter-reason': str(reason) if reason else (str(description) if description else ""), 'deadletter-description': str(description) if description else ""} self._receiver._settle_deferred( # pylint: disable=protected-access 'suspended', [self.lock_token], dead_letter_details=details) self._settled = True
[docs] def abandon(self): """Abandon the message. This message will be returned to the queue to be reprocessed. :raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live('abandon') self._receiver._settle_deferred('abandoned', [self.lock_token]) # pylint: disable=protected-access self._settled = True
[docs] def defer(self): """A DeferredMessage cannot be deferred. Raises `ValueError`.""" raise ValueError("Message is already deferred.")