# Source code for azure.eventhub.common

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import datetime
import calendar
import json
import logging
import six

from uamqp import BatchMessage, Message, types, constants  # type: ignore
from uamqp.message import MessageHeader  # type: ignore

from azure.core.settings import settings # type: ignore

from .error import EventDataError

# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Extra bytes charged per event when it is encoded into a BatchMessage, indexed the way
# EventDataBatch.try_add looks it up: index 0 (5 bytes) when the event's encoded size
# is < 256, index 1 (8 bytes) when it is >= 256.
_BATCH_MESSAGE_OVERHEAD_COST = [5, 8]


def parse_sas_token(sas_token):
    """Split a SAS token into a dictionary of its fields.

    Everything up to and including the first space is discarded; the
    remainder is treated as ``&``-separated ``key=value`` pairs. Keys are
    lower-cased, values are kept verbatim (only the first ``=`` of a pair
    separates key from value).

    :param sas_token: The SAS token.
    :type sas_token: str
    :rtype: dict[str, str]
    :raises ValueError: If a field does not contain ``=``.
    """
    _, _, payload = sas_token.partition(' ')
    pairs = (chunk.split('=', 1) for chunk in payload.split('&'))
    return {name.lower(): content for name, content in pairs}


class EventData(object):
    """
    The EventData class is a holder of event content.

    :param body: The data to send in a single message. body can be type of str or bytes.
     A non-empty list is also accepted: its first item is used to create the message and
     the remaining items are appended to the message body.
    :type body: str or bytes
    :raises ValueError: If `body` is None.

    .. admonition:: Example:

        .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
            :start-after: [START create_event_data]
            :end-before: [END create_event_data]
            :language: python
            :dedent: 4
            :caption: Create instances of EventData

    """

    # Keys looked up in the message annotations.
    _PROP_SEQ_NUMBER = b"x-opt-sequence-number"
    _PROP_OFFSET = b"x-opt-offset"
    _PROP_PARTITION_KEY = b"x-opt-partition-key"
    _PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(_PROP_PARTITION_KEY)
    _PROP_TIMESTAMP = b"x-opt-enqueued-time"
    # Keys looked up in the message delivery annotations.
    _PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
    _PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
    _PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc"
    _PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc"

    def __init__(self, body=None):
        # Cache filled lazily by _get_last_enqueued_event_properties.
        self._last_enqueued_event_properties = {}
        if body and isinstance(body, list):
            self.message = Message(body[0])
            for more in body[1:]:
                self.message._body.append(more)  # pylint: disable=protected-access
        elif body is None:
            raise ValueError("EventData cannot be None.")
        else:
            self.message = Message(body)
        self.message.annotations = {}
        self.message.application_properties = {}

    def __str__(self):
        dic = {
            'body': self.body_as_str(),
            'application_properties': str(self.application_properties)
        }
        # Compare against None so that a sequence number of 0 is still reported.
        if self.sequence_number is not None:
            dic['sequence_number'] = str(self.sequence_number)
        if self.offset:
            dic['offset'] = str(self.offset)
        if self.enqueued_time:
            dic['enqueued_time'] = str(self.enqueued_time)
        if self.partition_key:
            dic['partition_key'] = str(self.partition_key)
        return str(dic)

    def _set_partition_key(self, value):
        """
        Set the partition key of the event data object.

        Stores the key in the message annotations under the AMQP symbol form of
        the partition key name and replaces the message header with a durable one.

        :param value: The partition key to set.
        :type value: str or bytes
        """
        annotations = dict(self.message.annotations)
        annotations[EventData._PROP_PARTITION_KEY_AMQP_SYMBOL] = value
        header = MessageHeader()
        header.durable = True
        self.message.annotations = annotations
        self.message.header = header

    def _trace_message(self, parent_span=None):
        """Add tracing information to this message.

        Will open and close a "Azure.EventHubs.message" span, and add the "DiagnosticId" as app properties
        of the message. Does nothing when no tracing implementation is configured.

        :param parent_span: Span to create the message span from. When omitted, the current span
         of the configured tracing implementation is used.
        """
        span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
        if span_impl_type is not None:
            current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
            message_span = current_span.span(name="Azure.EventHubs.message")
            message_span.start()
            app_prop = dict(self.application_properties) if self.application_properties else {}
            # setdefault: an already present Diagnostic-Id is left untouched.
            app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii'))
            self.application_properties = app_prop
            message_span.finish()

    def _trace_link_message(self, parent_span=None):
        """Link the current message to current span.

        Will extract DiagnosticId if available. Does nothing when no tracing implementation
        is configured, or when the message carries no Diagnostic-Id.

        :param parent_span: Span to link to. When omitted, the current span of the configured
         tracing implementation is used.
        """
        span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
        if span_impl_type is not None:
            current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
            if current_span and self.application_properties:
                # The default must be bytes: a text default has no .decode() on Python 3 and
                # would raise AttributeError whenever the Diagnostic-Id property is missing.
                traceparent = self.application_properties.get(b"Diagnostic-Id", b"").decode('ascii')
                if traceparent:
                    current_span.link(traceparent)

    def _get_last_enqueued_event_properties(self):
        """Build (once) and return the last-enqueued-event information of the partition.

        The values are read from the message delivery annotations; the two time stamps
        are divided by 1000 and converted with `datetime.datetime.utcfromtimestamp`.

        :rtype: dict or None
        """
        if self._last_enqueued_event_properties:
            return self._last_enqueued_event_properties

        if self.message.delivery_annotations:
            enqueued_time_stamp = \
                self.message.delivery_annotations.get(EventData._PROP_LAST_ENQUEUED_TIME_UTC, None)
            retrieval_time_stamp = \
                self.message.delivery_annotations.get(EventData._PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)

            self._last_enqueued_event_properties = {
                "sequence_number":
                    self.message.delivery_annotations.get(EventData._PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
                "offset":
                    self.message.delivery_annotations.get(EventData._PROP_LAST_ENQUEUED_OFFSET, None),
                "enqueued_time":
                    datetime.datetime.utcfromtimestamp(
                        float(enqueued_time_stamp)/1000) if enqueued_time_stamp else None,
                "retrieval_time":
                    datetime.datetime.utcfromtimestamp(
                        float(retrieval_time_stamp)/1000) if retrieval_time_stamp else None
            }
            return self._last_enqueued_event_properties

        return None

    @classmethod
    def _from_message(cls, message):
        """Wrap an existing message object in a new instance of this class.

        :param message: The message the returned object should hold.
        :rtype: EventData
        """
        # pylint:disable=protected-access
        # An empty body satisfies __init__; the placeholder message is replaced right away.
        event_data = cls(body='')
        event_data.message = message
        return event_data

    @property
    def sequence_number(self):
        """
        The sequence number of the event data object.

        :rtype: int or long
        """
        return self.message.annotations.get(EventData._PROP_SEQ_NUMBER, None)

    @property
    def offset(self):
        """
        The offset of the event data object.

        None is returned when the annotation is missing or cannot be decoded.

        :rtype: str
        """
        try:
            return self.message.annotations[EventData._PROP_OFFSET].decode('UTF-8')
        except (KeyError, AttributeError):
            return None

    @property
    def enqueued_time(self):
        """
        The enqueued timestamp of the event data object.

        :rtype: datetime.datetime
        """
        timestamp = self.message.annotations.get(EventData._PROP_TIMESTAMP, None)
        if timestamp:
            # The annotation value is divided by 1000 before conversion.
            return datetime.datetime.utcfromtimestamp(float(timestamp)/1000)
        return None

    @property
    def partition_key(self):
        """
        The partition key of the event data object.

        :rtype: bytes
        """
        # Prefer the AMQP-symbol key; fall back to the plain bytes key.
        try:
            return self.message.annotations[EventData._PROP_PARTITION_KEY_AMQP_SYMBOL]
        except KeyError:
            return self.message.annotations.get(EventData._PROP_PARTITION_KEY, None)

    @property
    def application_properties(self):
        """
        Application defined properties on the message.

        :rtype: dict
        """
        return self.message.application_properties

    @application_properties.setter
    def application_properties(self, value):
        """
        Application defined properties on the message.

        :param dict value: The application properties for the EventData.
        """
        # Store a copy so later changes to the caller's mapping do not affect the message.
        properties = None if value is None else dict(value)
        self.message.application_properties = properties

    @property
    def system_properties(self):
        """
        Metadata set by the Event Hubs Service associated with the EventData

        :rtype: dict
        """
        return self.message.annotations

    @property
    def body(self):
        """
        The body of the event data object.

        :rtype: bytes or Generator[bytes]
        :raises ValueError: If reading the message data raises a TypeError.
        """
        try:
            return self.message.get_data()
        except TypeError:
            raise ValueError("Message data empty.")

    @property
    def last_enqueued_event_properties(self):
        """
        The latest enqueued event information. This property will be updated each time an event is received when
        the receiver is created with `track_last_enqueued_event_properties` being `True`.
        The dict includes following information of the partition:

            - `sequence_number`
            - `offset`
            - `enqueued_time`
            - `retrieval_time`

        :rtype: dict or None
        """
        return self._get_last_enqueued_event_properties()

    def body_as_str(self, encoding='UTF-8'):
        """
        The body of the event data as a string if the data is of a compatible type.

        :param encoding: The encoding to use for decoding message data.
         Default is 'UTF-8'
        :rtype: str
        :raises TypeError: If the data cannot be converted to a string.
        """
        data = self.body
        try:
            # Body made of several chunks: decode each one and concatenate.
            return "".join(b.decode(encoding) for b in data)
        except TypeError:
            # Body is not iterable: fall back to its text representation.
            return six.text_type(data)
        except Exception:  # pylint: disable=broad-except
            # Chunk-wise decoding failed; try decoding the body as a whole below.
            # Only Exception is caught so KeyboardInterrupt/SystemExit still propagate.
            pass
        try:
            return data.decode(encoding)
        except Exception as e:
            raise TypeError("Message data is not compatible with string type: {}".format(e))

    def body_as_json(self, encoding='UTF-8'):
        """
        The body of the event loaded as a JSON object if the data is compatible.

        :param encoding: The encoding to use for decoding message data.
         Default is 'UTF-8'
        :rtype: dict
        :raises TypeError: If the body cannot be parsed as JSON.
        """
        data_str = self.body_as_str(encoding=encoding)
        try:
            return json.loads(data_str)
        except Exception as e:
            raise TypeError("Event data is not compatible with JSON type: {}".format(e))

    def encode_message(self):
        """Return the result of encoding the underlying message."""
        return self.message.encode_message()
class EventDataBatch(object):
    """
    A size-bounded collection of `EventData` that is sent as one batch.

    Sending events in batch get better performance than sending individual events.
    Add events with `try_add`; once adding an event would push the batch over its
    maximum size in bytes, `try_add` raises a `ValueError`.

    Use `send` method of :class:`EventHubProducerClient<azure.eventhub.EventHubProducerClient>`
    or the async :class:`EventHubProducerClient<azure.eventhub.aio.EventHubProducerClient>`
    for sending. The `send` method accepts partition_key as a parameter for sending a particular partition.

    **Please use the create_batch method of EventHubProducerClient
    to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.**

    :param int max_size: The maximum size of bytes data that an EventDataBatch object can hold.
    :param str partition_key: With the given partition_key, event data will land to
     a particular partition of the Event Hub decided by the service.
    """

    def __init__(self, max_size=None, partition_key=None):
        # A falsy max_size selects the library-wide maximum message length.
        self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES
        self._partition_key = partition_key
        self.message = BatchMessage(data=[], multi_messages=False, properties=None)

        self._set_partition_key(partition_key)
        # Starting size is the encoded size of the still-empty batch message.
        empty_message = self.message.gather()[0]
        self._size = empty_message.get_message_encoded_size()
        self._count = 0

    def __len__(self):
        """Number of events added through `try_add`."""
        return self._count

    @staticmethod
    def _from_batch(batch_data, partition_key=None):
        """Create a batch whose message body generator is `batch_data`."""
        new_batch = EventDataBatch(partition_key=partition_key)
        new_batch.message._body_gen = batch_data  # pylint:disable=protected-access
        return new_batch

    def _set_partition_key(self, value):
        """Annotate the batch message with the partition key; a falsy key is ignored."""
        if not value:
            return

        batch_annotations = self.message.annotations
        if batch_annotations is None:
            batch_annotations = {}
        key_symbol = types.AMQPSymbol(EventData._PROP_PARTITION_KEY)  # pylint:disable=protected-access
        batch_annotations[key_symbol] = value

        durable_header = MessageHeader()
        durable_header.durable = True

        self.message.annotations = batch_annotations
        self.message.header = durable_header

    @property
    def size(self):
        """The size of EventDataBatch object in bytes

        :rtype: int
        """
        return self._size

    def try_add(self, event_data):
        """
        Try to add an EventData object, the size of EventData is a sum up of body, application_properties, etc.

        :param event_data: The EventData object which is attempted to be added.
        :type event_data: ~azure.eventhub.EventData
        :rtype: None
        :raise: :class:`ValueError`, when exceeding the size limit.
        """
        if event_data is None:
            log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
            return
        if not isinstance(event_data, EventData):
            raise TypeError('event_data should be type of EventData')

        if self._partition_key:
            event_key = event_data.partition_key
            if not event_key:
                # Event has no key of its own: it inherits the batch's key.
                event_data._set_partition_key(self._partition_key)  # pylint:disable=protected-access
            elif event_key != self._partition_key:
                raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch')

        event_data._trace_message()  # pylint:disable=protected-access

        encoded_size = event_data.message.get_message_encoded_size()

        # Encoding an event into a BatchMessage costs 5 extra bytes when the event's
        # encoded size is < 256 and 8 extra bytes otherwise.
        if encoded_size < 256:
            overhead = _BATCH_MESSAGE_OVERHEAD_COST[0]
        else:
            overhead = _BATCH_MESSAGE_OVERHEAD_COST[1]
        size_after_add = self._size + encoded_size + overhead

        if size_after_add > self.max_size:
            raise ValueError("EventDataBatch has reached its size limit {}".format(self.max_size))

        self.message._body_gen.append(event_data)  # pylint: disable=protected-access
        self._size = size_after_add
        self._count += 1
class EventPosition(object):
    """
    The position(offset, sequence or timestamp) where a consumer starts.

    :param value: The event position value. The value can be type of datetime.datetime or int or str.
     `None` is stored as the string "-1".
    :type value: int, str or datetime.datetime
    :param bool inclusive: Whether to include the supplied value as the start point.

    Examples:

    Beginning of the event stream:
      >>> event_pos = EventPosition("-1")
    End of the event stream:
      >>> event_pos = EventPosition("@latest")
    Events after the specified offset:
      >>> event_pos = EventPosition("12345")
    Events from the specified offset:
      >>> event_pos = EventPosition("12345", True)
    Events after a datetime:
      >>> event_pos = EventPosition(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> event_pos = EventPosition(1506968696002)

    """

    def __init__(self, value, inclusive=False):
        if value is None:
            value = "-1"
        self.value = value
        self.inclusive = inclusive

    def __str__(self):
        return str(self.value)

    def _selector(self):
        """
        Build the filter expression matching this position.

        A datetime value filters on the enqueued time (in milliseconds), an integer
        value on the sequence number and any other value on the offset.

        :rtype: bytes
        """
        if self.inclusive:
            operator = ">="
        else:
            operator = ">"

        position = self.value
        if isinstance(position, datetime.datetime):
            whole_seconds = calendar.timegm(position.utctimetuple())
            timestamp = (whole_seconds * 1000) + (position.microsecond/1000)
            expression = "amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))
        elif isinstance(position, six.integer_types):
            expression = "amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, position)
        else:
            expression = "amqp.annotation.x-opt-offset {} '{}'".format(operator, position)
        return expression.encode('utf-8')
# TODO: move some behaviors to the two credential classes below
# (EventHubSASTokenCredential and EventHubSharedKeyCredential).
class EventHubSASTokenCredential(object):
    """
    SAS token used for authentication.

    :param token: A SAS token or function that returns a SAS token. If a function is supplied,
     it will be used to retrieve subsequent tokens in the case of token expiry. The function should
     take no arguments. The token can be type of str or Callable object.
    """

    def __init__(self, token):
        self.token = token

    def get_sas_token(self):
        """Return the token itself, or the result of calling it when it is callable."""
        source = self.token
        return source() if callable(source) else source
class EventHubSharedKeyCredential(object):
    """
    The shared access key credential used for authentication.

    :param str policy: The name of the shared access policy.
    :param str key: The shared access key.
    """

    def __init__(self, policy, key):
        # Both values are stored unchanged as public attributes.
        self.policy, self.key = policy, key
class _Address(object):
    """Plain holder for a hostname and a path, both optional."""

    def __init__(self, hostname=None, path=None):
        # Both values are stored unchanged as public attributes.
        self.hostname, self.path = hostname, path