# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals
import datetime
import calendar
import json
import logging
import six
from uamqp import BatchMessage, Message, types, constants # type: ignore
from uamqp.message import MessageHeader # type: ignore
from azure.core.settings import settings # type: ignore
from .error import EventDataError
log = logging.getLogger(__name__)
# Per-event overhead, in bytes, of encoding an event into a batch message:
# 5 bytes when the event's encoded size is < 256, 8 bytes when it is >= 256.
# EventDataBatch.try_add indexes this list with 0 (small event) or 1 (large event).
_BATCH_MESSAGE_OVERHEAD_COST = [5, 8]
def parse_sas_token(sas_token):
    """Parse a SAS token into its components.

    Everything after the first space of the token is split on ``&`` into
    ``key=value`` fields. Keys are lower-cased. Values are returned verbatim;
    they are not URL-decoded. Only the first ``=`` in a field separates key
    from value. Empty fields, such as a trailing or doubled ``&``, are ignored.

    :param sas_token: The SAS token.
    :type sas_token: str
    :rtype: dict[str, str]
    :raises ValueError: If nothing follows the first space, or a field has no ``=``.
    """
    token = sas_token.partition(' ')[2]
    if not token:
        raise ValueError("SAS token contains no fields after its type prefix.")
    sas_data = {}
    for field in token.split('&'):
        if not field:
            # Tolerate stray separators, e.g. a trailing or doubled '&'.
            continue
        key, separator, value = field.partition('=')
        if not separator:
            # Deliberately do not echo the field: it is part of a credential.
            raise ValueError("Malformed SAS token: a field is missing '='.")
        sas_data[key.lower()] = value
    return sas_data
