# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from __future__ import annotations
from typing import Optional, Any, cast, Mapping, Dict, Union, List
from ._amqp_utils import normalized_data_body, normalized_sequence_body
from ._constants import AmqpMessageBodyType
from .._mixin import DictMixin
[docs]
class AmqpAnnotatedMessage:
# pylint: disable=too-many-instance-attributes
"""
The AMQP Annotated Message for advanced sending and receiving scenarios which allows you to
access to low-level AMQP message sections. There should be one and only one of either data_body, sequence_body
or value_body being set as the body of the AmqpAnnotatedMessage; if more than one body is set, `ValueError` will
be raised.
Please refer to the AMQP spec:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
for more information on the message format.
:keyword data_body: The body consists of one or more data sections and each section contains opaque binary data.
:paramtype data_body: Union[str, bytes, List[Union[str, bytes]]]
:keyword sequence_body: The body consists of one or more sequence sections and
each section contains an arbitrary number of structured data elements.
:paramtype sequence_body: List[Any]
:keyword value_body: The body consists of one amqp-value section and the section contains a single AMQP value.
:paramtype value_body: Any
:keyword header: The amqp message header.
:paramtype header: Optional[~azure.eventhub.amqp.AmqpMessageHeader]
:keyword footer: The amqp message footer.
:paramtype footer: Optional[Dict]
:keyword properties: Properties to add to the amqp message.
:paramtype properties: Optional[~azure.eventhub.amqp.AmqpMessageProperties]
:keyword application_properties: Service specific application properties.
:paramtype application_properties: Optional[Dict]
:keyword annotations: Service specific message annotations.
:paramtype annotations: Optional[Dict]
:keyword delivery_annotations: Service specific delivery annotations.
:paramtype delivery_annotations: Optional[Dict]
"""
def __init__(self, **kwargs: Any) -> None:
self._encoding = kwargs.pop("encoding", "UTF-8")
self._data_body: Optional[Union[str, bytes, List[Union[str, bytes]]]] = None
self._sequence_body: Optional[List[Any]] = None
self._value_body: Any = None
# internal usage only for Event Hub received message
message = kwargs.pop("message", None)
if message:
self._from_amqp_message(message)
return
# manually constructed AMQPAnnotatedMessage
input_count_validation = len([key for key in ("data_body", "sequence_body", "value_body") if key in kwargs])
if input_count_validation != 1:
raise ValueError(
"There should be one and only one of either data_body, sequence_body "
"or value_body being set as the body of the AmqpAnnotatedMessage."
)
self._body_type: AmqpMessageBodyType = None # type: ignore
if "data_body" in kwargs:
self._data_body = normalized_data_body(kwargs.get("data_body"))
self._body_type = AmqpMessageBodyType.DATA
elif "sequence_body" in kwargs:
self._sequence_body = normalized_sequence_body(kwargs.get("sequence_body"))
self._body_type = AmqpMessageBodyType.SEQUENCE
elif "value_body" in kwargs:
self._value_body = kwargs.get("value_body")
self._body_type = AmqpMessageBodyType.VALUE
header_dict = cast(Mapping, kwargs.get("header"))
self._header = AmqpMessageHeader(**header_dict) if "header" in kwargs else None
self._footer = kwargs.get("footer")
properties_dict = cast(Mapping, kwargs.get("properties"))
self._properties = AmqpMessageProperties(**properties_dict) if "properties" in kwargs else None
self._application_properties = kwargs.get("application_properties")
self._annotations = kwargs.get("annotations")
self._delivery_annotations = kwargs.get("delivery_annotations")
def __str__(self) -> str:
if self._body_type == AmqpMessageBodyType.DATA:
return "".join(d.decode(self._encoding) for d in self._data_body) # type: ignore
if self._body_type == AmqpMessageBodyType.SEQUENCE:
return str(self._sequence_body)
if self._body_type == AmqpMessageBodyType.VALUE:
return str(self._value_body)
return ""
def __repr__(self) -> str:
# pylint: disable=bare-except
message_repr = "body={}".format(str(self))
message_repr += ", body_type={}".format(self.body_type.value)
try:
message_repr += ", header={}".format(self.header)
except:
message_repr += ", header=<read-error>"
try:
message_repr += ", footer={}".format(self.footer)
except:
message_repr += ", footer=<read-error>"
try:
message_repr += ", properties={}".format(self.properties)
except:
message_repr += ", properties=<read-error>"
try:
message_repr += ", application_properties={}".format(self.application_properties)
except:
message_repr += ", application_properties=<read-error>"
try:
message_repr += ", delivery_annotations={}".format(self.delivery_annotations)
except:
message_repr += ", delivery_annotations=<read-error>"
try:
message_repr += ", annotations={}".format(self.annotations)
except:
message_repr += ", annotations=<read-error>"
return "AmqpAnnotatedMessage({})".format(message_repr)[:1024]
def _from_amqp_message(self, message):
self._properties = (
AmqpMessageProperties(
message_id=message.properties.message_id,
user_id=message.properties.user_id,
to=message.properties.to,
subject=message.properties.subject,
reply_to=message.properties.reply_to,
correlation_id=message.properties.correlation_id,
content_type=message.properties.content_type,
content_encoding=message.properties.content_encoding,
absolute_expiry_time=message.properties.absolute_expiry_time,
creation_time=message.properties.creation_time,
group_id=message.properties.group_id,
group_sequence=message.properties.group_sequence,
reply_to_group_id=message.properties.reply_to_group_id,
)
if message.properties
else None
)
self._header = (
AmqpMessageHeader(
delivery_count=message.header.delivery_count,
time_to_live=message.header.ttl,
first_acquirer=message.header.first_acquirer,
durable=message.header.durable,
priority=message.header.priority,
)
if message.header
else None
)
self._footer = message.footer if message.footer else {}
self._annotations = message.message_annotations if message.message_annotations else {}
self._delivery_annotations = message.delivery_annotations if message.delivery_annotations else {}
self._application_properties = message.application_properties if message.application_properties else {}
if message.data:
self._data_body = cast(List, list(message.data))
self._body_type = AmqpMessageBodyType.DATA
elif message.sequence:
self._sequence_body = cast(List, list(message.sequence))
self._body_type = AmqpMessageBodyType.SEQUENCE
else:
self._value_body = message.value
self._body_type = AmqpMessageBodyType.VALUE
@property
def body(self) -> Any:
"""
The body of the Message. The format may vary depending on the body type:
For AmqpMessageBodyType.DATA, the body could be bytes or Iterable[bytes].
For AmqpMessageBodyType.SEQUENCE, the body could be List or Iterable[List].
For AmqpMessageBodyType.VALUE, the body could be any type.
:rtype: Any
"""
if self._body_type == AmqpMessageBodyType.DATA: # pylint:disable=no-else-return
return (i for i in cast(List, self._data_body)) # type: ignore
elif self._body_type == AmqpMessageBodyType.SEQUENCE:
return (i for i in cast(List, self._sequence_body))
elif self._body_type == AmqpMessageBodyType.VALUE:
return self._value_body
return None
@property
def body_type(self) -> AmqpMessageBodyType:
"""
The body type of the underlying AMQP message.
:rtype: ~azure.eventhub.amqp.AmqpMessageBodyType
"""
return self._body_type
@property
def properties(self) -> Optional[AmqpMessageProperties]:
"""
Properties to add to the message.
:rtype: Optional[~azure.eventhub.amqp.AmqpMessageProperties]
"""
return self._properties
@properties.setter
def properties(self, value: AmqpMessageProperties) -> None:
self._properties = value
@property
def application_properties(self) -> Optional[Dict[Union[str, bytes], Any]]:
"""
Service specific application properties.
:rtype: Optional[Dict]
"""
return self._application_properties
@application_properties.setter
def application_properties(self, value: Optional[Dict[Union[str, bytes], Any]]) -> None:
self._application_properties = value
@property
def annotations(self) -> Optional[Dict[Union[str, bytes], Any]]:
"""
Service specific message annotations.
:rtype: Optional[Dict]
"""
return self._annotations
@annotations.setter
def annotations(self, value: Optional[Dict[Union[str, bytes], Any]]) -> None:
self._annotations = value
@property
def delivery_annotations(self) -> Optional[Dict[Union[str, bytes], Any]]:
"""
Delivery-specific non-standard properties at the head of the message.
Delivery annotations convey information from the sending peer to the receiving peer.
:rtype: Dict
"""
return self._delivery_annotations
@delivery_annotations.setter
def delivery_annotations(self, value: Optional[Dict[Union[str, bytes], Any]]) -> None:
self._delivery_annotations = value
@property
def header(self) -> Optional[AmqpMessageHeader]:
"""
The message header.
:rtype: Optional[~azure.eventhub.amqp.AmqpMessageHeader]
"""
return self._header
@header.setter
def header(self, value: AmqpMessageHeader) -> None:
self._header = value
@property
def footer(self) -> Optional[Dict[Any, Any]]:
"""
The message footer.
:rtype: Optional[Dict]
"""
return self._footer
@footer.setter
def footer(self, value: Optional[Dict[Any, Any]]) -> None:
self._footer = value
[docs]
class AmqpMessageProperties(DictMixin):
# pylint: disable=too-many-instance-attributes
"""Message properties.
The properties that are actually used will depend on the service implementation.
Not all received messages will have all properties, and not all properties
will be utilized on a sent message.
Please refer to the AMQP spec:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
for more information on the message properties.
:keyword message_id: Message-id, if set, uniquely identifies a message within the message system.
The message producer is usually responsible for setting the message-id in such a way that it
is assured to be globally unique. A broker MAY discard a message as a duplicate if the value
of the message-id matches that of a previously received message sent to the same node.
:paramtype message_id: Optional[Union[str, bytes, uuid.UUID]]
:keyword user_id: The identity of the user responsible for producing the message. The client sets
this value, and it MAY be authenticated by intermediaries.
:paramtype user_id: Optional[Union[str, bytes]]
:keyword to: The to field identifies the node that is the intended destination of the message.
On any given transfer this might not be the node at the receiving end of the link.
:paramtype to: Optional[Union[str, bytes]]
:keyword subject: A common field for summary information about the message content and purpose.
:paramtype subject: Optional[Union[str, bytes]]
:keyword reply_to: The address of the node to send replies to.
:paramtype reply_to: Optional[Union[str, bytes]]
:keyword correlation_id: This is a client-specific id that can be used to mark or identify messages between clients.
:paramtype correlation_id: Optional[Union[str, bytes]]
:keyword content_type: The RFC-2046 MIME type for the message's application-data section (body).
:paramtype content_type: Optional[Union[str, bytes]]
:keyword content_encoding: The content-encoding property is used as a modifier to the content-type.
:paramtype content_encoding: Optional[Union[str, bytes]]
:keyword creation_time: An absolute time when this message was created.
:paramtype creation_time: Optional[int]
:keyword absolute_expiry_time: An absolute time when this message is considered to be expired.
:paramtype absolute_expiry_time: Optional[int]
:keyword group_id: Identifies the group the message belongs to.
:paramtype group_id: Optional[Union[str, bytes]]
:keyword group_sequence: The relative position of this message within its group.
:paramtype group_sequence: Optional[int]
:keyword reply_to_group_id: This is a client-specific id that is used so that client can send replies
to this message to a specific group.
:paramtype reply_to_group_id: Optional[Union[str, bytes]]
:ivar message_id: Message-id, if set, uniquely identifies a message within the message system.
The message producer is usually responsible for setting the message-id in such a way that it
is assured to be globally unique. A broker MAY discard a message as a duplicate if the value
of the message-id matches that of a previously received message sent to the same node.
:vartype message_id: Optional[bytes]
:ivar user_id: The identity of the user responsible for producing the message. The client sets
this value, and it MAY be authenticated by intermediaries.
:vartype user_id: Optional[bytes]
:ivar to: The to field identifies the node that is the intended destination of the message.
On any given transfer this might not be the node at the receiving end of the link.
:vartype to: Optional[bytes]
:ivar subject: A common field for summary information about the message content and purpose.
:vartype subject: Optional[bytes]
:ivar reply_to: The address of the node to send replies to.
:vartype reply_to: Optional[bytes]
:ivar correlation_id: his is a client-specific id that can be used to mark or identify messages between clients.
:vartype correlation_id: Optional[bytes]
:ivar content_type: The RFC-2046 MIME type for the message's application-data section (body).
:vartype content_type: Optional[bytes]
:ivar content_encoding: The content-encoding property is used as a modifier to the content-type.
:vartype content_encoding: Optional[bytes]
:ivar creation_time: An absolute time when this message was created.
:vartype creation_time: Optional[int]
:ivar absolute_expiry_time: An absolute time when this message is considered to be expired.
:vartype absolute_expiry_time: Optional[int]
:ivar group_id: Identifies the group the message belongs to.
:vartype group_id: Optional[bytes]
:ivar group_sequence: The relative position of this message within its group.
:vartype group_sequence: Optional[int]
:ivar reply_to_group_id: This is a client-specific id that is used so that client can send replies
to this message to a specific group.
:vartype reply_to_group_id: Optional[bytes]
"""
def __init__(self, **kwargs):
self.message_id = kwargs.get("message_id")
self.user_id = kwargs.get("user_id")
self.to = kwargs.get("to")
self.subject = kwargs.get("subject")
self.reply_to = kwargs.get("reply_to")
self.correlation_id = kwargs.get("correlation_id")
self.content_type = kwargs.get("content_type")
self.content_encoding = kwargs.get("content_encoding")
self.creation_time = kwargs.get("creation_time")
self.absolute_expiry_time = kwargs.get("absolute_expiry_time")
self.group_id = kwargs.get("group_id")
self.group_sequence = kwargs.get("group_sequence")
self.reply_to_group_id = kwargs.get("reply_to_group_id")