Source code for azure.identity._credentials.certificate

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from binascii import hexlify
from typing import TYPE_CHECKING

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
import six

from .._internal.client_credential_base import ClientCredentialBase

if TYPE_CHECKING:
    from typing import Any


class CertificateCredential(ClientCredentialBase):
    """Authenticates as a service principal using a certificate.

    :param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
    :param str client_id: the service principal's client ID
    :param str certificate_path: path to a PEM-encoded certificate file including the private key.

    :keyword str authority: Authority of an Azure Active Directory endpoint, for example
          'login.microsoftonline.com', the authority for Azure Public Cloud (which is the default).
          :class:`~azure.identity.AzureAuthorityHosts` defines authorities for other clouds.
    :keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the
          certificate requires a different encoding, pass appropriately encoded bytes instead.
    :paramtype password: str or bytes
    :keyword bool send_certificate: if True, the credential will send public certificate material with token
          requests. This is required to use Subject Name/Issuer (SNI) authentication. Defaults to False.
    :keyword bool enable_persistent_cache: if True, the credential will store tokens in a persistent cache.
          Defaults to False.
    :keyword bool allow_unencrypted_cache: if True, the credential will fall back to a plaintext cache when
          encryption is unavailable. Defaults to False. Has no effect when `enable_persistent_cache` is False.
    """

    def __init__(self, tenant_id, client_id, certificate_path, **kwargs):
        # type: (str, str, str, **Any) -> None
        if not certificate_path:
            raise ValueError(
                "'certificate_path' must be the path to a PEM file containing an x509 certificate and its private key"
            )

        # text passwords are encoded as UTF-8; bytes (or None) are passed through untouched
        passphrase = kwargs.pop("password", None)
        if isinstance(passphrase, six.text_type):
            passphrase = passphrase.encode(encoding="utf-8")

        with open(certificate_path, "rb") as pem_file:
            pem_contents = pem_file.read()

        # the hex-encoded SHA-1 fingerprint of the certificate is passed along as its thumbprint
        certificate = x509.load_pem_x509_certificate(pem_contents, default_backend())
        sha1_digest = certificate.fingerprint(hashes.SHA1())  # nosec

        # TODO: msal doesn't formally support passwords (but soon will); the below depends on an implementation detail
        signing_key = serialization.load_pem_private_key(
            pem_contents, password=passphrase, backend=default_backend()
        )

        credential = {
            "private_key": signing_key,
            "thumbprint": hexlify(sha1_digest).decode("utf-8"),
        }

        include_public_certificate = kwargs.pop("send_certificate", False)
        if include_public_certificate:
            try:
                # the JWT needs the whole chain but load_pem_x509_certificate deserializes only the signing cert
                cert_chain = extract_cert_chain(pem_contents)
                credential["public_certificate"] = six.ensure_str(cert_chain)
            except ValueError as ex:
                # we shouldn't land here, because load_pem_private_key should have raised when given a malformed file
                error_text = 'Found no PEM encoded certificate in "{}"'.format(certificate_path)
                six.raise_from(ValueError(error_text), ex)

        super(CertificateCredential, self).__init__(
            client_id=client_id, client_credential=credential, tenant_id=tenant_id, **kwargs
        )
def extract_cert_chain(pem_bytes):
    # type: (bytes) -> bytes
    """Extract a certificate chain from a PEM file's bytes, removing line breaks.

    The returned value spans from the first ``-----BEGIN CERTIFICATE-----`` header through the last
    ``-----END CERTIFICATE-----`` footer, inclusive. Everything between those two markers is kept, with
    line breaks removed; nothing after the final footer is included.

    :param bytes pem_bytes: contents of a PEM file
    :returns: the certificate chain on a single line
    :rtype: bytes
    :raises ValueError: if ``pem_bytes`` contains no certificate header or no certificate footer
    """
    header = b"-----BEGIN CERTIFICATE-----"
    footer = b"-----END CERTIFICATE-----"

    # if index/rindex raises ValueError, there's no PEM-encoded cert
    start = pem_bytes.index(header)

    # stop exactly at the end of the last footer: slicing one byte further would pull in whatever
    # follows it, which is only harmless when that byte happens to be a line break
    end = pem_bytes.rindex(footer) + len(footer)

    return b"".join(pem_bytes[start:end].splitlines())