# Source code for azure.servicebus._common.message

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
# pylint: disable=too-many-lines

from __future__ import annotations
import calendar
import time
import warnings
import datetime
import uuid
from typing import Optional, Dict, List, Union, Iterable, Any, Mapping, cast, TYPE_CHECKING

from .._pyamqp._message_backcompat import LegacyMessage, LegacyBatchMessage
from .._pyamqp.message import Message as pyamqp_Message
from .._transport._pyamqp_transport import PyamqpTransport

from .constants import (
    _BATCH_MESSAGE_OVERHEAD_COST,
    ServiceBusReceiveMode,
    ServiceBusMessageState,
    _X_OPT_ENQUEUED_TIME,
    _X_OPT_SEQUENCE_NUMBER,
    _X_OPT_ENQUEUE_SEQUENCE_NUMBER,
    _X_OPT_PARTITION_KEY,
    _X_OPT_LOCKED_UNTIL,
    _X_OPT_LOCK_TOKEN,
    _X_OPT_SCHEDULED_ENQUEUE_TIME,
    _X_OPT_DEAD_LETTER_SOURCE,
    PROPERTIES_DEAD_LETTER_REASON,
    PROPERTIES_DEAD_LETTER_ERROR_DESCRIPTION,
    MESSAGE_PROPERTY_MAX_LENGTH,
    MAX_ABSOLUTE_EXPIRY_TIME,
    MAX_DURATION_VALUE,
    MAX_MESSAGE_LENGTH_BYTES,
    MESSAGE_STATE_NAME
)
from ..amqp import (
    AmqpAnnotatedMessage,
    AmqpMessageBodyType,
    AmqpMessageHeader,
    AmqpMessageProperties
)
from ..exceptions import MessageSizeExceededError
from .utils import (
    utc_from_timestamp,
    utc_now,
    transform_outbound_messages,
)
from .tracing import trace_message

if TYPE_CHECKING:
    try:
        # pylint:disable=unused-import
        from uamqp import (
            Message,
            BatchMessage
        )
    except ImportError:
        pass
    from .._pyamqp.performatives import TransferFrame
    from ..aio._servicebus_receiver_async import (
        ServiceBusReceiver as AsyncServiceBusReceiver,
    )
    from .._servicebus_receiver import ServiceBusReceiver
# Scalar value types used by this module's ``application_properties`` type annotations.
PrimitiveTypes = Union[int, float, bytes, bool, str, uuid.UUID]


