# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import socket
import time
import uuid
import webbrowser
from azure.core.credentials import AccessToken
from azure.core.exceptions import ClientAuthenticationError
from .. import CredentialUnavailableError
from .._constants import AZURE_CLI_CLIENT_ID
from .._internal import AuthCodeRedirectServer, PublicClientCredential, wrap_exceptions
try:
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False
if TYPE_CHECKING:
# pylint:disable=unused-import
from typing import Any, List, Mapping
[docs]class InteractiveBrowserCredential(PublicClientCredential):
"""Opens a browser to interactively authenticate a user.
:func:`~get_token` opens a browser to a login URL provided by Azure Active Directory and authenticates a user
there with the authorization code flow. Azure Active Directory documentation describes this flow in more detail:
https://docs.microsoft.com/en-us/azure/active-directory/develop/v1-protocols-oauth-code
:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds.
:keyword str tenant_id: an Azure Active Directory tenant ID. Defaults to the 'organizations' tenant, which can
authenticate work or school accounts.
:keyword str client_id: Client ID of the Azure Active Directory application users will sign in to. If
unspecified, the Azure CLI's ID will be used.
:keyword int timeout: seconds to wait for the user to complete authentication. Defaults to 300 (5 minutes).
"""
def __init__(self, **kwargs):
# type: (**Any) -> None
self._timeout = kwargs.pop("timeout", 300)
self._server_class = kwargs.pop("server_class", AuthCodeRedirectServer) # facilitate mocking
client_id = kwargs.pop("client_id", AZURE_CLI_CLIENT_ID)
super(InteractiveBrowserCredential, self).__init__(client_id=client_id, **kwargs)
[docs] @wrap_exceptions
def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
# type: (*str, **Any) -> AccessToken
"""Request an access token for `scopes`.
This will open a browser to a login page and listen on localhost for a request indicating authentication has
completed.
.. note:: This method is called by Azure SDK clients. It isn't intended for use in application code.
:param str scopes: desired scopes for the token
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.identity.CredentialUnavailableError: the credential is unable to start an HTTP server on
localhost, or is unable to open a browser
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message``
attribute gives a reason. Any error response from Azure Active Directory is available as the error's
``response`` attribute.
"""
return self._get_token_from_cache(scopes, **kwargs) or self._get_token_by_auth_code(scopes, **kwargs)
def _get_token_from_cache(self, scopes, **kwargs):
"""if the user has already signed in, we can redeem a refresh token for a new access token"""
app = self._get_app()
accounts = app.get_accounts()
if accounts: # => user has already authenticated
# MSAL asserts scopes is a list
scopes = list(scopes) # type: ignore
now = int(time.time())
token = app.acquire_token_silent(scopes, account=accounts[0], **kwargs)
if token and "access_token" in token and "expires_in" in token:
return AccessToken(token["access_token"], now + int(token["expires_in"]))
return None
def _get_token_by_auth_code(self, scopes, **kwargs):
# start an HTTP server on localhost to receive the redirect
for port in range(8400, 9000):
try:
server = self._server_class(port, timeout=self._timeout)
redirect_uri = "http://localhost:{}".format(port)
break
except socket.error:
continue # keep looking for an open port
if not redirect_uri:
raise CredentialUnavailableError(message="Couldn't start an HTTP server on localhost")
# get the url the user must visit to authenticate
scopes = list(scopes) # type: ignore
request_state = str(uuid.uuid4())
app = self._get_app()
auth_url = app.get_authorization_request_url(
scopes, redirect_uri=redirect_uri, state=request_state, prompt="select_account", **kwargs
)
# open browser to that url
if not webbrowser.open(auth_url):
raise CredentialUnavailableError(message="Failed to open a browser")
# block until the server times out or receives the post-authentication redirect
response = server.wait_for_redirect()
if not response:
raise ClientAuthenticationError(
message="Timed out after waiting {} seconds for the user to authenticate".format(self._timeout)
)
# redeem the authorization code for a token
code = self._parse_response(request_state, response)
now = int(time.time())
result = app.acquire_token_by_authorization_code(code, scopes=scopes, redirect_uri=redirect_uri, **kwargs)
if "access_token" not in result:
raise ClientAuthenticationError(message="Authentication failed: {}".format(result.get("error_description")))
return AccessToken(result["access_token"], now + int(result["expires_in"]))
@staticmethod
def _parse_response(request_state, response):
# type: (str, Mapping[str, Any]) -> List[str]
"""Validates ``response`` and returns the authorization code it contains, if authentication succeeded.
Raises :class:`azure.core.exceptions.ClientAuthenticationError`, if authentication failed or ``response`` is
malformed.
"""
if "error" in response:
message = "Authentication failed: {}".format(response.get("error_description") or response["error"])
raise ClientAuthenticationError(message=message)
if "code" not in response:
# a response with no error or code is malformed; we don't know what to do with it
message = "Authentication server didn't send an authorization code"
raise ClientAuthenticationError(message=message)
# response must include the state sent in the auth request
if "state" not in response:
raise ClientAuthenticationError(message="Authentication response doesn't include OAuth state")
if response["state"][0] != request_state:
raise ClientAuthenticationError(message="Authentication response's OAuth state doesn't match the request's")
return response["code"]