# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from typing import Optional, Any, cast, Mapping
from ._constants import AmqpMessageBodyType
from .._pyamqp.message import Message, Header, Properties
from .._pyamqp import utils as pyamqp_utils
class DictMixin(object):
    """Mixin that exposes an object's instance attributes through a dict-like API.

    Item access reads and writes the instance ``__dict__`` directly. The
    listing helpers (``keys``, ``values``, ``items``, ``len`` and ``str``)
    leave out attributes whose name starts with an underscore, whereas
    ``has_key`` and ``get`` look at every attribute.
    """

    def __setitem__(self, key, item):
        # type: (Any, Any) -> None
        """Store ``item`` as the instance attribute named ``key``."""
        vars(self)[key] = item

    def __getitem__(self, key):
        # type: (Any) -> Any
        """Return the attribute named ``key``; a missing name raises ``KeyError``."""
        return vars(self)[key]

    def __repr__(self):
        # type: () -> str
        """Use the same text as ``str()``."""
        return str(self)

    def __len__(self):
        # type: () -> int
        """Count the attributes reported by ``keys()``."""
        return len(self.keys())

    def __delitem__(self, key):
        # type: (Any) -> None
        """Reset the attribute to ``None``; the name itself is kept, not removed."""
        vars(self)[key] = None

    def __eq__(self, other):
        # type: (Any) -> bool
        """Equal when ``other`` is an instance of this class with identical attributes."""
        return isinstance(other, self.__class__) and vars(self) == vars(other)

    def __ne__(self, other):
        # type: (Any) -> bool
        """Negation of ``__eq__``."""
        is_equal = self.__eq__(other)
        return not is_equal

    def __str__(self):
        # type: () -> str
        """Render the non-underscore attributes as a dict literal."""
        visible = {}
        for name, value in vars(self).items():
            if not name.startswith("_"):
                visible[name] = value
        return str(visible)

    def has_key(self, k):
        # type: (Any) -> bool
        """Tell whether an attribute named ``k`` exists (underscore names included)."""
        return k in vars(self)

    def update(self, *args, **kwargs):
        # type: (Any, Any) -> None
        """Forward to ``dict.update`` on the instance ``__dict__``."""
        return vars(self).update(*args, **kwargs)

    def keys(self):
        # type: () -> list
        """List the attribute names that do not start with an underscore."""
        return [name for name in vars(self) if not name.startswith("_")]

    def values(self):
        # type: () -> list
        """List the values of the attributes whose name does not start with an underscore."""
        return [value for name, value in vars(self).items() if not name.startswith("_")]

    def items(self):
        # type: () -> list
        """List ``(name, value)`` pairs for the non-underscore attributes."""
        return [pair for pair in vars(self).items() if not pair[0].startswith("_")]

    def get(self, key, default=None):
        # type: (Any, Optional[Any]) -> Any
        """Return the attribute named ``key``, or ``default`` when it is absent."""
        return vars(self).get(key, default)
