# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from typing import TYPE_CHECKING
from .silent import SilentAuthenticationCredential
from .. import CredentialUnavailableError
from .._constants import DEVELOPER_SIGN_ON_CLIENT_ID
from .._internal import AadClient
from .._internal.decorators import log_get_token
from .._internal.shared_token_cache import NO_TOKEN, SharedTokenCacheBase
if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any, Optional
from azure.core.credentials import TokenCredential
from .._internal import AadClientBase
[docs]class SharedTokenCacheCredential(object):
"""Authenticates using tokens in the local cache shared between Microsoft applications.
:param str username: Username (typically an email address) of the user to authenticate as. This is used when the
local cache contains tokens for multiple identities.
:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword str tenant_id: an Azure Active Directory tenant ID. Used to select an account when the cache contains
tokens for multiple identities.
:keyword AuthenticationRecord authentication_record: an authentication record returned by a user credential such as
:class:`DeviceCodeCredential` or :class:`InteractiveBrowserCredential`
:keyword cache_persistence_options: configuration for persistent token caching. If not provided, the credential
will use the persistent cache shared by Microsoft development applications
:paramtype cache_persistence_options: ~azure.identity.TokenCachePersistenceOptions
:keyword bool allow_multitenant_authentication: when True, enables the credential to acquire tokens from any tenant
the user is registered in. When False, which is the default, the credential will acquire tokens only from the
user's home tenant or, if a value was given for **authentication_record**, the tenant specified by the
:class:`AuthenticationRecord`.
"""
def __init__(self, username=None, **kwargs):
# type: (Optional[str], **Any) -> None
if "authentication_record" in kwargs:
self._credential = SilentAuthenticationCredential(**kwargs) # type: TokenCredential
else:
self._credential = _SharedTokenCacheCredential(username=username, **kwargs)
def __enter__(self):
self._credential.__enter__()
return self
def __exit__(self, *args):
self._credential.__exit__(*args)
[docs] def close(self):
# type: () -> None
"""Close the credential's transport session."""
self.__exit__()
[docs] @log_get_token("SharedTokenCacheCredential")
def get_token(self, *scopes, **kwargs):
# type (*str, **Any) -> AccessToken
"""Get an access token for `scopes` from the shared cache.
If no access token is cached, attempt to acquire one using a cached refresh token.
This method is called automatically by Azure SDK clients.
:param str scopes: desired scopes for the access token. This method requires at least one scope.
:keyword str claims: additional claims required in the token, such as those returned in a resource provider's
claims challenge following an authorization failure
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.identity.CredentialUnavailableError: the cache is unavailable or contains insufficient user
information
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message``
attribute gives a reason.
"""
return self._credential.get_token(*scopes, **kwargs)
[docs] @staticmethod
def supported():
# type: () -> bool
"""Whether the shared token cache is supported on the current platform.
:rtype: bool
"""
return SharedTokenCacheBase.supported()
class _SharedTokenCacheCredential(SharedTokenCacheBase):
"""The original SharedTokenCacheCredential, which doesn't use msal.ClientApplication"""
def __enter__(self):
if self._client:
self._client.__enter__()
return self
def __exit__(self, *args):
if self._client:
self._client.__exit__(*args)
def get_token(self, *scopes, **kwargs):
# type (*str, **Any) -> AccessToken
if not scopes:
raise ValueError("'get_token' requires at least one scope")
if not self._initialized:
self._initialize()
if not self._cache:
raise CredentialUnavailableError(message="Shared token cache unavailable")
account = self._get_account(self._username, self._tenant_id)
token = self._get_cached_access_token(scopes, account)
if token:
return token
# try each refresh token, returning the first access token acquired
for refresh_token in self._get_refresh_tokens(account):
token = self._client.obtain_token_by_refresh_token(scopes, refresh_token, **kwargs)
return token
raise CredentialUnavailableError(message=NO_TOKEN.format(account.get("username")))
def _get_auth_client(self, **kwargs):
# type: (**Any) -> AadClientBase
return AadClient(client_id=DEVELOPER_SIGN_ON_CLIENT_ID, **kwargs)