Source code for azure.identity._credentials.shared_cache

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os
import time

from msal.application import PublicClientApplication

from azure.core.credentials import AccessToken
from azure.core.exceptions import ClientAuthenticationError

from .. import CredentialUnavailableError
from .._constants import DEVELOPER_SIGN_ON_CLIENT_ID
from .._internal import AadClient, resolve_tenant, validate_tenant_id
from .._internal.decorators import log_get_token, wrap_exceptions
from .._internal.msal_client import MsalClient
from .._internal.shared_token_cache import NO_TOKEN, SharedTokenCacheBase

try:
    from typing import TYPE_CHECKING
except ImportError:
    TYPE_CHECKING = False

if TYPE_CHECKING:
    # pylint:disable=unused-import,ungrouped-imports
    from typing import Any, Dict, Optional
    from .. import AuthenticationRecord
    from .._internal import AadClientBase


[docs]class SharedTokenCacheCredential(SharedTokenCacheBase): """Authenticates using tokens in the local cache shared between Microsoft applications. :param str username: Username (typically an email address) of the user to authenticate as. This is used when the local cache contains tokens for multiple identities. :keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com', the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts` defines authorities for other clouds. :keyword str tenant_id: an Azure Active Directory tenant ID. Used to select an account when the cache contains tokens for multiple identities. :keyword AuthenticationRecord authentication_record: an authentication record returned by a user credential such as :class:`DeviceCodeCredential` or :class:`InteractiveBrowserCredential` :keyword cache_persistence_options: configuration for persistent token caching. If not provided, the credential will use the persistent cache shared by Microsoft development applications :paramtype cache_persistence_options: ~azure.identity.TokenCachePersistenceOptions :keyword bool allow_multitenant_authentication: when True, enables the credential to acquire tokens from any tenant the user is registered in. When False, which is the default, the credential will acquire tokens only from the user's home tenant or, if a value was given for **authentication_record**, the tenant specified by the :class:`AuthenticationRecord`. """ def __init__(self, username=None, **kwargs): # type: (Optional[str], **Any) -> None self._auth_record = kwargs.pop("authentication_record", None) # type: Optional[AuthenticationRecord] if self._auth_record: # authenticate in the tenant that produced the record unless "tenant_id" specifies another self._tenant_id = kwargs.pop("tenant_id", None) or self._auth_record.tenant_id validate_tenant_id(self._tenant_id) self._allow_multitenant = kwargs.pop("allow_multitenant_authentication", False) self._cache = kwargs.pop("_cache", None) self._client_applications = {} # type: Dict[str, PublicClientApplication] self._msal_client = MsalClient(**kwargs) self._initialized = False else: super(SharedTokenCacheCredential, self).__init__(username=username, **kwargs)
[docs] @log_get_token("SharedTokenCacheCredential") def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument # type (*str, **Any) -> AccessToken """Get an access token for `scopes` from the shared cache. If no access token is cached, attempt to acquire one using a cached refresh token. This method is called automatically by Azure SDK clients. :param str scopes: desired scopes for the access token. This method requires at least one scope. :keyword str claims: additional claims required in the token, such as those returned in a resource provider's claims challenge following an authorization failure :rtype: :class:`azure.core.credentials.AccessToken` :raises ~azure.identity.CredentialUnavailableError: the cache is unavailable or contains insufficient user information :raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message`` attribute gives a reason. """ if not scopes: raise ValueError("'get_token' requires at least one scope") if not self._initialized: self._initialize() if not self._cache: raise CredentialUnavailableError(message="Shared token cache unavailable") if self._auth_record: return self._acquire_token_silent(*scopes, **kwargs) account = self._get_account(self._username, self._tenant_id) token = self._get_cached_access_token(scopes, account) if token: return token # try each refresh token, returning the first access token acquired for refresh_token in self._get_refresh_tokens(account): token = self._client.obtain_token_by_refresh_token(scopes, refresh_token, **kwargs) return token raise CredentialUnavailableError(message=NO_TOKEN.format(account.get("username")))
def _get_auth_client(self, **kwargs): # type: (**Any) -> AadClientBase return AadClient(client_id=DEVELOPER_SIGN_ON_CLIENT_ID, **kwargs) def _initialize(self): if self._initialized: return if not self._auth_record: super(SharedTokenCacheCredential, self)._initialize() return self._load_cache() self._initialized = True def _get_client_application(self, **kwargs): tenant_id = resolve_tenant(self._tenant_id, self._allow_multitenant, **kwargs) if tenant_id not in self._client_applications: # CP1 = can handle claims challenges (CAE) capabilities = None if "AZURE_IDENTITY_DISABLE_CP1" in os.environ else ["CP1"] self._client_applications[tenant_id] = PublicClientApplication( client_id=self._auth_record.client_id, authority="https://{}/{}".format(self._auth_record.authority, tenant_id), token_cache=self._cache, http_client=self._msal_client, client_capabilities=capabilities ) return self._client_applications[tenant_id] @wrap_exceptions def _acquire_token_silent(self, *scopes, **kwargs): # type: (*str, **Any) -> AccessToken """Silently acquire a token from MSAL. Requires an AuthenticationRecord.""" # this won't be None when this method is called by get_token but we check anyway to satisfy mypy if self._auth_record is None: raise CredentialUnavailableError("Initialization failed") result = None client_application = self._get_client_application(**kwargs) accounts_for_user = client_application.get_accounts(username=self._auth_record.username) if not accounts_for_user: raise CredentialUnavailableError("The cache contains no account matching the given AuthenticationRecord.") for account in accounts_for_user: if account.get("home_account_id") != self._auth_record.home_account_id: continue now = int(time.time()) result = client_application.acquire_token_silent_with_error( list(scopes), account=account, claims_challenge=kwargs.get("claims") ) if result and "access_token" in result and "expires_in" in result: return AccessToken(result["access_token"], now + int(result["expires_in"])) # if we get this far, the cache contained a matching account but MSAL failed to authenticate it silently if result: # cache contains a matching refresh token but STS returned an error response when MSAL tried to use it message = "Token acquisition failed" details = result.get("error_description") or result.get("error") if details: message += ": {}".format(details) raise ClientAuthenticationError(message=message) # cache doesn't contain a matching refresh (or access) token raise CredentialUnavailableError(message=NO_TOKEN.format(self._auth_record.username))