class AmqpAnnotatedMessage(object):
    # pylint: disable=too-many-instance-attributes
    """
    The AMQP Annotated Message for advanced sending and receiving scenarios which allows you to
    access to low-level AMQP message sections. There should be one and only one of either data_body, sequence_body
    or value_body being set as the body of the AmqpAnnotatedMessage; if more than one body is set, `ValueError` will
    be raised.

    Please refer to the AMQP spec:
    http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
    for more information on the message format.

    :keyword data_body: The body consists of one or more data sections and each section contains opaque binary data.
    :paramtype data_body: Union[str, bytes, List[Union[str, bytes]]]
    :keyword sequence_body: The body consists of one or more sequence sections and
     each section contains an arbitrary number of structured data elements.
    :paramtype sequence_body: List[Any]
    :keyword value_body: The body consists of one amqp-value section and the section contains a single AMQP value.
    :paramtype value_body: Any
    :keyword header: The amqp message header.
    :paramtype header: Optional[~azure.eventhub.amqp.AmqpMessageHeader]
    :keyword footer: The amqp message footer.
    :paramtype footer: Optional[dict]
    :keyword properties: Properties to add to the amqp message.
    :paramtype properties: Optional[~azure.eventhub.amqp.AmqpMessageProperties]
    :keyword application_properties: Service specific application properties.
    :paramtype application_properties: Optional[dict]
    :keyword annotations: Service specific message annotations.
    :paramtype annotations: Optional[dict]
    :keyword delivery_annotations: Service specific delivery annotations.
    :paramtype delivery_annotations: Optional[dict]
    :raises ValueError: If not exactly one of data_body, sequence_body or value_body is supplied
     (only checked when no ``message`` keyword is given).
    """

    def __init__(self, **kwargs):
        # type: (Any) -> None
        self._message = kwargs.pop("message", None)
        self._encoding = kwargs.pop("encoding", "UTF-8")

        # Assigned before the early return below so that every instance,
        # including one wrapping a received message, owns these attributes.
        self._body = None
        self._body_type = None

        # internal usage only for Event Hub received message
        if self._message:
            self._from_amqp_message(self._message)
            return

        # manually constructed AmqpAnnotatedMessage
        supplied_bodies = [key for key in ("data_body", "sequence_body", "value_body") if key in kwargs]
        if len(supplied_bodies) != 1:
            raise ValueError(
                "There should be one and only one of either data_body, sequence_body "
                "or value_body being set as the body of the AmqpAnnotatedMessage."
            )

        if "data_body" in kwargs:
            self._body = pyamqp_utils.normalized_data_body(kwargs.get("data_body"))
            self._message = Message(data=self._body)
            self._body_type = AmqpMessageBodyType.DATA
        elif "sequence_body" in kwargs:
            self._body = pyamqp_utils.normalized_sequence_body(kwargs.get("sequence_body"))
            self._body_type = AmqpMessageBodyType.SEQUENCE
            self._message = Message(sequence=self._body)
        else:
            self._body = kwargs.get("value_body")
            self._body_type = AmqpMessageBodyType.VALUE
            self._message = Message(value=self._body)

        # An explicit ``header=None`` / ``properties=None`` is treated like an
        # omitted keyword instead of failing on ``**None``.
        header = kwargs.get("header")
        self._header = AmqpMessageHeader(**cast(Mapping, header)) if header is not None else None
        self._footer = kwargs.get("footer")
        properties = kwargs.get("properties")
        self._properties = AmqpMessageProperties(**cast(Mapping, properties)) if properties is not None else None
        self._application_properties = kwargs.get("application_properties")
        self._annotations = kwargs.get("annotations")
        self._delivery_annotations = kwargs.get("delivery_annotations")

    def _to_text(self, section):
        # type: (Any) -> str
        """Decode ``section`` with the message encoding, or fall back to ``str()``
        when the object has no ``decode`` method."""
        try:
            return section.decode(self._encoding)
        except AttributeError:
            return str(section)

    def __str__(self):
        # type: () -> str
        # The ``body_type`` property is used instead of ``self._body_type``
        # because the latter is only recorded for manually constructed messages.
        body_type = self.body_type
        if body_type == AmqpMessageBodyType.DATA:
            return "".join(self._to_text(section) for section in self.body)
        if body_type == AmqpMessageBodyType.SEQUENCE:
            return "".join(self._to_text(element) for sequence in self.body for element in sequence)
        # NOTE(review): any falsy value body (0, "", False, ...) renders as an
        # empty string; kept as-is to preserve existing output.
        if not self.body:
            return ""
        return self._to_text(self.body)

    def __repr__(self):
        # type: () -> str
        # pylint: disable=broad-except
        parts = [
            "body={}".format(str(self)),
            "body_type={}".format(self.body_type),
        ]
        guarded_sections = (
            "header",
            "footer",
            "properties",
            "application_properties",
            "delivery_annotations",
            "annotations",
        )
        for name in guarded_sections:
            # Best effort: a section that cannot be read or formatted must not
            # break repr(), but interpreter-exit exceptions still propagate.
            try:
                parts.append("{}={}".format(name, getattr(self, name)))
            except Exception:
                parts.append("{}=<read-error>".format(name))
        return "AmqpAnnotatedMessage({})".format(", ".join(parts))[:1024]

    def _from_amqp_message(self, message):
        # type: (Any) -> None
        """Populate this instance from a received AMQP ``message``.

        Missing ``properties``/``header`` sections become ``None``; missing
        footer, annotation and application-property sections become ``{}``.
        """
        # TODO: message.properties should not be a list
        self._properties = AmqpMessageProperties(
            message_id=message.properties.message_id,
            user_id=message.properties.user_id,
            to=message.properties.to,
            subject=message.properties.subject,
            reply_to=message.properties.reply_to,
            correlation_id=message.properties.correlation_id,
            content_type=message.properties.content_type,
            content_encoding=message.properties.content_encoding,
            absolute_expiry_time=message.properties.absolute_expiry_time,
            creation_time=message.properties.creation_time,
            group_id=message.properties.group_id,
            group_sequence=message.properties.group_sequence,
            reply_to_group_id=message.properties.reply_to_group_id,
        ) if message.properties else None
        self._header = AmqpMessageHeader(
            delivery_count=message.header.delivery_count,
            time_to_live=message.header.time_to_live,
            first_acquirer=message.header.first_acquirer,
            durable=message.header.durable,
            priority=message.header.priority,
        ) if message.header else None
        self._footer = message.footer or {}
        self._annotations = message.message_annotations or {}
        self._delivery_annotations = message.delivery_annotations or {}
        self._application_properties = message.application_properties or {}
        # Same selection as the ``body`` property, so that
        # ``_to_outgoing_amqp_message`` also works for a received message.
        self._body = message.data or message.sequence or message.value

    def _to_outgoing_amqp_message(self):
        # type: () -> Any
        """Build a new ``Message`` from the current header, properties, annotations,
        footer and body of this instance."""
        message_header = None
        # NOTE(review): ``any()`` treats 0/False values as unset, so a header
        # holding only e.g. ``priority=0`` is not sent -- confirm this is intended.
        if self.header and any(self.header.values()):
            message_header = Header(
                delivery_count=self.header.delivery_count,
                ttl=self.header.time_to_live,
                first_acquirer=self.header.first_acquirer,
                durable=self.header.durable,
                priority=self.header.priority,
            )

        message_properties = None
        if self.properties and any(self.properties.values()):
            creation_time = self.properties.creation_time
            absolute_expiry_time = self.properties.absolute_expiry_time
            message_properties = Properties(
                message_id=self.properties.message_id,
                user_id=self.properties.user_id,
                to=self.properties.to,
                subject=self.properties.subject,
                reply_to=self.properties.reply_to,
                correlation_id=self.properties.correlation_id,
                content_type=self.properties.content_type,
                content_encoding=self.properties.content_encoding,
                creation_time=int(creation_time) if creation_time else None,
                absolute_expiry_time=int(absolute_expiry_time) if absolute_expiry_time else None,
                group_id=self.properties.group_id,
                group_sequence=self.properties.group_sequence,
                reply_to_group_id=self.properties.reply_to_group_id,
            )

        message_fields = {
            "header": message_header,
            "properties": message_properties,
            "application_properties": self.application_properties,
            "message_annotations": self.annotations,
            "delivery_annotations": self.delivery_annotations,
            "footer": self.footer,
        }
        if self.body_type == AmqpMessageBodyType.DATA:
            message_fields["data"] = self._body
        elif self.body_type == AmqpMessageBodyType.SEQUENCE:
            message_fields["sequence"] = self._body
        else:
            message_fields["value"] = self._body
        return Message(**message_fields)

    @property
    def body(self):
        # type: () -> Any
        """The body of the Message. The format may vary depending on the body type:
        For ~azure.eventhub.AmqpMessageBodyType.DATA, the body could be bytes or Iterable[bytes]
        For ~azure.eventhub.AmqpMessageBodyType.SEQUENCE, the body could be List or Iterable[List]
        For ~azure.eventhub.AmqpMessageBodyType.VALUE, the body could be any type.

        :rtype: Any
        """
        return self._message.data or self._message.sequence or self._message.value

    @property
    def body_type(self):
        # type: () -> AmqpMessageBodyType
        """The body type of the underlying AMQP message.

        :rtype: ~azure.eventhub.amqp.AmqpMessageBodyType
        """
        if self._message.data:
            return AmqpMessageBodyType.DATA
        if self._message.sequence:
            return AmqpMessageBodyType.SEQUENCE
        return AmqpMessageBodyType.VALUE

    @property
    def properties(self):
        # type: () -> Optional[AmqpMessageProperties]
        """
        Properties to add to the message.

        :rtype: Optional[~azure.eventhub.amqp.AmqpMessageProperties]
        """
        return self._properties

    @properties.setter
    def properties(self, value):
        # type: (AmqpMessageProperties) -> None
        self._properties = value

    @property
    def application_properties(self):
        # type: () -> Optional[dict]
        """
        Service specific application properties.

        :rtype: Optional[dict]
        """
        return self._application_properties

    @application_properties.setter
    def application_properties(self, value):
        # type: (dict) -> None
        self._application_properties = value

    @property
    def annotations(self):
        # type: () -> Optional[dict]
        """
        Service specific message annotations.

        :rtype: Optional[dict]
        """
        return self._annotations

    @annotations.setter
    def annotations(self, value):
        # type: (dict) -> None
        self._annotations = value

    @property
    def delivery_annotations(self):
        # type: () -> Optional[dict]
        """
        Delivery-specific non-standard properties at the head of the message.
        Delivery annotations convey information from the sending peer to the receiving peer.

        :rtype: Optional[dict]
        """
        return self._delivery_annotations

    @delivery_annotations.setter
    def delivery_annotations(self, value):
        # type: (dict) -> None
        self._delivery_annotations = value

    @property
    def header(self):
        # type: () -> Optional[AmqpMessageHeader]
        """
        The message header.

        :rtype: Optional[~azure.eventhub.amqp.AmqpMessageHeader]
        """
        return self._header

    @header.setter
    def header(self, value):
        # type: (AmqpMessageHeader) -> None
        self._header = value

    @property
    def footer(self):
        # type: () -> Optional[dict]
        """
        The message footer.

        :rtype: Optional[dict]
        """
        return self._footer

    @footer.setter
    def footer(self, value):
        # type: (dict) -> None
        self._footer = value