class ServiceBusMessage(object):  # pylint: disable=too-many-public-methods,too-many-instance-attributes
    """A Service Bus Message.

    :param body: The data to send in a single message.
    :type body: Optional[Union[str, bytes]]

    :keyword application_properties: The user defined properties on the message.
    :paramtype application_properties: Dict[str, Union[int or float or bool or
     bytes or str or uuid.UUID or datetime or None]]
    :keyword Optional[str] session_id: The session identifier of the message for a sessionful entity.
    :keyword Optional[str] message_id: The id to identify the message.
    :keyword Optional[datetime.datetime] scheduled_enqueue_time_utc: The utc scheduled enqueue time to the message.
    :keyword Optional[datetime.timedelta] time_to_live: The life duration of a message.
    :keyword Optional[str] content_type: The content type descriptor.
    :keyword Optional[str] correlation_id: The correlation identifier.
    :keyword Optional[str] subject: The application specific subject, sometimes referred to as label.
    :keyword Optional[str] partition_key: The partition key for sending a message to a partitioned entity.
    :keyword Optional[str] to: The `to` address used for auto_forward chaining scenarios.
    :keyword Optional[str] reply_to: The address of an entity to send replies to.
    :keyword Optional[str] reply_to_session_id: The session identifier augmenting the `reply_to` address.

    .. admonition:: Example:

        .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
            :start-after: [START send_complex_message]
            :end-before: [END send_complex_message]
            :language: python
            :dedent: 4
            :caption: Sending a message with additional properties

    """

    def __init__(
        self,
        body: Optional[Union[str, bytes]],
        *,
        application_properties: Optional[Dict[Union[str, bytes], "PrimitiveTypes"]] = None,
        session_id: Optional[str] = None,
        message_id: Optional[str] = None,
        scheduled_enqueue_time_utc: Optional[datetime.datetime] = None,
        time_to_live: Optional[datetime.timedelta] = None,
        content_type: Optional[str] = None,
        correlation_id: Optional[str] = None,
        subject: Optional[str] = None,
        partition_key: Optional[str] = None,
        to: Optional[str] = None,
        reply_to: Optional[str] = None,
        reply_to_session_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        # Although we might normally thread through **kwargs this causes
        # problems as MessageProperties won't absorb spurious args.
        self._encoding = kwargs.pop("encoding", "UTF-8")
        self._uamqp_message: Optional[Union[LegacyMessage, "Message"]] = None
        self._message: Union["Message", "pyamqp_Message"] = None  # type: ignore

        # Internal usage only for transforming AmqpAnnotatedMessage to outgoing ServiceBusMessage
        if "message" in kwargs:
            self._message = kwargs["message"]
            if "raw_amqp_message" in kwargs:
                self._raw_amqp_message = kwargs["raw_amqp_message"]
            else:
                self._raw_amqp_message = AmqpAnnotatedMessage(message=kwargs["message"])
        else:
            self._build_annotated_message(body)
            # The assignments below go through the property setters, so the
            # order matters: session_id must be set before partition_key
            # (which validates against it), and message_id before
            # scheduled_enqueue_time_utc (which fills in a missing id).
            self.application_properties = application_properties
            self.session_id = session_id
            self.message_id = message_id
            self.content_type = content_type
            self.correlation_id = correlation_id
            self.to = to
            self.reply_to = reply_to
            self.reply_to_session_id = reply_to_session_id
            self.subject = subject
            self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc
            self.time_to_live = time_to_live
            self.partition_key = partition_key

    def __str__(self) -> str:
        return str(self.raw_amqp_message)

    def __repr__(self) -> str:
        # Best-effort representation: a property that fails to read is shown
        # as "<read-error>" instead of making repr() itself raise.
        field_names = (
            "application_properties",
            "session_id",
            "message_id",
            "content_type",
            "correlation_id",
            "to",
            "reply_to",
            "reply_to_session_id",
            "subject",
            "time_to_live",
            "partition_key",
            "scheduled_enqueue_time_utc",
        )
        parts = ["body={}".format(str(self))]
        for name in field_names:
            try:
                parts.append("{}={}".format(name, getattr(self, name)))
            except Exception:  # pylint: disable=broad-except
                parts.append("{}=<read-error>".format(name))
        # The result is capped at 1024 characters.
        return "ServiceBusMessage({})".format(", ".join(parts))[:1024]

    def _build_annotated_message(self, body):
        """Create the underlying AmqpAnnotatedMessage for an outgoing message.

        :param body: The message payload; ``None`` produces a value-body message,
         anything else a data-body message.
        :type body: str or bytes or None
        :raises TypeError: If ``body`` is not a str, bytes, or None.
        """
        if body is not None and not isinstance(body, (str, bytes)):
            raise TypeError(
                "ServiceBusMessage body must be a string, bytes, or None. "
                "Got instead: {}".format(type(body))
            )

        if body is None:
            self._raw_amqp_message = AmqpAnnotatedMessage(value_body=None, encoding=self._encoding)
        else:
            self._raw_amqp_message = AmqpAnnotatedMessage(data_body=body, encoding=self._encoding)
        self._raw_amqp_message.header = AmqpMessageHeader()
        self._raw_amqp_message.properties = AmqpMessageProperties()

    def _set_message_annotations(self, key, value):
        """Set a message annotation, or remove it when ``value`` is None.

        :param key: The annotation key.
        :param value: The annotation value; ``None`` deletes the key if present.
        """
        if not self._raw_amqp_message.annotations:
            self._raw_amqp_message.annotations = {}

        if value is None:
            self._raw_amqp_message.annotations.pop(key, None)
        else:
            self._raw_amqp_message.annotations[key] = value

    def _get_text_property(self, name: str) -> Optional[str]:
        """Read an attribute of the AMQP properties section, decoding bytes as UTF-8.

        :param str name: Attribute name on the AMQP properties object.
        :return: ``None`` when the message has no properties section; the decoded
         string when the value is valid UTF-8 bytes; otherwise the stored value as-is.
        :rtype: str or None
        """
        properties = self._raw_amqp_message.properties
        if not properties:
            return None
        value = getattr(properties, name)
        try:
            return value.decode("UTF-8")
        except (AttributeError, UnicodeDecodeError):
            # Not bytes (no .decode) or not valid UTF-8: hand back the raw value.
            return value

    def _set_property(self, name: str, value: Any) -> None:
        """Assign an attribute of the AMQP properties section, creating the section if missing.

        :param str name: Attribute name on the AMQP properties object.
        :param any value: The value to store.
        """
        if not self._raw_amqp_message.properties:
            self._raw_amqp_message.properties = AmqpMessageProperties()
        setattr(self._raw_amqp_message.properties, name, value)

    @property
    def message(self) -> Union["Message", LegacyMessage]:
        """DEPRECATED: Get the underlying uamqp.Message or LegacyMessage.
         This is deprecated and will be removed in a later release.

        :rtype: uamqp.Message or LegacyMessage
        """
        warnings.warn(
            "The `message` property is deprecated and will be removed in future versions.",
            DeprecationWarning,
        )
        if not self._uamqp_message:
            # Built lazily on first access and cached afterwards.
            self._uamqp_message = LegacyMessage(
                self._raw_amqp_message,
                to_outgoing_amqp_message=PyamqpTransport.to_outgoing_amqp_message
            )
        return self._uamqp_message

    @message.setter
    def message(self, value: "Message") -> None:
        """DEPRECATED: Set the underlying Message.
         This is deprecated and will be removed in a later release.

        :param value: The uamqp.Message to use as the underlying message.
        :type value: ~uamqp.Message
        """
        warnings.warn(
            "The `message` property is deprecated and will be removed in future versions.",
            DeprecationWarning,
        )
        self._uamqp_message = value

    @property
    def raw_amqp_message(self) -> AmqpAnnotatedMessage:
        """Advanced usage only. The internal AMQP message payload that is sent or received.

        :rtype: ~azure.servicebus.amqp.AmqpAnnotatedMessage
        """
        return self._raw_amqp_message

    @property
    def session_id(self) -> Optional[str]:
        """The session identifier of the message for a sessionful entity.

        For sessionful entities, this application-defined value specifies the session affiliation of the message.
        Messages with the same session identifier are subject to summary locking and enable exact in-order
        processing and demultiplexing. For non-sessionful entities, this value is ignored.

        See Message Sessions in `https://docs.microsoft.com/azure/service-bus-messaging/message-sessions`.

        :rtype: str or None
        """
        return self._get_text_property("group_id")

    @session_id.setter
    def session_id(self, value: str) -> None:
        if value and len(value) > MESSAGE_PROPERTY_MAX_LENGTH:
            raise ValueError(
                "session_id cannot be longer than {} characters.".format(
                    MESSAGE_PROPERTY_MAX_LENGTH
                )
            )
        self._set_property("group_id", value)

    @property
    def application_properties(self) -> Optional[Dict[Union[str, bytes], PrimitiveTypes]]:
        """The user defined properties on the message.

        :rtype: dict[str or bytes, PrimitiveTypes] or None
        """
        return self._raw_amqp_message.application_properties

    @application_properties.setter
    def application_properties(self, value: Dict[Union[str, bytes], Any]) -> None:
        self._raw_amqp_message.application_properties = value

    @property
    def partition_key(self) -> Optional[str]:
        """The partition key for sending a message to a partitioned entity.

        Setting this value enables assigning related messages to the same internal partition, so that submission
        sequence order is correctly recorded.
        The partition is chosen by a hash function over this value and cannot be chosen directly.

        See Partitioned queues and topics in
        `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning`.

        :rtype: str or None
        """
        annotations = self._raw_amqp_message.annotations
        if not annotations:
            # No annotations section at all: there is no partition key to read.
            return None
        opt_p_key = annotations.get(_X_OPT_PARTITION_KEY)
        try:
            return opt_p_key.decode("UTF-8")
        except (AttributeError, UnicodeDecodeError):
            # None, already a str, or bytes that are not valid UTF-8: return as stored.
            return opt_p_key

    @partition_key.setter
    def partition_key(self, value: str) -> None:
        if value and len(value) > MESSAGE_PROPERTY_MAX_LENGTH:
            raise ValueError(
                "partition_key cannot be longer than {} characters.".format(
                    MESSAGE_PROPERTY_MAX_LENGTH
                )
            )

        if value and self.session_id is not None and value != self.session_id:
            raise ValueError(
                "partition_key:{} cannot be set to a different value than session_id:{}".format(
                    value, self.session_id
                )
            )
        self._set_message_annotations(_X_OPT_PARTITION_KEY, value)

    @property
    def time_to_live(self) -> Optional[datetime.timedelta]:
        """The life duration of a message.

        This value is the relative duration after which the message expires, starting from the instant the message
        has been accepted and stored by the broker, as captured in `enqueued_time_utc`.
        When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic.
        A message-level time-to-live value cannot be longer than the entity's time-to-live setting and it is silently
        adjusted if it does.

        See Expiration in `https://docs.microsoft.com/azure/service-bus-messaging/message-expiration`

        :rtype: ~datetime.timedelta or None
        """
        header = self._raw_amqp_message.header
        if header and header.time_to_live:
            # The header stores the TTL in milliseconds.
            return datetime.timedelta(milliseconds=header.time_to_live)
        return None

    @time_to_live.setter
    def time_to_live(self, value: Union[datetime.timedelta, int]) -> None:
        raw = self._raw_amqp_message
        if not raw.header:
            raw.header = AmqpMessageHeader()

        if value is None:
            raw.header.time_to_live = None
            # Clear a previously derived expiry; the properties section may be absent.
            if raw.properties and raw.properties.absolute_expiry_time:
                raw.properties.absolute_expiry_time = None
        elif isinstance(value, datetime.timedelta):
            # Exact integer milliseconds; keeps sub-second precision.
            raw.header.time_to_live = value // datetime.timedelta(milliseconds=1)
        else:
            # Non-timedelta values are interpreted as whole seconds.
            raw.header.time_to_live = int(value) * 1000

        ttl_ms = raw.header.time_to_live
        if ttl_ms and ttl_ms != MAX_DURATION_VALUE:
            if not raw.properties:
                raw.properties = AmqpMessageProperties()
            # calendar.timegm interprets the tuple as UTC; time.mktime would
            # treat it as local time and skew the result by the host's UTC offset.
            # NOTE(review): relies on utc_now() returning the current UTC time - confirm.
            raw.properties.creation_time = calendar.timegm(utc_now().utctimetuple()) * 1000
            raw.properties.absolute_expiry_time = min(
                MAX_ABSOLUTE_EXPIRY_TIME,
                raw.properties.creation_time + ttl_ms
            )

    @property
    def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]:
        """The utc scheduled enqueue time to the message.

        This property can be used for scheduling when sending a message through `ServiceBusSender.send` method.
        If cancelling scheduled messages is required, you should use the `ServiceBusSender.schedule` method,
        which returns sequence numbers that can be used for future cancellation.
        `scheduled_enqueue_time_utc` is None if not set.

        :rtype: ~datetime.datetime or None
        """
        annotations = self._raw_amqp_message.annotations
        if annotations:
            timestamp = annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME)
            if timestamp:
                try:
                    # Numeric annotation values are treated as epoch milliseconds.
                    in_seconds = timestamp / 1000.0
                    return utc_from_timestamp(in_seconds)
                except TypeError:
                    # Non-numeric value (the setter stores what it is given): return as-is.
                    return timestamp
        return None

    @scheduled_enqueue_time_utc.setter
    def scheduled_enqueue_time_utc(self, value: datetime.datetime) -> None:
        if not self._raw_amqp_message.properties:
            self._raw_amqp_message.properties = AmqpMessageProperties()
        # A message without an id is given a generated one here, regardless of ``value``.
        if not self._raw_amqp_message.properties.message_id:
            self._raw_amqp_message.properties.message_id = str(uuid.uuid4())
        self._set_message_annotations(_X_OPT_SCHEDULED_ENQUEUE_TIME, value)

    @property
    def body(self) -> Any:
        """The body of the Message. The format may vary depending on the body type:
        For :class:`azure.servicebus.amqp.AmqpMessageBodyType.DATA<azure.servicebus.amqp.AmqpMessageBodyType.DATA>`,
        the body could be bytes or Iterable[bytes].
        For
        :class:`azure.servicebus.amqp.AmqpMessageBodyType.SEQUENCE<azure.servicebus.amqp.AmqpMessageBodyType.SEQUENCE>`,
        the body could be List or Iterable[List].
        For :class:`azure.servicebus.amqp.AmqpMessageBodyType.VALUE<azure.servicebus.amqp.AmqpMessageBodyType.VALUE>`,
        the body could be any type.

        :rtype: Any
        """
        return self._raw_amqp_message.body

    @property
    def body_type(self) -> AmqpMessageBodyType:
        """The body type of the underlying AMQP message.

        :rtype: ~azure.servicebus.amqp.AmqpMessageBodyType
        """
        return self._raw_amqp_message.body_type

    @property
    def content_type(self) -> Optional[str]:
        """The content type descriptor.

        Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5,
        for example "application/json".

        :rtype: str or None
        """
        return self._get_text_property("content_type")

    @content_type.setter
    def content_type(self, value: str) -> None:
        self._set_property("content_type", value)

    @property
    def correlation_id(self) -> Optional[str]:
        # pylint: disable=line-too-long
        """The correlation identifier.

        Allows an application to specify a context for the message for the purposes of correlation, for example
        reflecting the MessageId of a message that is being replied to.

        See Message Routing and Correlation in
        `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation`.

        :rtype: str or None
        """
        return self._get_text_property("correlation_id")

    @correlation_id.setter
    def correlation_id(self, value: str) -> None:
        self._set_property("correlation_id", value)

    @property
    def subject(self) -> Optional[str]:
        """The application specific subject, sometimes referred to as a label.

        This property enables the application to indicate the purpose of the message to the receiver in a standardized
        fashion, similar to an email subject line.

        :rtype: str or None
        """
        return self._get_text_property("subject")

    @subject.setter
    def subject(self, value: str) -> None:
        self._set_property("subject", value)

    @property
    def message_id(self) -> Optional[str]:
        """The id to identify the message.

        The message identifier is an application-defined value that uniquely identifies the message and its payload.
        The identifier is a free-form string and can reflect a GUID or an identifier derived from the
        application context. If enabled, the duplicate detection
        (see `https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection`)
        feature identifies and removes second and further submissions of messages with the same message id.

        :rtype: str or None
        """
        return self._get_text_property("message_id")

    @message_id.setter
    def message_id(self, value: str) -> None:
        # str() so that non-string ids are measured by their textual length.
        if value and len(str(value)) > MESSAGE_PROPERTY_MAX_LENGTH:
            raise ValueError(
                "message_id cannot be longer than {} characters.".format(
                    MESSAGE_PROPERTY_MAX_LENGTH
                )
            )
        self._set_property("message_id", value)

    @property
    def reply_to(self) -> Optional[str]:
        # pylint: disable=line-too-long
        """The address of an entity to send replies to.

        This optional and application-defined value is a standard way to express a reply path to the receiver of
        the message. When a sender expects a reply, it sets the value to the absolute or relative path of the queue
        or topic it expects the reply to be sent to.

        See Message Routing and Correlation in
        `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation`.

        :rtype: str or None
        """
        return self._get_text_property("reply_to")

    @reply_to.setter
    def reply_to(self, value: str) -> None:
        self._set_property("reply_to", value)

    @property
    def reply_to_session_id(self) -> Optional[str]:
        # pylint: disable=line-too-long
        """The session identifier augmenting the `reply_to` address.

        This value augments the `reply_to` information and specifies which session id should be set for the reply
        when sent to the reply entity.

        See Message Routing and Correlation in
        `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation`.

        :rtype: str or None
        """
        return self._get_text_property("reply_to_group_id")

    @reply_to_session_id.setter
    def reply_to_session_id(self, value: str) -> None:
        if value and len(value) > MESSAGE_PROPERTY_MAX_LENGTH:
            raise ValueError(
                "reply_to_session_id cannot be longer than {} characters.".format(
                    MESSAGE_PROPERTY_MAX_LENGTH
                )
            )
        self._set_property("reply_to_group_id", value)

    @property
    def to(self) -> Optional[str]:
        """The `to` address.

        This property is reserved for future use in routing scenarios and presently ignored by the broker itself.
        Applications can use this value in rule-driven auto-forward chaining scenarios to indicate the intended
        logical destination of the message.

        See https://docs.microsoft.com/azure/service-bus-messaging/service-bus-auto-forwarding for more details.

        :rtype: str or None
        """
        return self._get_text_property("to")

    @to.setter
    def to(self, value: str) -> None:
        self._set_property("to", value)
