azure.eventgrid.aio package

class azure.eventgrid.aio.EventGridPublisherClient(endpoint: str, credential: AsyncTokenCredential | AzureKeyCredential | AzureSasCredential, *, api_version: str | None = None, **kwargs: Any)[source]

Asynchronous EventGridPublisherClient publishes events to an EventGrid topic or domain. It can be used to publish either an EventGridEvent, a CloudEvent or a Custom Schema.

Parameters:
Keyword Arguments:

api_version (str) – Api Version. Will default to the most recent Api Version. Note that overriding this default value may result in unsupported behavior.

Return type:

None

Example:

Creating the EventGridPublisherClient with an endpoint and AzureKeyCredential.
import os
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)
Creating the EventGridPublisherClient with an endpoint and AzureSasCredential.
import os
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureSasCredential

signature = os.environ["EVENTGRID_SAS"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

credential_sas = AzureSasCredential(signature)
client = EventGridPublisherClient(endpoint, credential_sas)
async close() None[source]

Close the EventGridPublisherClient session.

async send(events: CloudEvent | EventGridEvent | Dict | CNCFCloudEvent | List[CloudEvent] | List[EventGridEvent] | List[Dict] | List[CNCFCloudEvent], *, channel_name: str | None = None, **kwargs: Any) None[source]

Sends events to a topic or a domain specified during the client initialization.

A single instance or a list of dictionaries, CloudEvents or EventGridEvents are accepted.

Example:

Publishing an EventGridEvent.
import os
import asyncio
from azure.eventgrid import EventGridEvent
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

async def publish():
    credential = AzureKeyCredential(topic_key)
    client = EventGridPublisherClient(endpoint, credential)

    await client.send([
        EventGridEvent(
            event_type="Contoso.Items.ItemReceived",
            data={
                "itemSku": "Contoso Item SKU #1"
            },
            subject="Door1",
            data_version="2.0"
        )
    ])
Publishing a CloudEvent.
import os
import asyncio
from azure.core.messaging import CloudEvent
from azure.eventgrid.aio import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_CLOUD_EVENT_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_CLOUD_EVENT_TOPIC_ENDPOINT"]

async def publish():
    credential = AzureKeyCredential(topic_key)
    client = EventGridPublisherClient(endpoint, credential)

    await client.send([
        CloudEvent(
            type="Contoso.Items.ItemReceived",
            source="/contoso/items",
            data={
                "itemSku": "Contoso Item SKU #1"
            },
            subject="Door1"
        )
    ])

Dict representation of respective serialized models is accepted as CloudEvent(s) or EventGridEvent(s) apart from the strongly typed objects.

Example:

Publishing a list of EventGridEvents using a dict-like representation.
event0 = {	
    "eventType": "Contoso.Items.ItemReceived",	
    "data": {	
        "itemSku": "Contoso Item SKU #1"	
    },	
    "subject": "Door1",	
    "dataVersion": "2.0",	
    "id": "randomuuid11",	
    "eventTime": datetime.utcnow()
}	
event1 = {	
    "eventType": "Contoso.Items.ItemReceived",	
    "data": {	
        "itemSku": "Contoso Item SKU #2"	
    },	
    "subject": "Door1",	
    "dataVersion": "2.0",	
    "id": "randomuuid12",	
    "eventTime": datetime.utcnow()
}

async with client:	
    await client.send([event0, event1])
Publishing a CloudEvent using a dict-like representation.
async with client:
    await client.send([
        {
            "type": "Contoso.Items.ItemReceived",
            "source": "/contoso/items",	
            "data": {	
                "itemSku": "Contoso Item SKU #1"	
            },	
            "subject": "Door1",	
            "specversion": "1.0",	
            "id": "randomclouduuid11"
        }
    ])

When publishing a Custom Schema Event(s), dict-like representation is accepted. Either a single dictionary or a list of dictionaries can be passed.

Example:

Publishing a Custom Schema event.
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

custom_schema_event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": uuid.uuid4(),
    "customEventTime": dt.datetime.now(UTC()).isoformat(),
    "customData": "sample data"
}
async with client:
    # publish list of events
    await client.send(custom_schema_event)

WARNING: When sending a list of multiple events at one time, iterating over and sending each event will not result in optimal performance. For best performance, it is highly recommended to send a list of events.

Parameters:

events (CloudEvent or EventGridEvent or dict or List[CloudEvent] or List[EventGridEvent] or List[dict]) – A single instance or a list of dictionaries/CloudEvent/EventGridEvent to be sent.

Keyword Arguments:
  • content_type (str) – The type of content to be used to send the events. Has default value “application/json; charset=utf-8” for EventGridEvents, with “cloudevents-batch+json” for CloudEvents

  • channel_name (str or None namespaces with partner topic. For more details, visit https://docs.microsoft.com/azure/event-grid/partner-events-overview) – Optional. Used to specify the name of event channel when publishing to partner

Return type:

None