Source code for azure.identity.aio._credentials.environment

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os
from typing import TYPE_CHECKING

from ... import CredentialUnavailableError
from ..._constants import EnvironmentVariables
from ..._credentials.environment import get_credential_unavailable_message
from .client_credential import CertificateCredential, ClientSecretCredential
from .base import AsyncCredentialBase

if TYPE_CHECKING:
    from typing import Any, Optional, Union
    from azure.core.credentials import AccessToken


[docs]class EnvironmentCredential(AsyncCredentialBase): """A credential configured by environment variables. This credential is capable of authenticating as a service principal using a client secret or a certificate, or as a user with a username and password. Configuration is attempted in this order, using these environment variables: Service principal with secret: - **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its 'directory' ID. - **AZURE_CLIENT_ID**: the service principal's client ID - **AZURE_CLIENT_SECRET**: one of the service principal's client secrets Service principal with certificate: - **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its 'directory' ID. - **AZURE_CLIENT_ID**: the service principal's client ID - **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key The certificate must not be password-protected. """ def __init__(self, **kwargs: "Any") -> None: self._credential = None # type: Optional[Union[CertificateCredential, ClientSecretCredential]] self._unavailable_message = "" if all(os.environ.get(v) is not None for v in EnvironmentVariables.CLIENT_SECRET_VARS): self._credential = ClientSecretCredential( client_id=os.environ[EnvironmentVariables.AZURE_CLIENT_ID], client_secret=os.environ[EnvironmentVariables.AZURE_CLIENT_SECRET], tenant_id=os.environ[EnvironmentVariables.AZURE_TENANT_ID], **kwargs ) elif all(os.environ.get(v) is not None for v in EnvironmentVariables.CERT_VARS): self._credential = CertificateCredential( client_id=os.environ[EnvironmentVariables.AZURE_CLIENT_ID], tenant_id=os.environ[EnvironmentVariables.AZURE_TENANT_ID], certificate_path=os.environ[EnvironmentVariables.AZURE_CLIENT_CERTIFICATE_PATH], **kwargs ) if not self._credential: self._unavailable_message = get_credential_unavailable_message() async def __aenter__(self): if self._credential: await self._credential.__aenter__() return self
[docs] async def close(self): """Close the credential's transport session.""" if self._credential: await self._credential.__aexit__()
[docs] async def get_token(self, *scopes: str, **kwargs: "Any") -> "AccessToken": """Asynchronously request an access token for `scopes`. .. note:: This method is called by Azure SDK clients. It isn't intended for use in application code. :param str scopes: desired scopes for the token :rtype: :class:`azure.core.credentials.AccessToken` :raises ~azure.identity.CredentialUnavailableError: environment variable configuration is incomplete """ if not self._credential: raise CredentialUnavailableError(message=self._unavailable_message) return await self._credential.get_token(*scopes, **kwargs)