Source code for azure.identity._credentials.chained

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging

from azure.core.exceptions import ClientAuthenticationError

from .. import CredentialUnavailableError
from .._internal import within_credential_chain

try:
    from typing import TYPE_CHECKING
except ImportError:
    TYPE_CHECKING = False

if TYPE_CHECKING:
    # pylint:disable=unused-import,ungrouped-imports
    from typing import Any, Optional
    from azure.core.credentials import AccessToken, TokenCredential

_LOGGER = logging.getLogger(__name__)


def _get_error_message(history):
    attempts = []
    for credential, error in history:
        if error:
            attempts.append("{}: {}".format(credential.__class__.__name__, error))
        else:
            attempts.append(credential.__class__.__name__)
    return """
Attempted credentials:\n\t{}""".format(
        "\n\t".join(attempts)
    )


[docs]class ChainedTokenCredential(object): """A sequence of credentials that is itself a credential. Its :func:`get_token` method calls ``get_token`` on each credential in the sequence, in order, returning the first valid token received. :param credentials: credential instances to form the chain :type credentials: :class:`azure.core.credentials.TokenCredential` """ def __init__(self, *credentials): # type: (*TokenCredential) -> None if not credentials: raise ValueError("at least one credential is required") self._successful_credential = None # type: Optional[TokenCredential] self.credentials = credentials def __enter__(self): for credential in self.credentials: credential.__enter__() return self def __exit__(self, *args): for credential in self.credentials: credential.__exit__(*args)
[docs] def close(self): # type: () -> None """Close the transport session of each credential in the chain.""" self.__exit__()
[docs] def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument # type: (*str, **Any) -> AccessToken """Request a token from each chained credential, in order, returning the first token received. This method is called automatically by Azure SDK clients. :param str scopes: desired scopes for the access token. This method requires at least one scope. :raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token """ within_credential_chain.set(True) history = [] for credential in self.credentials: try: token = credential.get_token(*scopes, **kwargs) _LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__) self._successful_credential = credential return token except CredentialUnavailableError as ex: # credential didn't attempt authentication because it lacks required data or state -> continue history.append((credential, ex.message)) except Exception as ex: # pylint: disable=broad-except # credential failed to authenticate, or something unexpectedly raised -> break history.append((credential, str(ex))) _LOGGER.debug( '%s.get_token failed: %s raised unexpected error "%s"', self.__class__.__name__, credential.__class__.__name__, ex, exc_info=True, ) break within_credential_chain.set(False) attempts = _get_error_message(history) message = self.__class__.__name__ + " failed to retrieve a token from the included credentials." + attempts \ + "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at " \ "https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot." _LOGGER.warning(message) raise ClientAuthenticationError(message=message)