class EventData(object):
    """
    The EventData class is a holder of event content.

    :param body: The data to send in a single message. body can be type of str or bytes.
     A non-empty list is also accepted; its items are placed into the message body in order.
    :type body: str or bytes

    .. admonition:: Example:

        .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
            :start-after: [START create_event_data]
            :end-before: [END create_event_data]
            :language: python
            :dedent: 4
            :caption: Create instances of EventData
    """

    # Keys looked up in the message annotations.
    _PROP_SEQ_NUMBER = b"x-opt-sequence-number"
    _PROP_OFFSET = b"x-opt-offset"
    _PROP_PARTITION_KEY = b"x-opt-partition-key"
    _PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(_PROP_PARTITION_KEY)
    _PROP_TIMESTAMP = b"x-opt-enqueued-time"
    # Keys looked up in the message delivery annotations.
    _PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
    _PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
    _PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc"
    _PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc"

    def __init__(self, body=None):
        self._last_enqueued_event_properties = {}
        if body and isinstance(body, list):
            # The first item creates the Message; the remaining items are appended
            # to its internal body list (a private uamqp attribute).
            self.message = Message(body[0])
            for more in body[1:]:
                self.message._body.append(more)  # pylint: disable=protected-access
        elif body is None:
            raise ValueError("EventData cannot be None.")
        else:
            self.message = Message(body)
        self.message.annotations = {}
        self.message.application_properties = {}

    def __str__(self):
        dic = {
            'body': self.body_as_str(),
            'application_properties': str(self.application_properties)
        }
        # Compare against None rather than truthiness so that legitimate falsy
        # values are still shown. Sequence number 0 is one example.
        if self.sequence_number is not None:
            dic['sequence_number'] = str(self.sequence_number)
        if self.offset is not None:
            dic['offset'] = str(self.offset)
        if self.enqueued_time is not None:
            dic['enqueued_time'] = str(self.enqueued_time)
        if self.partition_key is not None:
            dic['partition_key'] = str(self.partition_key)
        return str(dic)

    @staticmethod
    def _utc_from_milliseconds(timestamp):
        """
        Convert a millisecond epoch timestamp into a naive UTC datetime.

        :param timestamp: Milliseconds since the epoch (any float()-convertible value), or None.
        :return: The datetime, or None when `timestamp` is falsy.
        :rtype: datetime.datetime or None
        """
        if timestamp:
            return datetime.datetime.utcfromtimestamp(float(timestamp) / 1000)
        return None

    def _set_partition_key(self, value):
        """
        Set the partition key of the event data object.

        The key is stored in the message annotations. The message header is
        replaced by a new durable MessageHeader.

        :param value: The partition key to set.
        :type value: str or bytes
        """
        # Copy so that a caller-held annotations dict is not mutated. Also tolerate
        # None annotations, which EventDataBatch._set_partition_key guards against too.
        annotations = dict(self.message.annotations or {})
        annotations[EventData._PROP_PARTITION_KEY_AMQP_SYMBOL] = value
        header = MessageHeader()
        header.durable = True
        self.message.annotations = annotations
        self.message.header = header

    def _trace_message(self, parent_span=None):
        """Add tracing information to this message.

        Opens and closes an "Azure.EventHubs.message" span and stores its trace parent
        under the b"Diagnostic-Id" application property. An existing value is kept.
        This method does nothing when no tracing implementation is configured.

        :param parent_span: Span to create the message span under. Defaults to the current span.
        """
        span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
        if span_impl_type is not None:
            current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
            message_span = current_span.span(name="Azure.EventHubs.message")
            message_span.start()
            app_prop = dict(self.application_properties) if self.application_properties else dict()
            app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii'))
            self.application_properties = app_prop
            message_span.finish()

    def _trace_link_message(self, parent_span=None):
        """Link the current message to the current span.

        If the message carries a b"Diagnostic-Id" application property, it is used
        as the traceparent to link. This method does nothing when no tracing
        implementation is configured.

        :param parent_span: Span to link to. Defaults to the current span.
        """
        span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
        if span_impl_type is not None:
            current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
            if current_span and self.application_properties:
                # The default is b"", not "": on Python 3 str has no .decode(), so a ""
                # default made messages without a Diagnostic-Id raise AttributeError here.
                traceparent = self.application_properties.get(b"Diagnostic-Id", b"")
                if isinstance(traceparent, six.binary_type):
                    traceparent = traceparent.decode('ascii')
                if traceparent:
                    current_span.link(traceparent)

    def _get_last_enqueued_event_properties(self):
        """
        Build the last-enqueued-event information from the delivery annotations, and cache it.

        :return: A dict with keys ``sequence_number``, ``offset``, ``enqueued_time`` and
         ``retrieval_time``, or None when the message has no delivery annotations.
        :rtype: dict or None
        """
        if self._last_enqueued_event_properties:
            return self._last_enqueued_event_properties

        delivery_annotations = self.message.delivery_annotations
        if not delivery_annotations:
            return None

        self._last_enqueued_event_properties = {
            "sequence_number":
                delivery_annotations.get(EventData._PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
            "offset":
                delivery_annotations.get(EventData._PROP_LAST_ENQUEUED_OFFSET, None),
            "enqueued_time": self._utc_from_milliseconds(
                delivery_annotations.get(EventData._PROP_LAST_ENQUEUED_TIME_UTC, None)),
            "retrieval_time": self._utc_from_milliseconds(
                delivery_annotations.get(EventData._PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)),
        }
        return self._last_enqueued_event_properties

    @classmethod
    def _from_message(cls, message):
        """
        Wrap an existing uamqp message in an EventData without copying it.

        :param message: The message to wrap.
        :rtype: EventData
        """
        # pylint:disable=protected-access
        event_data = cls(body='')
        event_data.message = message
        return event_data

    @property
    def sequence_number(self):
        """
        The sequence number of the event data object.

        :rtype: int or long
        """
        return self.message.annotations.get(EventData._PROP_SEQ_NUMBER, None)

    @property
    def offset(self):
        """
        The offset of the event data object.

        :rtype: str
        """
        try:
            return self.message.annotations[EventData._PROP_OFFSET].decode('UTF-8')
        except (KeyError, AttributeError):
            return None

    @property
    def enqueued_time(self):
        """
        The enqueued timestamp of the event data object.

        :rtype: datetime.datetime
        """
        return self._utc_from_milliseconds(self.message.annotations.get(EventData._PROP_TIMESTAMP, None))

    @property
    def partition_key(self):
        """
        The partition key of the event data object.

        :rtype: bytes
        """
        # Check the AMQP-symbol key written by _set_partition_key first, then the plain bytes key.
        try:
            return self.message.annotations[EventData._PROP_PARTITION_KEY_AMQP_SYMBOL]
        except KeyError:
            return self.message.annotations.get(EventData._PROP_PARTITION_KEY, None)

    @property
    def application_properties(self):
        """
        Application defined properties on the message.

        :rtype: dict
        """
        return self.message.application_properties

    @application_properties.setter
    def application_properties(self, value):
        """
        Application defined properties on the message.

        :param dict value: The application properties for the EventData. They are copied;
         passing None clears them.
        """
        properties = None if value is None else dict(value)
        self.message.application_properties = properties

    @property
    def system_properties(self):
        """
        Metadata set by the Event Hubs Service associated with the EventData.

        :rtype: dict
        """
        return self.message.annotations

    @property
    def body(self):
        """
        The body of the event data object.

        :rtype: bytes or Generator[bytes]
        :raises ValueError: If the message data is empty.
        """
        try:
            return self.message.get_data()
        except TypeError:
            raise ValueError("Message data empty.")

    @property
    def last_enqueued_event_properties(self):
        """
        The latest enqueued event information. This property will be updated each time an event is received when
        the receiver is created with `track_last_enqueued_event_properties` being `True`.
        The dict includes following information of the partition:

            - `sequence_number`
            - `offset`
            - `enqueued_time`
            - `retrieval_time`

        :rtype: dict or None
        """
        return self._get_last_enqueued_event_properties()

    def body_as_str(self, encoding='UTF-8'):
        """
        The body of the event data as a string if the data is of a
        compatible type.

        :param encoding: The encoding to use for decoding message data.
         Default is 'UTF-8'
        :rtype: str
        :raises TypeError: If the body cannot be decoded to a string.
        """
        data = self.body
        try:
            # An iterable of bytes sections: decode each section and concatenate the results.
            return "".join(b.decode(encoding) for b in data)
        except TypeError:
            # The body is not iterable, so fall back to its text representation.
            return six.text_type(data)
        except Exception:  # pylint: disable=broad-except
            # One example: a single bytes object on Python 3, whose items are ints with
            # no .decode(). Fall through and decode the whole body as one value.
            pass
        try:
            return data.decode(encoding)
        except Exception as e:
            raise TypeError("Message data is not compatible with string type: {}".format(e))

    def body_as_json(self, encoding='UTF-8'):
        """
        The body of the event loaded as a JSON object, if the data is compatible.

        :param encoding: The encoding to use for decoding message data.
         Default is 'UTF-8'
        :rtype: dict
        :raises TypeError: If the body is not a string or is not valid JSON.
        """
        data_str = self.body_as_str(encoding=encoding)
        try:
            return json.loads(data_str)
        except Exception as e:
            raise TypeError("Event data is not compatible with JSON type: {}".format(e))

    def encode_message(self):
        """
        Encode the underlying message by delegating to ``self.message.encode_message()``.

        :return: The encoded message (presumably bytes; confirm against uamqp).
        """
        return self.message.encode_message()
class EventDataBatch(object):
    """
    Sending events in a batch performs better than sending individual events.
    EventDataBatch helps you create a batch of `EventData` of the maximum allowed size to improve sending
    performance.

    Use the `try_add` method to add events. Once the maximum batch size in bytes would be exceeded,
    `try_add` raises a `ValueError`.

    Use the `send` method of :class:`EventHubProducerClient<azure.eventhub.EventHubProducerClient>`
    or the async :class:`EventHubProducerClient<azure.eventhub.aio.EventHubProducerClient>`
    for sending. The `send` method accepts partition_key as a parameter for sending to a particular partition.

    **Please use the create_batch method of EventHubProducerClient
    to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.**

    :param int max_size: The maximum size of bytes data that an EventDataBatch object can hold.
    :param str partition_key: With the given partition_key, event data will land to a particular partition of the
     Event Hub decided by the service.
    """

    def __init__(self, max_size=None, partition_key=None):
        self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES
        self._partition_key = partition_key
        self.message = BatchMessage(data=[], multi_messages=False, properties=None)
        self._set_partition_key(partition_key)
        # Start from the encoded size of the empty batch envelope. It already includes
        # any partition-key annotation, so each try_add only adds the event's own cost.
        self._size = self.message.gather()[0].get_message_encoded_size()
        self._count = 0

    def __len__(self):
        return self._count

    @staticmethod
    def _from_batch(batch_data, partition_key=None):
        """
        Wrap already-collected batch data in an EventDataBatch.

        NOTE(review): `_count` and `_size` are not updated from `batch_data`, so len()
        and `size` of the result describe an empty batch.
        """
        batch_data_instance = EventDataBatch(partition_key=partition_key)
        batch_data_instance.message._body_gen = batch_data  # pylint:disable=protected-access
        return batch_data_instance

    def _set_partition_key(self, value):
        """
        Annotate the batch message with `value` as its partition key and mark it durable.

        A falsy `value` leaves the message untouched.
        """
        if value:
            annotations = self.message.annotations
            if annotations is None:
                annotations = dict()
            # Reuse EventData's precomputed symbol instead of building a new one each call.
            annotations[EventData._PROP_PARTITION_KEY_AMQP_SYMBOL] = value  # pylint:disable=protected-access
            header = MessageHeader()
            header.durable = True
            self.message.annotations = annotations
            self.message.header = header

    @property
    def size(self):
        """The size of EventDataBatch object in bytes

        :rtype: int
        """
        return self._size

    def try_add(self, event_data):
        """
        Try to add an EventData object. The size of an EventData is the sum of its body,
        application_properties, etc.

        A None `event_data` is logged and ignored.

        NOTE(review): before the size check, `event_data` is stamped with this batch's
        partition key (if the batch has one and the event does not) and is given tracing
        properties. These changes remain even when a ValueError is raised.

        :param event_data: The EventData object which is attempted to be added.
        :type event_data: ~azure.eventhub.EventData
        :rtype: None
        :raises TypeError: If `event_data` is not an EventData instance.
        :raises EventDataError: If `event_data` already has a partition key that differs from the batch's.
        :raises ValueError: When adding the event would exceed the size limit.
        """
        if event_data is None:
            log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
            return
        if not isinstance(event_data, EventData):
            raise TypeError('event_data should be type of EventData')

        if self._partition_key:
            if event_data.partition_key and event_data.partition_key != self._partition_key:
                raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch')
            if not event_data.partition_key:
                event_data._set_partition_key(self._partition_key)  # pylint:disable=protected-access

        event_data._trace_message()  # pylint:disable=protected-access
        event_data_size = event_data.message.get_message_encoded_size()

        # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
        # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes.
        size_after_add = self._size + event_data_size \
            + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1]

        if size_after_add > self.max_size:
            raise ValueError("EventDataBatch has reached its size limit {}".format(self.max_size))

        self.message._body_gen.append(event_data)  # pylint: disable=protected-access
        self._size = size_after_add
        self._count += 1
