# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, Union, List, Dict
from azure.core.credentials import AzureKeyCredential
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.pipeline.policies import (
RequestIdPolicy,
HeadersPolicy,
AsyncRedirectPolicy,
AsyncRetryPolicy,
ContentDecodePolicy,
CustomHookPolicy,
NetworkTraceLoggingPolicy,
ProxyPolicy,
DistributedTracingPolicy,
HttpLoggingPolicy,
UserAgentPolicy
)
from .._policies import CloudEventDistributedTracingPolicy
from .._models import CloudEvent, EventGridEvent, CustomEvent
from .._helpers import _get_topic_hostname_only_fqdn, _get_authentication_policy, _is_cloud_event
from .._generated.aio import EventGridPublisherClient as EventGridPublisherClientAsync
from .._shared_access_signature_credential import EventGridSharedAccessSignatureCredential
from .._version import VERSION
SendType = Union[
CloudEvent,
EventGridEvent,
CustomEvent,
Dict,
List[CloudEvent],
List[EventGridEvent],
List[CustomEvent],
List[Dict]
]
[docs]class EventGridPublisherClient():
"""Asynchronous EventGrid Python Publisher Client.
:param str topic_hostname: The topic endpoint to send the events to.
:param credential: The credential object used for authentication which implements
SAS key authentication or SAS token authentication.
:type credential: ~azure.core.credentials.AzureKeyCredential or EventGridSharedAccessSignatureCredential
"""
def __init__(
self,
topic_hostname: str,
credential: Union[AzureKeyCredential, EventGridSharedAccessSignatureCredential],
**kwargs: Any) -> None:
self._client = EventGridPublisherClientAsync(
policies=EventGridPublisherClient._policies(credential, **kwargs),
**kwargs
)
topic_hostname = _get_topic_hostname_only_fqdn(topic_hostname)
self._topic_hostname = topic_hostname
@staticmethod
def _policies(
credential: Union[AzureKeyCredential, EventGridSharedAccessSignatureCredential],
**kwargs: Any
) -> List[Any]:
auth_policy = _get_authentication_policy(credential)
sdk_moniker = 'eventgridpublisherclient/{}'.format(VERSION)
policies = [
RequestIdPolicy(**kwargs),
HeadersPolicy(**kwargs),
UserAgentPolicy(sdk_moniker=sdk_moniker, **kwargs),
ProxyPolicy(**kwargs),
ContentDecodePolicy(**kwargs),
AsyncRedirectPolicy(**kwargs),
AsyncRetryPolicy(**kwargs),
auth_policy,
CustomHookPolicy(**kwargs),
NetworkTraceLoggingPolicy(**kwargs),
DistributedTracingPolicy(**kwargs),
CloudEventDistributedTracingPolicy(),
HttpLoggingPolicy(**kwargs)
]
return policies
[docs] @distributed_trace_async
async def send(
self,
events: SendType,
**kwargs: Any) -> None:
"""Sends event data to topic hostname specified during client initialization.
:param events: A list or an instance of CloudEvent/EventGridEvent/CustomEvent to be sent.
:type events: SendType
:keyword str content_type: The type of content to be used to send the events.
Has default value "application/json; charset=utf-8" for EventGridEvents,
with "cloudevents-batch+json" for CloudEvents
:rtype: None
:raises: :class:`ValueError`, when events do not follow specified SendType.
"""
if not isinstance(events, list):
events = [events]
if all(isinstance(e, CloudEvent) for e in events) or all(_is_cloud_event(e) for e in events):
try:
events = [e._to_generated(**kwargs) for e in events] # pylint: disable=protected-access
except AttributeError:
pass # means it's a dictionary
kwargs.setdefault("content_type", "application/cloudevents-batch+json; charset=utf-8")
await self._client.publish_cloud_event_events(self._topic_hostname, events, **kwargs)
elif all(isinstance(e, EventGridEvent) for e in events) or all(isinstance(e, dict) for e in events):
kwargs.setdefault("content_type", "application/json; charset=utf-8")
await self._client.publish_events(self._topic_hostname, events, **kwargs)
elif all(isinstance(e, CustomEvent) for e in events):
serialized_events = [dict(e) for e in events]
await self._client.publish_custom_event_events(self._topic_hostname, serialized_events, **kwargs)
else:
raise ValueError("Event schema is not correct.")