Source code for azure.core.messaging

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import uuid
from base64 import b64decode
from datetime import datetime
from .utils._utils import _convert_to_isoformat, TZ_UTC
from .utils._messaging_shared import _get_json_content
from .serialization import NULL

try:
    from typing import TYPE_CHECKING, cast, Union
except ImportError:
    TYPE_CHECKING = False

if TYPE_CHECKING:
    from typing import Any, Optional, Dict


__all__ = ["CloudEvent"]


[docs]class CloudEvent(object): # pylint:disable=too-many-instance-attributes """Properties of the CloudEvent 1.0 Schema. All required parameters must be populated in order to send to Azure. :param source: Required. Identifies the context in which an event happened. The combination of id and source must be unique for each distinct event. If publishing to a domain topic, source must be the domain name. :type source: str :param type: Required. Type of event related to the originating occurrence. :type type: str :keyword data: Optional. Event data specific to the event type. :type data: object :keyword time: Optional. The time (in UTC) the event was generated. :type time: ~datetime.datetime :keyword dataschema: Optional. Identifies the schema that data adheres to. :type dataschema: str :keyword datacontenttype: Optional. Content type of data value. :type datacontenttype: str :keyword subject: Optional. This describes the subject of the event in the context of the event producer (identified by source). :type subject: str :keyword specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0" :type specversion: str :keyword id: Optional. An identifier for the event. The combination of id and source must be unique for each distinct event. If not provided, a random UUID will be generated and used. :type id: Optional[str] :keyword extensions: Optional. A CloudEvent MAY include any number of additional context attributes with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased and must not exceed the length of 20 characters. :type extensions: Optional[Dict] :ivar source: Identifies the context in which an event happened. The combination of id and source must be unique for each distinct event. If publishing to a domain topic, source must be the domain name. :vartype source: str :ivar data: Event data specific to the event type. :vartype data: object :ivar type: Type of event related to the originating occurrence. :vartype type: str :ivar time: The time (in UTC) the event was generated. :vartype time: ~datetime.datetime :ivar dataschema: Identifies the schema that data adheres to. :vartype dataschema: str :ivar datacontenttype: Content type of data value. :vartype datacontenttype: str :ivar subject: This describes the subject of the event in the context of the event producer (identified by source). :vartype subject: str :ivar specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0" :vartype specversion: str :ivar id: An identifier for the event. The combination of id and source must be unique for each distinct event. If not provided, a random UUID will be generated and used. :vartype id: str :ivar extensions: A CloudEvent MAY include any number of additional context attributes with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased and must not exceed the length of 20 characters. :vartype extensions: Dict """ def __init__(self, source, type, **kwargs): # pylint: disable=redefined-builtin # type: (str, str, **Any) -> None self.source = source # type: str self.type = type # type: str self.specversion = kwargs.pop("specversion", "1.0") # type: Optional[str] self.id = kwargs.pop("id", str(uuid.uuid4())) # type: Optional[str] self.time = kwargs.pop("time", datetime.now(TZ_UTC)) # type: Optional[datetime] self.datacontenttype = kwargs.pop("datacontenttype", None) # type: Optional[str] self.dataschema = kwargs.pop("dataschema", None) # type: Optional[str] self.subject = kwargs.pop("subject", None) # type: Optional[str] self.data = kwargs.pop("data", None) # type: Optional[object] try: self.extensions = kwargs.pop("extensions") # type: Optional[Dict] for key in self.extensions.keys(): # type:ignore # extensions won't be None here if not key.islower() or not key.isalnum(): raise ValueError( "Extension attributes should be lower cased and alphanumeric." ) except KeyError: self.extensions = None if kwargs: remaining = ", ".join(kwargs.keys()) raise ValueError( "Unexpected keyword arguments {}. Any extension attributes must be passed explicitly using extensions." .format(remaining) ) def __repr__(self): return "CloudEvent(source={}, type={}, specversion={}, id={}, time={})".format( self.source, self.type, self.specversion, self.id, self.time )[:1024]
[docs] @classmethod def from_dict(cls, event): # type: (Dict) -> CloudEvent """ Returns the deserialized CloudEvent object when a dict is provided. :param event: The dict representation of the event which needs to be deserialized. :type event: dict :rtype: CloudEvent """ kwargs = {} # type: Dict[Any, Any] reserved_attr = [ "data", "data_base64", "id", "source", "type", "specversion", "time", "dataschema", "datacontenttype", "subject", ] if "data" in event and "data_base64" in event: raise ValueError( "Invalid input. Only one of data and data_base64 must be present." ) if "data" in event: data = event.get("data") kwargs["data"] = data if data is not None else NULL elif "data_base64" in event: kwargs["data"] = b64decode( cast(Union[str, bytes], event.get("data_base64")) ) for item in ["datacontenttype", "dataschema", "subject"]: if item in event: val = event.get(item) kwargs[item] = val if val is not None else NULL extensions = {k: v for k, v in event.items() if k not in reserved_attr} if extensions: kwargs["extensions"] = extensions try: event_obj = cls( id=event.get("id"), source=event["source"], type=event["type"], specversion=event.get("specversion"), time=_convert_to_isoformat(event.get("time")), **kwargs ) except KeyError: # https://github.com/cloudevents/spec Cloud event spec requires source, type, # specversion. We autopopulate everything other than source, type. if not all([_ in event for _ in ("source", "type")]): if all([_ in event for _ in ("subject", "eventType", "data", "dataVersion", "id", "eventTime")]): raise ValueError( "The event you are trying to parse follows the Eventgrid Schema. You can parse" + " EventGrid events using EventGridEvent.from_dict method in the azure-eventgrid library." ) raise ValueError( "The event does not conform to the cloud event spec https://github.com/cloudevents/spec." + " The `source` and `type` params are required." ) return event_obj
[docs] @classmethod def from_json(cls, event): # type: (Any) -> CloudEvent """ Returns the deserialized CloudEvent object when a json payload is provided. :param event: The json string that should be converted into a CloudEvent. This can also be a storage QueueMessage, eventhub's EventData or ServiceBusMessage :type event: object :rtype: CloudEvent :raises ValueError: If the provided JSON is invalid. """ dict_event = _get_json_content(event) return CloudEvent.from_dict(dict_event)