Source code for azure.identity._credentials.on_behalf_of

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import time
from typing import Any, Optional, Callable, Union, Dict

import msal

from azure.core.credentials import AccessTokenInfo
from azure.core.exceptions import ClientAuthenticationError

from .certificate import get_client_credential
from .._internal.decorators import wrap_exceptions
from .._internal.get_token_mixin import GetTokenMixin
from .._internal.interactive import _build_auth_record
from .._internal.msal_credentials import MsalCredential
from .. import AuthenticationRecord


[docs] class OnBehalfOfCredential(MsalCredential, GetTokenMixin): """Authenticates a service principal via the on-behalf-of flow. This flow is typically used by middle-tier services that authorize requests to other services with a delegated user identity. Because this is not an interactive authentication flow, an application using it must have admin consent for any delegated permissions before requesting tokens for them. See `Microsoft Entra ID documentation <https://learn.microsoft.com/entra/identity-platform/v2-oauth2-on-behalf-of-flow>`__ for a more detailed description of the on-behalf-of flow. :param str tenant_id: ID of the service principal's tenant. Also called its "directory" ID. :param str client_id: The service principal's client ID. :keyword str client_secret: Optional. A client secret to authenticate the service principal. One of **client_secret**, **client_certificate**, or **client_assertion_func** must be provided. :keyword bytes client_certificate: Optional. The bytes of a certificate in PEM or PKCS12 format including the private key to authenticate the service principal. One of **client_secret**, **client_certificate**, or **client_assertion_func** must be provided. :keyword client_assertion_func: Optional. Function that returns client assertions that authenticate the application to Microsoft Entra ID. This function is called each time the credential requests a token. It must return a valid assertion for the target resource. :paramtype client_assertion_func: Callable[[], str] :keyword str user_assertion: Required. The access token the credential will use as the user assertion when requesting on-behalf-of tokens. :keyword str authority: Authority of a Microsoft Entra endpoint, for example "login.microsoftonline.com", the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts` defines authorities for other clouds. :keyword password: A certificate password. Used only when **client_certificate** is provided. If this value is a unicode string, it will be encoded as UTF-8. If the certificate requires a different encoding, pass appropriately encoded bytes instead. :paramtype password: str or bytes :keyword bool send_certificate_chain: If True when **client_certificate** is provided, the credential will send the public certificate chain in the x5c header of each token request's JWT. This is required for Subject Name/Issuer (SNI) authentication. Defaults to False. :keyword bool disable_instance_discovery: Determines whether or not instance discovery is performed when attempting to authenticate. Setting this to true will completely disable both instance discovery and authority validation. This functionality is intended for use in scenarios where the metadata endpoint cannot be reached, such as in private clouds or Azure Stack. The process of instance discovery entails retrieving authority metadata from https://login.microsoft.com/ to validate the authority. By setting this to **True**, the validation of the authority is disabled. As a result, it is crucial to ensure that the configured authority host is valid and trustworthy. :keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id" for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to acquire tokens for any tenant the application can access. .. admonition:: Example: .. literalinclude:: ../samples/credential_creation_code_snippets.py :start-after: [START create_on_behalf_of_credential] :end-before: [END create_on_behalf_of_credential] :language: python :dedent: 4 :caption: Create an OnBehalfOfCredential. """ def __init__( self, tenant_id: str, client_id: str, *, client_certificate: Optional[bytes] = None, client_secret: Optional[str] = None, client_assertion_func: Optional[Callable[[], str]] = None, user_assertion: str, password: Optional[Union[bytes, str]] = None, send_certificate_chain: bool = False, **kwargs: Any ) -> None: self._assertion = user_assertion if not self._assertion: raise TypeError('"user_assertion" must not be empty.') if client_assertion_func: if client_certificate or client_secret: raise ValueError( "It is invalid to specify more than one of the following: " '"client_assertion_func", "client_certificate" or "client_secret".' ) credential: Union[str, Dict[str, Any]] = { "client_assertion": client_assertion_func, } elif client_certificate: if client_secret: raise ValueError('Specifying both "client_certificate" and "client_secret" is not valid.') try: credential = get_client_credential( certificate_path=None, password=password, certificate_data=client_certificate, send_certificate_chain=send_certificate_chain, ) except ValueError as ex: # client_certificate isn't a valid cert. message = '"client_certificate" is not a valid certificate in PEM or PKCS12 format' raise ValueError(message) from ex elif client_secret: credential = client_secret else: raise TypeError('Either "client_certificate", "client_secret", or "client_assertion_func" must be provided') super(OnBehalfOfCredential, self).__init__( client_id=client_id, client_credential=credential, tenant_id=tenant_id, **kwargs ) self._auth_record: Optional[AuthenticationRecord] = None @wrap_exceptions def _acquire_token_silently(self, *scopes: str, **kwargs: Any) -> Optional[AccessTokenInfo]: if self._auth_record: claims = kwargs.get("claims") app = self._get_app(**kwargs) for account in app.get_accounts(username=self._auth_record.username): if account.get("home_account_id") != self._auth_record.home_account_id: continue now = int(time.time()) result = app.acquire_token_silent_with_error(list(scopes), account=account, claims_challenge=claims) if result and "access_token" in result and "expires_in" in result: refresh_on = int(result["refresh_on"]) if "refresh_on" in result else None return AccessTokenInfo( result["access_token"], now + int(result["expires_in"]), token_type=result.get("token_type", "Bearer"), refresh_on=refresh_on, ) return None @wrap_exceptions def _request_token(self, *scopes: str, **kwargs: Any) -> AccessTokenInfo: app: msal.ConfidentialClientApplication = self._get_app(**kwargs) request_time = int(time.time()) result = app.acquire_token_on_behalf_of(self._assertion, list(scopes), claims_challenge=kwargs.get("claims")) if "access_token" not in result or "expires_in" not in result: message = "Authentication failed: {}".format(result.get("error_description") or result.get("error")) response = self._client.get_error_response(result) raise ClientAuthenticationError(message=message, response=response) try: self._auth_record = _build_auth_record(result) except ClientAuthenticationError: pass # non-fatal; we'll use the assertion again next time instead of a refresh token refresh_on = int(result["refresh_on"]) if "refresh_on" in result else None return AccessTokenInfo( result["access_token"], request_time + int(result["expires_in"]), token_type=result.get("token_type", "Bearer"), refresh_on=refresh_on, )