class AmqpMessageProperties(DictMixin):
    # pylint: disable=too-many-instance-attributes
    """Message properties.

    The properties that are actually used will depend on the service implementation.
    Not all received messages will have all properties, and not all properties
    will be utilized on a sent message.

    Please refer to the AMQP spec:
    http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
    for more information on the message properties.

    Every keyword is optional; a keyword that is not supplied leaves the
    matching attribute set to ``None``.

    :keyword message_id: Message-id, if set, uniquely identifies a message within the message system.
     The message producer is usually responsible for setting the message-id in such a way that it
     is assured to be globally unique. A broker MAY discard a message as a duplicate if the value
     of the message-id matches that of a previously received message sent to the same node.
    :paramtype message_id: Optional[Union[str, bytes, uuid.UUID]]
    :keyword user_id: The identity of the user responsible for producing the message. The client sets
     this value, and it MAY be authenticated by intermediaries.
    :paramtype user_id: Optional[Union[str, bytes]]
    :keyword to: The to field identifies the node that is the intended destination of the message.
     On any given transfer this might not be the node at the receiving end of the link.
    :paramtype to: Optional[Union[str, bytes]]
    :keyword subject: A common field for summary information about the message content and purpose.
    :paramtype subject: Optional[Union[str, bytes]]
    :keyword reply_to: The address of the node to send replies to.
    :paramtype reply_to: Optional[Union[str, bytes]]
    :keyword correlation_id: This is a client-specific id that can be used to mark or identify messages between clients.
    :paramtype correlation_id: Optional[Union[str, bytes]]
    :keyword content_type: The RFC-2046 MIME type for the message's application-data section (body).
    :paramtype content_type: Optional[Union[str, bytes]]
    :keyword content_encoding: The content-encoding property is used as a modifier to the content-type.
    :paramtype content_encoding: Optional[Union[str, bytes]]
    :keyword creation_time: An absolute time when this message was created.
    :paramtype creation_time: Optional[int]
    :keyword absolute_expiry_time: An absolute time when this message is considered to be expired.
    :paramtype absolute_expiry_time: Optional[int]
    :keyword group_id: Identifies the group the message belongs to.
    :paramtype group_id: Optional[Union[str, bytes]]
    :keyword group_sequence: The relative position of this message within its group.
    :paramtype group_sequence: Optional[int]
    :keyword reply_to_group_id: This is a client-specific id that is used so that client can send replies
     to this message to a specific group.
    :paramtype reply_to_group_id: Optional[Union[str, bytes]]

    :ivar message_id: Message-id, if set, uniquely identifies a message within the message system.
     The message producer is usually responsible for setting the message-id in such a way that it
     is assured to be globally unique. A broker MAY discard a message as a duplicate if the value
     of the message-id matches that of a previously received message sent to the same node.
    :vartype message_id: Optional[bytes]
    :ivar user_id: The identity of the user responsible for producing the message. The client sets
     this value, and it MAY be authenticated by intermediaries.
    :vartype user_id: Optional[bytes]
    :ivar to: The to field identifies the node that is the intended destination of the message.
     On any given transfer this might not be the node at the receiving end of the link.
    :vartype to: Optional[bytes]
    :ivar subject: A common field for summary information about the message content and purpose.
    :vartype subject: Optional[bytes]
    :ivar reply_to: The address of the node to send replies to.
    :vartype reply_to: Optional[bytes]
    :ivar correlation_id: This is a client-specific id that can be used to mark or identify messages between clients.
    :vartype correlation_id: Optional[bytes]
    :ivar content_type: The RFC-2046 MIME type for the message's application-data section (body).
    :vartype content_type: Optional[bytes]
    :ivar content_encoding: The content-encoding property is used as a modifier to the content-type.
    :vartype content_encoding: Optional[bytes]
    :ivar creation_time: An absolute time when this message was created.
    :vartype creation_time: Optional[int]
    :ivar absolute_expiry_time: An absolute time when this message is considered to be expired.
    :vartype absolute_expiry_time: Optional[int]
    :ivar group_id: Identifies the group the message belongs to.
    :vartype group_id: Optional[bytes]
    :ivar group_sequence: The relative position of this message within its group.
    :vartype group_sequence: Optional[int]
    :ivar reply_to_group_id: This is a client-specific id that is used so that client can send replies
     to this message to a specific group.
    :vartype reply_to_group_id: Optional[bytes]
    """

    def __init__(self, **kwargs):
        # type: (Any) -> None
        # Unknown keywords are ignored; absent ones default to None.
        self.message_id = kwargs.get("message_id")
        self.user_id = kwargs.get("user_id")
        self.to = kwargs.get("to")
        self.subject = kwargs.get("subject")
        self.reply_to = kwargs.get("reply_to")
        self.correlation_id = kwargs.get("correlation_id")
        self.content_type = kwargs.get("content_type")
        self.content_encoding = kwargs.get("content_encoding")
        self.creation_time = kwargs.get("creation_time")
        self.absolute_expiry_time = kwargs.get("absolute_expiry_time")
        self.group_id = kwargs.get("group_id")
        self.group_sequence = kwargs.get("group_sequence")
        self.reply_to_group_id = kwargs.get("reply_to_group_id")