class EventPosition(object):
    """
    The position (offset, sequence number or timestamp) from which a consumer starts.

    :param value: The event position value. The value can be type of datetime.datetime or int or str.
     None is treated as "-1".
    :type value: int, str or datetime.datetime
    :param bool inclusive: Whether to include the supplied value as the start point.

    Examples:

    Beginning of the event stream:
      >>> event_pos = EventPosition("-1")
    End of the event stream:
      >>> event_pos = EventPosition("@latest")
    Events after the specified offset:
      >>> event_pos = EventPosition("12345")
    Events from the specified offset:
      >>> event_pos = EventPosition("12345", True)
    Events after a datetime:
      >>> event_pos = EventPosition(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> event_pos = EventPosition(1506968696002)
    """

    def __init__(self, value, inclusive=False):
        if value is None:
            value = "-1"
        self.value = value
        self.inclusive = inclusive

    def __str__(self):
        return str(self.value)

    def _selector(self):
        """
        Build the AMQP filter expression for this position.

        A datetime filters on the enqueued time in epoch milliseconds, an integer
        filters on the sequence number, and anything else filters on the offset.

        :rtype: bytes
        """
        comparison = ">=" if self.inclusive else ">"
        position = self.value
        if isinstance(position, datetime.datetime):
            # Whole UTC seconds scaled to milliseconds plus the sub-second part;
            # the sum is truncated to an integer when formatted.
            epoch_ms = (calendar.timegm(position.utctimetuple()) * 1000) + (position.microsecond/1000)
            selector = "amqp.annotation.x-opt-enqueued-time {} '{}'".format(comparison, int(epoch_ms))
        elif isinstance(position, six.integer_types):
            selector = "amqp.annotation.x-opt-sequence-number {} '{}'".format(comparison, position)
        else:
            selector = "amqp.annotation.x-opt-offset {} '{}'".format(comparison, position)
        return selector.encode('utf-8')
# TODO: move some behaviors to these two classes.
class EventHubSASTokenCredential(object):
    """
    SAS token used for authentication.

    :param token: A SAS token or function that returns a SAS token. If a function is supplied,
     it will be used to retrieve subsequent tokens in the case of token expiry. The function should
     take no arguments. The token can be type of str or Callable object.
    """

    def __init__(self, token):
        self.token = token

    def get_sas_token(self):
        """
        Return the SAS token. If a callable was supplied, return the result of calling it instead.
        """
        source = self.token
        return source() if callable(source) else source
class EventHubSharedKeyCredential(object):
    """
    The shared access key credential used for authentication.

    :param str policy: The name of the shared access policy.
    :param str key: The shared access key.
    """

    def __init__(self, policy, key):
        # Both values are stored as given.
        self.key = key
        self.policy = policy
class _Address(object):
    """Plain holder for a hostname and a path; both default to None."""

    def __init__(self, hostname=None, path=None):
        self.path = path
        self.hostname = hostname