class ServiceBusMessageBatch(object):
    """A batch of messages.

    Sending messages in a batch is more performant than sending individual message.
    ServiceBusMessageBatch helps you create the maximum allowed size batch of `Message` to improve sending performance.

    Use the `add` method to add messages until the maximum batch size limit in bytes has been reached -
    at which point a `MessageSizeExceededError` will be raised.

    **Please use the create_message_batch method of ServiceBusSender
    to create a ServiceBusMessageBatch object instead of instantiating a ServiceBusMessageBatch object directly.**

    :param Optional[int] max_size_in_bytes: The maximum size of bytes data that a ServiceBusMessageBatch object can
     hold.
    """

    def __init__(
        self,
        max_size_in_bytes: Optional[int] = None,
        **kwargs: Any
    ) -> None:
        self._amqp_transport = kwargs.pop("amqp_transport", PyamqpTransport)
        self._tracing_attributes: Dict[str, Union[str, int]] = kwargs.pop("tracing_attributes", {})

        # A missing or zero limit falls back to the module-wide maximum.
        self._max_size_in_bytes = max_size_in_bytes or MAX_MESSAGE_LENGTH_BYTES
        # Start from an empty transport-level batch and record its encoded size.
        self._message = self._amqp_transport.build_batch_message([])
        self._size = self._amqp_transport.get_batch_message_encoded_size(self._message)
        self._count = 0
        self._messages: List[ServiceBusMessage] = []
        self._uamqp_message: Optional[LegacyBatchMessage] = None

    def __repr__(self) -> str:
        return (
            f"ServiceBusMessageBatch(max_size_in_bytes={self.max_size_in_bytes}, "
            f"message_count={self._count})"
        )

    def __len__(self) -> int:
        return self._count

    def _from_list(self, messages: Iterable[ServiceBusMessage]) -> None:
        """Add every message of ``messages`` to the batch, in order.

        :param messages: The messages to add.
        :type messages: iterable[~azure.servicebus.ServiceBusMessage]
        """
        for item in messages:
            self._add(item)

    def _add(self, add_message: Union[ServiceBusMessage, Mapping[str, Any], AmqpAnnotatedMessage]) -> None:
        """Actual add implementation. The shim exists to hide the internal parameters such as parent_span.

        :param add_message: The message to add.
        :type add_message: ~azure.servicebus.ServiceBusMessage or mapping[str, any]
         or ~azure.servicebus.amqp.AmqpAnnotatedMessage
        :raises ~azure.servicebus.exceptions.MessageSizeExceededError: If adding the
         message would push the batch past ``max_size_in_bytes``.
        """
        transport = self._amqp_transport
        outgoing = cast(
            ServiceBusMessage,
            transform_outbound_messages(
                add_message, ServiceBusMessage, transport.to_outgoing_amqp_message
            ),
        )
        # pylint: disable=protected-access
        outgoing._message = trace_message(
            outgoing._message,
            amqp_transport=transport,
            additional_attributes=self._tracing_attributes
        )
        encoded_size = transport.get_message_encoded_size(outgoing._message)

        # For a ServiceBusMessageBatch, if the encoded_message_size of message is < 256, then the overhead cost to
        # encode that message into the ServiceBusMessageBatch would be 5 bytes, if >= 256, it would be 8 bytes.
        if encoded_size < 256:
            overhead = _BATCH_MESSAGE_OVERHEAD_COST[0]
        else:
            overhead = _BATCH_MESSAGE_OVERHEAD_COST[1]
        projected_size = self._size + encoded_size + overhead

        if projected_size > self.max_size_in_bytes:
            raise MessageSizeExceededError(
                message=f"ServiceBusMessageBatch has reached its size limit: {self.max_size_in_bytes}"
            )

        # Bookkeeping is updated only after the transport accepted the message.
        transport.add_batch(self, outgoing)
        self._size = projected_size
        self._count += 1
        self._messages.append(outgoing)

    @property
    def message(self) -> Union["BatchMessage", LegacyBatchMessage]:
        """DEPRECATED: Get the underlying uamqp.BatchMessage or LegacyBatchMessage.
         This is deprecated and will be removed in a later release.

        :rtype: ~uamqp.BatchMessage or LegacyBatchMessage
        """
        warnings.warn(
            "The `message` property is deprecated and will be removed in future versions.",
            DeprecationWarning,
        )
        if self._uamqp_message:
            return self._uamqp_message

        if self._amqp_transport.KIND == "pyamqp":
            # Wrap the pyamqp batch so it is exposed through the legacy message type.
            annotated = AmqpAnnotatedMessage(message=pyamqp_Message(*self._message))
            self._uamqp_message = LegacyBatchMessage(
                annotated,
                to_outgoing_amqp_message=PyamqpTransport.to_outgoing_amqp_message,
            )
        else:
            self._uamqp_message = self._message
        return self._uamqp_message

    @message.setter
    def message(self, value: "BatchMessage") -> None:
        """DEPRECATED: Set the underlying BatchMessage.
         This is deprecated and will be removed in a later release.

        :param value: The BatchMessage to set.
        :type value: ~uamqp.BatchMessage
        """
        warnings.warn(
            "The `message` property is deprecated and will be removed in future versions.",
            DeprecationWarning,
        )
        self._uamqp_message = value

    @property
    def max_size_in_bytes(self) -> int:
        """The maximum size of bytes data that a ServiceBusMessageBatch object can hold.

        :rtype: int
        """
        return self._max_size_in_bytes

    @property
    def size_in_bytes(self) -> int:
        """The combined size of the messages in the batch, in bytes.

        :rtype: int
        """
        return self._size

    def add_message(self, message: Union[ServiceBusMessage, AmqpAnnotatedMessage, Mapping[str, Any]]) -> None:
        """Try to add a single Message to the batch.

        The total size of an added message is the sum of its body, properties, etc.
        If this added size results in the batch exceeding the maximum batch size, a `MessageSizeExceededError` will
        be raised.

        :param message: The Message to be added to the batch.
        :type message: Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.amqp.AmqpAnnotatedMessage]
        :raises: :class: ~azure.servicebus.exceptions.MessageSizeExceededError, when exceeding the size limit.
        """
        return self._add(message)
