Source code for azure.eventgrid._helpers

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import hashlib
import hmac
import base64
try:
    from urllib.parse import quote
except ImportError:
    from urllib2 import quote # type: ignore

from azure.core.pipeline.policies import AzureKeyCredentialPolicy
from azure.core.credentials import AzureKeyCredential
from ._shared_access_signature_credential import EventGridSharedAccessSignatureCredential
from ._signature_credential_policy import EventGridSharedAccessSignatureCredentialPolicy
from . import _constants as constants

[docs]def generate_shared_access_signature(topic_hostname, shared_access_key, expiration_date_utc, **kwargs): # type: (str, str, datetime.Datetime, Any) -> str """ Helper method to generate shared access signature given hostname, key, and expiration date. :param str topic_hostname: The topic endpoint to send the events to. Similar to <YOUR-TOPIC-NAME>.<YOUR-REGION-NAME>-1.eventgrid.azure.net :param str shared_access_key: The shared access key to be used for generating the token :param datetime.datetime expiration_date_utc: The expiration datetime in UTC for the signature. :param str api_version: The API Version to include in the signature. If not provided, the default API version will be used. :rtype: str """ full_topic_hostname = _get_full_topic_hostname(topic_hostname) full_topic_hostname = "{}?apiVersion={}".format( full_topic_hostname, kwargs.get('api_version', None) or constants.DEFAULT_API_VERSION ) encoded_resource = quote(full_topic_hostname, safe=constants.SAFE_ENCODE) encoded_expiration_utc = quote(str(expiration_date_utc), safe=constants.SAFE_ENCODE) unsigned_sas = "r={}&e={}".format(encoded_resource, encoded_expiration_utc) signature = quote(_generate_hmac(shared_access_key, unsigned_sas), safe=constants.SAFE_ENCODE) signed_sas = "{}&s={}".format(unsigned_sas, signature) return signed_sas
def _get_topic_hostname_only_fqdn(topic_hostname): if topic_hostname.startswith('http://'): raise ValueError("HTTP is not supported. Only HTTPS is supported.") if topic_hostname.startswith('https://'): topic_hostname = topic_hostname.replace("https://", "") if topic_hostname.endswith("/api/events"): topic_hostname = topic_hostname.replace("/api/events", "") return topic_hostname def _get_full_topic_hostname(topic_hostname): if topic_hostname.startswith('http://'): raise ValueError("HTTP is not supported. Only HTTPS is supported.") if not topic_hostname.startswith('https://'): topic_hostname = "https://{}".format(topic_hostname) if not topic_hostname.endswith("/api/events"): topic_hostname = "{}/api/events".format(topic_hostname) return topic_hostname def _generate_hmac(key, message): decoded_key = base64.b64decode(key) bytes_message = message.encode('ascii') hmac_new = hmac.new(decoded_key, bytes_message, hashlib.sha256).digest() return base64.b64encode(hmac_new) def _get_authentication_policy(credential): if credential is None: raise ValueError("Parameter 'self._credential' must not be None.") if isinstance(credential, AzureKeyCredential): authentication_policy = AzureKeyCredentialPolicy(credential=credential, name=constants.EVENTGRID_KEY_HEADER) if isinstance(credential, EventGridSharedAccessSignatureCredential): authentication_policy = EventGridSharedAccessSignatureCredentialPolicy( credential=credential, name=constants.EVENTGRID_TOKEN_HEADER ) return authentication_policy def _is_cloud_event(event): # type: dict -> bool required = ('id', 'source', 'specversion', 'type') try: return all([_ in event for _ in required]) and event['specversion'] == "1.0" except TypeError: return False