Source code for azure.core.messaging

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from __future__ import annotations
import uuid
from base64 import b64decode
from datetime import datetime
from typing import cast, Union, Any, Optional, Dict, TypeVar, Generic
from .utils._utils import _convert_to_isoformat, TZ_UTC
from .utils._messaging_shared import _get_json_content
from .serialization import NULL


__all__ = ["CloudEvent"]


# Sentinel used as the default of the ``time`` keyword in ``CloudEvent.__init__``:
# it lets the constructor tell "argument omitted" apart from an explicit ``None``.
_Unset: Any = object()

# Type variable for the ``data`` payload carried by a ``CloudEvent``.
DataType = TypeVar("DataType")


class CloudEvent(Generic[DataType]):  # pylint:disable=too-many-instance-attributes
    """Properties of the CloudEvent 1.0 Schema.

    All required parameters must be populated in order to send to Azure.

    :param source: Required. Identifies the context in which an event happened. The combination of id and source must
     be unique for each distinct event. If publishing to a domain topic, source must be the domain topic name.
    :type source: str
    :param type: Required. Type of event related to the originating occurrence.
    :type type: str
    :keyword specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0"
    :paramtype specversion: str
    :keyword data: Optional. Event data specific to the event type.
    :paramtype data: object
    :keyword time: Optional. The time (in UTC) the event was generated. When the keyword is omitted the
     current time is used; an explicitly passed value (including None) is stored unchanged.
    :paramtype time: ~datetime.datetime
    :keyword dataschema: Optional. Identifies the schema that data adheres to.
    :paramtype dataschema: str
    :keyword datacontenttype: Optional. Content type of data value.
    :paramtype datacontenttype: str
    :keyword subject: Optional. This describes the subject of the event in the context of the event producer
     (identified by source).
    :paramtype subject: str
    :keyword id: Optional. An identifier for the event. The combination of id and source must be
     unique for each distinct event. If not provided, a random UUID will be generated and used.
    :paramtype id: Optional[str]
    :keyword extensions: Optional. A CloudEvent MAY include any number of additional context attributes
     with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased
     and must not exceed the length of 20 characters.
    :paramtype extensions: Optional[dict]
    :raises ValueError: If an extension key is not lower cased and alphanumeric, or if unexpected
     keyword arguments are passed.
    """

    source: str
    """Identifies the context in which an event happened. The combination of id and source must
    be unique for each distinct event. If publishing to a domain topic, source must be the domain topic name."""

    type: str  # pylint: disable=redefined-builtin
    """Type of event related to the originating occurrence."""

    specversion: str = "1.0"
    """The version of the CloudEvent spec. Defaults to "1.0" """

    id: str  # pylint: disable=redefined-builtin
    """An identifier for the event. The combination of id and source must be
    unique for each distinct event. If not provided, a random UUID will be generated and used."""

    data: Optional[DataType]
    """Event data specific to the event type."""

    time: Optional[datetime]
    """The time (in UTC) the event was generated."""

    dataschema: Optional[str]
    """Identifies the schema that data adheres to."""

    datacontenttype: Optional[str]
    """Content type of data value."""

    subject: Optional[str]
    """This describes the subject of the event in the context of the event producer
    (identified by source)"""

    extensions: Optional[Dict[str, Any]]
    """A CloudEvent MAY include any number of additional context attributes
    with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased
    and must not exceed the length of 20 characters."""

    def __init__(
        self,
        source: str,
        type: str,  # pylint: disable=redefined-builtin
        *,
        specversion: Optional[str] = None,
        id: Optional[str] = None,  # pylint: disable=redefined-builtin
        time: Optional[datetime] = _Unset,
        datacontenttype: Optional[str] = None,
        dataschema: Optional[str] = None,
        subject: Optional[str] = None,
        data: Optional[DataType] = None,
        extensions: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        self.source: str = source
        self.type: str = type

        # Only shadow the class-level default ("1.0") when a truthy value is supplied.
        if specversion:
            self.specversion: str = specversion
        self.id: str = id if id else str(uuid.uuid4())

        # ``_Unset`` means the caller did not pass ``time`` at all; an explicit
        # ``None`` is kept as-is rather than replaced by the current time.
        self.time: Optional[datetime]
        if time is _Unset:
            self.time = datetime.now(TZ_UTC)
        else:
            self.time = time

        self.datacontenttype: Optional[str] = datacontenttype
        self.dataschema: Optional[str] = dataschema
        self.subject: Optional[str] = subject
        self.data: Optional[DataType] = data

        self.extensions: Optional[Dict[str, Any]] = extensions
        if self.extensions:
            for key in self.extensions:
                if not key.islower() or not key.isalnum():
                    raise ValueError("Extension attributes should be lower cased and alphanumeric.")

        # ``**kwargs`` exists only to reject stray keywords with a helpful message.
        if kwargs:
            remaining = ", ".join(kwargs.keys())
            raise ValueError(
                f"Unexpected keyword arguments {remaining}. "
                + "Any extension attributes must be passed explicitly using extensions."
            )

    def __repr__(self) -> str:
        # The representation is capped at 1024 characters.
        return "CloudEvent(source={}, type={}, specversion={}, id={}, time={})".format(
            self.source, self.type, self.specversion, self.id, self.time
        )[:1024]

    @classmethod
    def from_dict(cls, event: Dict[str, Any]) -> CloudEvent[DataType]:
        """Returns the deserialized CloudEvent object when a dict is provided.

        :param event: The dict representation of the event which needs to be deserialized.
        :type event: dict
        :rtype: CloudEvent
        :return: The deserialized CloudEvent object.
        :raises ValueError: If both ``data`` and ``data_base64`` are present, or if the required
         ``source`` / ``type`` attributes are missing.
        """
        kwargs: Dict[str, Any] = {}
        reserved_attr = [
            "data",
            "data_base64",
            "id",
            "source",
            "type",
            "specversion",
            "time",
            "dataschema",
            "datacontenttype",
            "subject",
        ]

        if "data" in event and "data_base64" in event:
            raise ValueError("Invalid input. Only one of data and data_base64 must be present.")

        # A key that is present with a None value is passed on as the NULL sentinel;
        # a key that is absent is simply not forwarded to the constructor.
        if "data" in event:
            data = event.get("data")
            kwargs["data"] = data if data is not None else NULL
        elif "data_base64" in event:
            kwargs["data"] = b64decode(cast(Union[str, bytes], event.get("data_base64")))

        for item in ["datacontenttype", "dataschema", "subject"]:
            if item in event:
                val = event.get(item)
                kwargs[item] = val if val is not None else NULL

        # Every key that is not a reserved attribute is treated as an extension attribute.
        extensions = {k: v for k, v in event.items() if k not in reserved_attr}
        if extensions:
            kwargs["extensions"] = extensions

        try:
            event_obj = cls(
                id=event.get("id"),
                source=event["source"],
                type=event["type"],
                specversion=event.get("specversion"),
                time=_convert_to_isoformat(event.get("time")),
                **kwargs,
            )
        except KeyError as err:
            # https://github.com/cloudevents/spec Cloud event spec requires source, type,
            # specversion. We autopopulate everything other than source, type.
            # So we will assume the KeyError is coming from source/type access.
            if all(
                key in event
                for key in (
                    "subject",
                    "eventType",
                    "data",
                    "dataVersion",
                    "id",
                    "eventTime",
                )
            ):
                raise ValueError(
                    "The event you are trying to parse follows the Eventgrid Schema. You can parse"
                    + " EventGrid events using EventGridEvent.from_dict method in the azure-eventgrid library."
                ) from err
            raise ValueError(
                "The event does not conform to the cloud event spec https://github.com/cloudevents/spec."
                + " The `source` and `type` params are required."
            ) from err
        return event_obj

    @classmethod
    def from_json(cls, event: Any) -> CloudEvent[DataType]:
        """Returns the deserialized CloudEvent object when a json payload is provided.

        :param event: The json string that should be converted into a CloudEvent. This can also be
         a storage QueueMessage, eventhub's EventData or ServiceBusMessage
        :type event: object
        :rtype: CloudEvent
        :return: The deserialized CloudEvent object.
        :raises ValueError: If the provided JSON is invalid.
        """
        dict_event = _get_json_content(event)
        # Delegate through ``cls`` so that subclasses get an instance of themselves,
        # matching ``from_dict`` which also constructs via ``cls``.
        return cls.from_dict(dict_event)