[docs]class ServiceBusReceivedMessage(ServiceBusMessage): # pylint: disable=too-many-instance-attributes """ A Service Bus Message received from service side. :ivar auto_renew_error: Error when AutoLockRenewer is used and it fails to renew the message lock. :vartype auto_renew_error: ~azure.servicebus.AutoLockRenewTimeout or ~azure.servicebus.AutoLockRenewFailed .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START receive_complex_message] :end-before: [END receive_complex_message] :language: python :dedent: 4 :caption: Checking the properties on a received message. """ def __init__( self, message: Union["Message", "pyamqp_Message"], receive_mode: Union[ServiceBusReceiveMode, str] = ServiceBusReceiveMode.PEEK_LOCK, frame: Optional["TransferFrame"] = None, **kwargs: Any ) -> None: self._amqp_transport = kwargs.pop("amqp_transport", PyamqpTransport) super(ServiceBusReceivedMessage, self).__init__(None, message=message) # type: ignore if self._amqp_transport.KIND == "uamqp": self._uamqp_message = message self._message = message self._settled = receive_mode == ServiceBusReceiveMode.RECEIVE_AND_DELETE self._delivery_tag = self._amqp_transport.get_message_delivery_tag(message, frame) self._delivery_id = self._amqp_transport.get_message_delivery_id(message, frame) # only used by pyamqp self._received_timestamp_utc = utc_now() self._is_deferred_message = kwargs.get("is_deferred_message", False) self._is_peeked_message = kwargs.get("is_peeked_message", False) self.auto_renew_error: Optional[Exception]= None try: self._receiver: Union["ServiceBusReceiver", "AsyncServiceBusReceiver"] = kwargs.pop( "receiver" ) except KeyError: raise TypeError( "ServiceBusReceivedMessage requires a receiver to be initialized. " + "This class should never be initialized by a user; " + "for outgoing messages, the ServiceBusMessage class should be utilized instead." 
) from None self._expiry: Optional[datetime.datetime] = None def __getstate__(self) -> Dict[str, Any]: state = self.__dict__.copy() state['_receiver'] = None state['_uamqp_message'] = None return state def __setstate__(self, state: Dict[str, Any]) -> None: self.__dict__.update(state) @property def _lock_expired(self) -> bool: """ Whether the lock on the message has expired. :rtype: bool """ try: if self._receiver.session: # type: ignore raise TypeError( "Session messages do not expire. Please use the Session expiry instead." ) except AttributeError: # Is not a session receiver pass if self.locked_until_utc and self.locked_until_utc <= utc_now(): return True return False def __repr__(self) -> str: # pylint: disable=too-many-branches,too-many-statements # pylint: disable=bare-except message_repr = "body={}".format( str(self) ) try: message_repr += ", application_properties={}".format(self.application_properties) except: message_repr += ", application_properties=<read-error>" try: message_repr += ", session_id={}".format(self.session_id) except: message_repr += ", session_id=<read-error>" try: message_repr += ", message_id={}".format(self.message_id) except: message_repr += ", message_id=<read-error>" try: message_repr += ", content_type={}".format(self.content_type) except: message_repr += ", content_type=<read-error>" try: message_repr += ", correlation_id={}".format(self.correlation_id) except: message_repr += ", correlation_id=<read-error>" try: message_repr += ", to={}".format(self.to) except: message_repr += ", to=<read-error>" try: message_repr += ", reply_to={}".format(self.reply_to) except: message_repr += ", reply_to=<read-error>" try: message_repr += ", reply_to_session_id={}".format(self.reply_to_session_id) except: message_repr += ", reply_to_session_id=<read-error>" try: message_repr += ", subject={}".format(self.subject) except: message_repr += ", subject=<read-error>" try: message_repr += ", time_to_live={}".format(self.time_to_live) except: 
message_repr += ", time_to_live=<read-error>" try: message_repr += ", partition_key={}".format(self.partition_key) except: message_repr += ", partition_key=<read-error>" try: message_repr += ", scheduled_enqueue_time_utc={}".format(self.scheduled_enqueue_time_utc) except: message_repr += ", scheduled_enqueue_time_utc=<read-error>" try: message_repr += ", auto_renew_error={}".format(self.auto_renew_error) except: message_repr += ", auto_renew_error=<read-error>" try: message_repr += ", dead_letter_error_description={}".format(self.dead_letter_error_description) except: message_repr += ", dead_letter_error_description=<read-error>" try: message_repr += ", dead_letter_reason={}".format(self.dead_letter_reason) except: message_repr += ", dead_letter_reason=<read-error>" try: message_repr += ", dead_letter_source={}".format(self.dead_letter_source) except: message_repr += ", dead_letter_source=<read-error>" try: message_repr += ", delivery_count={}".format(self.delivery_count) except: message_repr += ", delivery_count=<read-error>" try: message_repr += ", enqueued_sequence_number={}".format(self.enqueued_sequence_number) except: message_repr += ", enqueued_sequence_number=<read-error>" try: message_repr += ", enqueued_time_utc={}".format(self.enqueued_time_utc) except: message_repr += ", enqueued_time_utc=<read-error>" try: message_repr += ", expires_at_utc={}".format(self.expires_at_utc) except: message_repr += ", expires_at_utc=<read-error>" try: message_repr += ", sequence_number={}".format(self.sequence_number) except: message_repr += ", sequence_number=<read-error>" try: message_repr += ", lock_token={}".format(self.lock_token) except: message_repr += ", lock_token=<read-error>" try: message_repr += ", locked_until_utc={}".format(self.locked_until_utc) except: message_repr += ", locked_until_utc=<read-error>" return "ServiceBusReceivedMessage({})".format(message_repr)[:1024] @property # type: ignore[misc] # TODO: ignoring error to copy over setter, since it's 
inherited def message(self) -> Union["Message", LegacyMessage]: """DEPRECATED: Get the underlying LegacyMessage. This is deprecated and will be removed in a later release. :rtype: LegacyMessage """ warnings.warn( "The `message` property is deprecated and will be removed in future versions.", DeprecationWarning, ) if not self._uamqp_message: if not self._settled: settler = self._receiver._handler # pylint:disable=protected-access else: settler = None self._uamqp_message = LegacyMessage( self._raw_amqp_message, delivery_no=self._delivery_id, delivery_tag=self._delivery_tag, settler=settler, encoding=self._encoding, to_outgoing_amqp_message=PyamqpTransport.to_outgoing_amqp_message ) return self._uamqp_message @property def dead_letter_error_description(self) -> Optional[str]: """ Dead letter error description, when the message is received from a deadletter subqueue of an entity. :rtype: str """ if self._raw_amqp_message.application_properties: try: return self._raw_amqp_message.application_properties.get( # type: ignore PROPERTIES_DEAD_LETTER_ERROR_DESCRIPTION ).decode("UTF-8") except AttributeError: pass return None @property def dead_letter_reason(self) -> Optional[str]: """ Dead letter reason, when the message is received from a deadletter subqueue of an entity. :rtype: str """ if self._raw_amqp_message.application_properties: try: return self._raw_amqp_message.application_properties.get( # type: ignore PROPERTIES_DEAD_LETTER_REASON ).decode("UTF-8") except AttributeError: pass return None @property def dead_letter_source(self) -> Optional[str]: """ The name of the queue or subscription that this message was enqueued on, before it was deadlettered. This property is only set in messages that have been dead-lettered and subsequently auto-forwarded from the dead-letter queue to another entity. Indicates the entity in which the message was dead-lettered. 
:rtype: str """ if self._raw_amqp_message.annotations: try: return self._raw_amqp_message.annotations.get(_X_OPT_DEAD_LETTER_SOURCE).decode( # type: ignore "UTF-8" ) except AttributeError: pass return None @property def state(self) -> ServiceBusMessageState: """ Defaults to Active. Represents the message state of the message. Can be Active, Deferred. or Scheduled. :rtype: ~azure.servicebus.ServiceBusMessageState """ try: message_state = self._raw_amqp_message.annotations.get(MESSAGE_STATE_NAME) try: return ServiceBusMessageState(message_state) except ValueError: return ServiceBusMessageState.ACTIVE if not message_state else message_state except AttributeError: return ServiceBusMessageState.ACTIVE @property def delivery_count(self) -> Optional[int]: """ Number of deliveries that have been attempted for this message. The count is incremented when a message lock expires or the message is explicitly abandoned by the receiver. :rtype: int """ if self._raw_amqp_message.header: return self._raw_amqp_message.header.delivery_count return None @property def enqueued_sequence_number(self) -> Optional[int]: """ For messages that have been auto-forwarded, this property reflects the sequence number that had first been assigned to the message at its original point of submission. :rtype: int """ if self._raw_amqp_message.annotations: return self._raw_amqp_message.annotations.get(_X_OPT_ENQUEUE_SEQUENCE_NUMBER) return None @property def enqueued_time_utc(self) -> Optional[datetime.datetime]: """ The UTC datetime at which the message has been accepted and stored in the entity. 
:rtype: ~datetime.datetime """ if self._raw_amqp_message.annotations: timestamp = self._raw_amqp_message.annotations.get(_X_OPT_ENQUEUED_TIME) if timestamp: in_seconds = timestamp / 1000.0 return utc_from_timestamp(in_seconds) return None @property def expires_at_utc(self) -> Optional[datetime.datetime]: """ The UTC datetime at which the message is marked for removal and no longer available for retrieval from the entity due to expiration. Expiry is controlled by the `Message.time_to_live` property. This property is computed from `Message.enqueued_time_utc` + `Message.time_to_live`. :rtype: ~datetime.datetime """ if self.enqueued_time_utc and self.time_to_live: return self.enqueued_time_utc + self.time_to_live return None @property def sequence_number(self) -> Optional[int]: """ The unique number assigned to a message by Service Bus. The sequence number is a unique 64-bit integer assigned to a message as it is accepted and stored by the broker and functions as its true identifier. For partitioned entities, the topmost 16 bits reflect the partition identifier. Sequence numbers monotonically increase. They roll over to 0 when the 48-64 bit range is exhausted. :rtype: int """ if self._raw_amqp_message.annotations: return self._raw_amqp_message.annotations.get(_X_OPT_SEQUENCE_NUMBER) return None @property def lock_token(self) -> Optional[Union[uuid.UUID, str]]: """ The lock token for the current message serving as a reference to the lock that is being held by the broker in PEEK_LOCK mode. :rtype: ~uuid.UUID or str """ if self._settled: return None if self._delivery_tag: return uuid.UUID(bytes_le=self._delivery_tag) delivery_annotations = self._raw_amqp_message.delivery_annotations if delivery_annotations: return delivery_annotations.get(_X_OPT_LOCK_TOKEN) return None @property def locked_until_utc(self) -> Optional[datetime.datetime]: """ The UTC datetime until which the message will be locked in the queue/subscription. 
When the lock expires, delivery count of the message is incremented and the message is again available for retrieval. :rtype: datetime.datetime """ try: if self._settled or self._receiver.session: # type: ignore return None except AttributeError: # not settled, and isn't session receiver. pass if self._expiry: return self._expiry if self._raw_amqp_message.annotations and _X_OPT_LOCKED_UNTIL in self._raw_amqp_message.annotations: expiry_in_seconds = self._raw_amqp_message.annotations[_X_OPT_LOCKED_UNTIL] / 1000 self._expiry = utc_from_timestamp(expiry_in_seconds) return self._expiry