Source code for azure.servicebus._common.message

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import datetime
import uuid
import functools
import logging
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Dict, Any

import uamqp.message
from uamqp import types

from .constants import (
    _BATCH_MESSAGE_OVERHEAD_COST,
    SETTLEMENT_ABANDON,
    SETTLEMENT_COMPLETE,
    SETTLEMENT_DEFER,
    SETTLEMENT_DEADLETTER,
    ReceiveSettleMode,
    _X_OPT_ENQUEUED_TIME,
    _X_OPT_SEQUENCE_NUMBER,
    _X_OPT_ENQUEUE_SEQUENCE_NUMBER,
    _X_OPT_PARTITION_ID,
    _X_OPT_PARTITION_KEY,
    _X_OPT_VIA_PARTITION_KEY,
    _X_OPT_LOCKED_UNTIL,
    _X_OPT_LOCK_TOKEN,
    _X_OPT_SCHEDULED_ENQUEUE_TIME,
    MGMT_RESPONSE_MESSAGE_EXPIRATION,
    MGMT_REQUEST_DEAD_LETTER_REASON,
    MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
    RECEIVER_LINK_DEAD_LETTER_REASON,
    RECEIVER_LINK_DEAD_LETTER_DESCRIPTION,
    MESSAGE_COMPLETE,
    MESSAGE_DEAD_LETTER,
    MESSAGE_ABANDON,
    MESSAGE_DEFER,
    MESSAGE_RENEW_LOCK,
    DEADLETTERNAME
)
from ..exceptions import (
    MessageAlreadySettled,
    MessageLockExpired,
    SessionLockExpired,
    MessageSettleFailed,
    MessageContentTooLarge)
from .utils import utc_from_timestamp, utc_now
if TYPE_CHECKING:
    from .._servicebus_receiver import ServiceBusReceiver
    from .._servicebus_session_receiver import ServiceBusSessionReceiver

_LOGGER = logging.getLogger(__name__)


[docs]class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes """A Service Bus Message. :ivar properties: Properties of the internal AMQP message object. :vartype properties: ~uamqp.message.MessageProperties :ivar header: Header of the internal AMQP message object. :vartype header: ~uamqp.message.MessageHeader :ivar message: Internal AMQP message object. :vartype message: ~uamqp.message.Message :param body: The data to send in a single message. :type body: str or bytes :keyword str encoding: The encoding for string data. Default is UTF-8. :keyword str session_id: An optional session ID for the message to be sent. .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START send_complex_message] :end-before: [END send_complex_message] :language: python :dedent: 4 :caption: Sending a message with additional properties """ def __init__(self, body, **kwargs): subject = kwargs.pop('subject', None) # Although we might normally thread through **kwargs this causes # problems as MessageProperties won't absorb spurious args. self._encoding = kwargs.pop("encoding", 'UTF-8') self.properties = uamqp.message.MessageProperties(encoding=self._encoding, subject=subject) self.header = uamqp.message.MessageHeader() self._annotations = {} self._app_properties = {} self.session_id = kwargs.get("session_id", None) if 'message' in kwargs: self.message = kwargs['message'] self._annotations = self.message.annotations self._app_properties = self.message.application_properties self.properties = self.message.properties self.header = self.message.header else: self._build_message(body) def __str__(self): return str(self.message) def _build_message(self, body): if isinstance(body, list) and body: # TODO: This only works for a list of bytes/strings self.message = uamqp.Message(body[0], properties=self.properties, header=self.header) for more in body[1:]: self.message._body.append(more) # pylint: disable=protected-access elif body is None: raise ValueError("Message body cannot be None.") else: self.message = uamqp.Message(body, properties=self.properties, header=self.header) @property def session_id(self): # type: () -> str """The session id of the message :rtype: str """ try: return self.properties.group_id.decode('UTF-8') except (AttributeError, UnicodeDecodeError): return self.properties.group_id @session_id.setter def session_id(self, value): """Set the session id on the message. :param value: The session id for the message. :type value: str """ self.properties.group_id = value @property def annotations(self): # type: () -> dict """The annotations of the message. :rtype: dict """ return self.message.annotations @annotations.setter def annotations(self, value): """Set the annotations on the message. :param value: The annotations for the Message. :type value: dict """ self.message.annotations = value @property def user_properties(self): # type: () -> dict """User defined properties on the message. :rtype: dict """ return self.message.application_properties @user_properties.setter def user_properties(self, value): """User defined properties on the message. :param value: The application properties for the Message. :type value: dict """ self.message.application_properties = value @property def enqueue_sequence_number(self): # type: () -> Optional[int] """ :rtype: int """ if self.message.annotations: return self.message.annotations.get(_X_OPT_ENQUEUE_SEQUENCE_NUMBER) return None @enqueue_sequence_number.setter def enqueue_sequence_number(self, value): if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(_X_OPT_ENQUEUE_SEQUENCE_NUMBER)] = value @property def partition_key(self): # type: () -> Optional[str] """ :rtype: str """ if self.message.annotations: return self.message.annotations.get(_X_OPT_PARTITION_KEY) return None @partition_key.setter def partition_key(self, value): if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(_X_OPT_PARTITION_KEY)] = value @property def via_partition_key(self): # type: () -> Optional[str] """ :rtype: str """ if self.message.annotations: return self.message.annotations.get(_X_OPT_VIA_PARTITION_KEY) return None @via_partition_key.setter def via_partition_key(self, value): if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY)] = value @property def time_to_live(self): # type: () -> Optional[datetime.timedelta] """ :rtype: ~datetime.timedelta """ if self.header and self.header.time_to_live: return datetime.timedelta(milliseconds=self.header.time_to_live) return None @time_to_live.setter def time_to_live(self, value): if not self.header: self.header = uamqp.message.MessageHeader() if isinstance(value, datetime.timedelta): self.header.time_to_live = value.seconds * 1000 else: self.header.time_to_live = int(value) * 1000 @property def scheduled_enqueue_time_utc(self): # type: () -> Optional[datetime.datetime] """Get or set the utc scheduled enqueue time to the message. This property can be used for scheduling when sending a message through `ServiceBusSender.send` method. If cancelling scheduled messages is required, you should use the `ServiceBusSender.schedule` method, which returns sequence numbers that can be used for future cancellation. `scheduled_enqueue_time_utc` is None if not set. :rtype: ~datetime.datetime """ if self.message.annotations: timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) if timestamp: in_seconds = timestamp/1000.0 return utc_from_timestamp(in_seconds) return None @scheduled_enqueue_time_utc.setter def scheduled_enqueue_time_utc(self, value): # type: (datetime.datetime) -> None if not self.properties.message_id: self.properties.message_id = str(uuid.uuid4()) if not self.message.annotations: self.message.annotations = {} self.message.annotations[types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)] = value @property def body(self): # type: () -> Union[bytes, Iterable[bytes]] """The body of the Message. :rtype: bytes or Iterable[bytes] """ return self.message.get_data()
[docs]class BatchMessage(object): """A batch of messages. Sending messages in a batch is more performant than sending individual message. BatchMessage helps you create the maximum allowed size batch of `Message` to improve sending performance. Use the `add` method to add messages until the maximum batch size limit in bytes has been reached - at which point a `ValueError` will be raised. **Please use the create_batch method of ServiceBusSender to create a BatchMessage object instead of instantiating a BatchMessage object directly.** :ivar max_size_in_bytes: The maximum size of bytes data that a BatchMessage object can hold. :vartype max_size_in_bytes: int :ivar message: Internal AMQP BatchMessage object. :vartype message: ~uamqp.BatchMessage :param int max_size_in_bytes: The maximum size of bytes data that a BatchMessage object can hold. """ def __init__(self, max_size_in_bytes=None): # type: (Optional[int]) -> None self.max_size_in_bytes = max_size_in_bytes or uamqp.constants.MAX_MESSAGE_LENGTH_BYTES self.message = uamqp.BatchMessage(data=[], multi_messages=False, properties=None) self._size = self.message.gather()[0].get_message_encoded_size() self._count = 0 self._messages = [] # type: List[Message] def __repr__(self): # type: () -> str batch_repr = "max_size_in_bytes={}, message_count={}".format( self.max_size_in_bytes, self._count ) return "BatchMessage({})".format(batch_repr) def __len__(self): return self._count def _from_list(self, messages): for each in messages: if not isinstance(each, Message): raise ValueError("Populating a message batch only supports iterables containing Message Objects. " "Received instead: {}".format(each.__class__.__name__)) self.add(each) @property def size_in_bytes(self): # type: () -> int """The combined size of the events in the batch, in bytes. :rtype: int """ return self._size
[docs] def add(self, message): # type: (Message) -> None """Try to add a single Message to the batch. The total size of an added message is the sum of its body, properties, etc. If this added size results in the batch exceeding the maximum batch size, a `ValueError` will be raised. :param message: The Message to be added to the batch. :type message: ~azure.servicebus.Message :rtype: None :raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit. """ message_size = message.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes. size_after_add = ( self._size + message_size + _BATCH_MESSAGE_OVERHEAD_COST[0 if (message_size < 256) else 1] ) if size_after_add > self.max_size_in_bytes: raise MessageContentTooLarge( "BatchMessage has reached its size limit: {}".format( self.max_size_in_bytes ) ) self.message._body_gen.append(message) # pylint: disable=protected-access self._size = size_after_add self._count += 1 self._messages.append(message)
[docs]class PeekMessage(Message): """A preview message. This message is still on the queue, and unlocked. A peeked message cannot be completed, abandoned, dead-lettered or deferred. It has no lock token or expiry. :ivar received_timestamp_utc: The utc timestamp of when the message is received. :vartype received_timestamp_utc: datetime.datetime """ def __init__(self, message): super(PeekMessage, self).__init__(None, message=message) self.received_timestamp_utc = utc_now() @property def settled(self): # type: () -> bool """Whether the message has been settled. This will aways be `True` for a message received using ReceiveAndDelete mode, otherwise it will be `False` until the message is completed or otherwise settled. :rtype: bool """ return self.message.settled @property def partition_id(self): # type: () -> Optional[str] """ :rtype: int """ if self.message.annotations: return self.message.annotations.get(_X_OPT_PARTITION_ID) return None @property def enqueued_time_utc(self): # type: () -> Optional[datetime.datetime] """ :rtype: ~datetime.datetime """ if self.message.annotations: timestamp = self.message.annotations.get(_X_OPT_ENQUEUED_TIME) if timestamp: in_seconds = timestamp/1000.0 return utc_from_timestamp(in_seconds) return None @property def sequence_number(self): # type: () -> Optional[int] """ :rtype: int """ if self.message.annotations: return self.message.annotations.get(_X_OPT_SEQUENCE_NUMBER) return None
[docs]class ReceivedMessage(PeekMessage): """ A Service Bus Message received from service side. :ivar auto_renew_error: Error when AutoLockRenew is used and it fails to renew the message lock. :vartype auto_renew_error: ~azure.servicebus.AutoLockRenewTimeout or ~azure.servicebus.AutoLockRenewFailed .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START receive_complex_message] :end-before: [END receive_complex_message] :language: python :dedent: 4 :caption: Checking the properties on a received message. """ def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): super(ReceivedMessage, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) self._is_deferred_message = kwargs.get("is_deferred_message", False) self.auto_renew_error = None self._receiver = None # type: ignore self._expiry = None @property def settled(self): # type: () -> bool """Whether the message has been settled. This will aways be `True` for a message received using ReceiveAndDelete mode, otherwise it will be `False` until the message is completed or otherwise settled. :rtype: bool """ return self._settled @property def expired(self): # type: () -> bool """ :rtype: bool """ try: if self._receiver.session: # pylint: disable=protected-access raise TypeError("Session messages do not expire. Please use the Session expiry instead.") except AttributeError: # Is not a session receiver pass if self.locked_until_utc and self.locked_until_utc <= utc_now(): return True return False @property def locked_until_utc(self): # type: () -> Optional[datetime.datetime] """ :rtype: datetime.datetime """ try: if self.settled or self._receiver.session: # pylint: disable=protected-access return None except AttributeError: # not settled, and isn't session receiver. pass if self._expiry: return self._expiry if self.message.annotations and _X_OPT_LOCKED_UNTIL in self.message.annotations: expiry_in_seconds = self.message.annotations[_X_OPT_LOCKED_UNTIL]/1000 self._expiry = utc_from_timestamp(expiry_in_seconds) return self._expiry @property def lock_token(self): # type: () -> Optional[Union[uuid.UUID, str]] """ :rtype: ~uuid.UUID or str """ if self.settled: return None if self.message.delivery_tag: return uuid.UUID(bytes_le=self.message.delivery_tag) delivery_annotations = self.message.delivery_annotations if delivery_annotations: return delivery_annotations.get(_X_OPT_LOCK_TOKEN) return None def _check_live(self, action): # pylint: disable=no-member if not self._receiver or not self._receiver._running: # pylint: disable=protected-access raise MessageSettleFailed(action, "Orphan message had no open connection.") if self.settled: raise MessageAlreadySettled(action) try: if self.expired: raise MessageLockExpired(inner_exception=self.auto_renew_error) except TypeError: pass try: if self._receiver.session.expired: raise SessionLockExpired(inner_exception=self._receiver.session.auto_renew_error) except AttributeError: pass def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None): # type: (str, Optional[str], Optional[str]) -> Callable # pylint: disable=protected-access if settle_operation == MESSAGE_COMPLETE: return functools.partial( self._receiver._settle_message, SETTLEMENT_COMPLETE, [self.lock_token], ) if settle_operation == MESSAGE_ABANDON: return functools.partial( self._receiver._settle_message, SETTLEMENT_ABANDON, [self.lock_token], ) if settle_operation == MESSAGE_DEAD_LETTER: return functools.partial( self._receiver._settle_message, SETTLEMENT_DEADLETTER, [self.lock_token], dead_letter_details={ MGMT_REQUEST_DEAD_LETTER_REASON: dead_letter_reason or "", MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: dead_letter_description or "" } ) if settle_operation == MESSAGE_DEFER: return functools.partial( self._receiver._settle_message, SETTLEMENT_DEFER, [self.lock_token], ) raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None): # type: (str, Optional[str], Optional[str]) -> Callable if settle_operation == MESSAGE_COMPLETE: return functools.partial(self.message.accept) if settle_operation == MESSAGE_ABANDON: return functools.partial(self.message.modify, True, False) if settle_operation == MESSAGE_DEAD_LETTER: return functools.partial( self.message.reject, condition=DEADLETTERNAME, description=dead_letter_description, info={ RECEIVER_LINK_DEAD_LETTER_REASON: dead_letter_reason, RECEIVER_LINK_DEAD_LETTER_DESCRIPTION: dead_letter_description } ) if settle_operation == MESSAGE_DEFER: return functools.partial(self.message.modify, True, True) raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) def _settle_message( self, settle_operation, dead_letter_reason=None, dead_letter_description=None, ): # type: (str, Optional[str], Optional[str]) -> None try: if not self._is_deferred_message: try: self._settle_via_receiver_link(settle_operation, dead_letter_reason=dead_letter_reason, dead_letter_description=dead_letter_description)() return except RuntimeError as exception: _LOGGER.info( "Message settling: %r has encountered an exception (%r)." "Trying to settle through management link", settle_operation, exception ) self._settle_via_mgmt_link(settle_operation, dead_letter_reason=dead_letter_reason, dead_letter_description=dead_letter_description)() except Exception as e: raise MessageSettleFailed(settle_operation, e)
[docs] def complete(self): # type: () -> None """Complete the message. This removes the message from the queue. :rtype: None :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. """ # pylint: disable=protected-access self._check_live(MESSAGE_COMPLETE) self._settle_message(MESSAGE_COMPLETE) self._settled = True
[docs] def dead_letter(self, reason=None, description=None): # type: (Optional[str], Optional[str]) -> None """Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue. :param str reason: The reason for dead-lettering the message. :param str description: The detailed description for dead-lettering the message. :rtype: None :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. """ # pylint: disable=protected-access self._check_live(MESSAGE_DEAD_LETTER) self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_reason=reason, dead_letter_description=description) self._settled = True
[docs] def abandon(self): # type: () -> None """Abandon the message. This message will be returned to the queue and made available to be received again. :rtype: None :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. """ # pylint: disable=protected-access self._check_live(MESSAGE_ABANDON) self._settle_message(MESSAGE_ABANDON) self._settled = True
[docs] def defer(self): # type: () -> None """Defer the message. This message will remain in the queue but must be requested specifically by its sequence number in order to be received. :rtype: None :raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled. :raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired. :raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails. """ self._check_live(MESSAGE_DEFER) self._settle_message(MESSAGE_DEFER) self._settled = True
[docs] def renew_lock(self): # type: () -> None """Renew the message lock. This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed. In order to complete (or otherwise settle) the message, the lock must be maintained. Messages received via ReceiveAndDelete mode are not locked, and therefore cannot be renewed. This operation can also be performed as a threaded background task by registering the message with an `azure.servicebus.AutoLockRenew` instance. This operation is only available for non-sessionful messages. :rtype: None :raises: TypeError if the message is sessionful. :raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired. :raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled. """ try: if self._receiver.session: raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.") except AttributeError: pass self._check_live(MESSAGE_RENEW_LOCK) token = self.lock_token if not token: raise ValueError("Unable to renew lock - no lock token found.") expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access,no-member self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0)