# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import platform
import socket
import subprocess
import webbrowser
from six.moves.urllib_parse import urlparse
from azure.core.exceptions import ClientAuthenticationError
from .. import CredentialUnavailableError
from .._constants import DEVELOPER_SIGN_ON_CLIENT_ID
from .._internal import AuthCodeRedirectServer, InteractiveCredential, wrap_exceptions
try:
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False
if TYPE_CHECKING:
# pylint:disable=unused-import
from typing import Any
[docs]class InteractiveBrowserCredential(InteractiveCredential):
"""Opens a browser to interactively authenticate a user.
:func:`~get_token` opens a browser to a login URL provided by Azure Active Directory and authenticates a user
there with the authorization code flow, using PKCE (Proof Key for Code Exchange) internally to protect the code.
:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword str tenant_id: an Azure Active Directory tenant ID. Defaults to the 'organizations' tenant, which can
authenticate work or school accounts.
:keyword str client_id: Client ID of the Azure Active Directory application users will sign in to. If
unspecified, users will authenticate to an Azure development application.
:keyword str redirect_uri: a redirect URI for the application identified by `client_id` as configured in Azure
Active Directory, for example "http://localhost:8400". This is only required when passing a value for
`client_id`, and must match a redirect URI in the application's registration. The credential must be able to
bind a socket to this URI.
:keyword AuthenticationRecord authentication_record: :class:`AuthenticationRecord` returned by :func:`authenticate`
:keyword bool disable_automatic_authentication: if True, :func:`get_token` will raise
:class:`AuthenticationRequiredError` when user interaction is required to acquire a token. Defaults to False.
:keyword cache_persistence_options: configuration for persistent token caching. If unspecified, the credential
will cache tokens in memory.
:paramtype cache_persistence_options: ~azure.identity.TokenCachePersistenceOptions
:keyword int timeout: seconds to wait for the user to complete authentication. Defaults to 300 (5 minutes).
:raises ValueError: invalid `redirect_uri`
"""
def __init__(self, **kwargs):
# type: (**Any) -> None
redirect_uri = kwargs.pop("redirect_uri", None)
if redirect_uri:
self._parsed_url = urlparse(redirect_uri)
if not (self._parsed_url.hostname and self._parsed_url.port):
raise ValueError('"redirect_uri" must be a URL with port number, for example "http://localhost:8400"')
else:
self._parsed_url = None
self._timeout = kwargs.pop("timeout", 300)
self._server_class = kwargs.pop("_server_class", AuthCodeRedirectServer)
client_id = kwargs.pop("client_id", DEVELOPER_SIGN_ON_CLIENT_ID)
super(InteractiveBrowserCredential, self).__init__(client_id=client_id, **kwargs)
@wrap_exceptions
def _request_token(self, *scopes, **kwargs):
# type: (*str, **Any) -> dict
# start an HTTP server to receive the redirect
server = None
if self._parsed_url:
try:
redirect_uri = "http://{}:{}".format(self._parsed_url.hostname, self._parsed_url.port)
server = self._server_class(self._parsed_url.hostname, self._parsed_url.port, timeout=self._timeout)
except socket.error:
raise CredentialUnavailableError(message="Couldn't start an HTTP server on " + redirect_uri)
else:
for port in range(8400, 9000):
try:
server = self._server_class("localhost", port, timeout=self._timeout)
redirect_uri = "http://localhost:{}".format(port)
break
except socket.error:
continue # keep looking for an open port
if not server:
raise CredentialUnavailableError(message="Couldn't start an HTTP server on localhost")
# get the url the user must visit to authenticate
scopes = list(scopes) # type: ignore
claims = kwargs.get("claims")
app = self._get_app()
flow = app.initiate_auth_code_flow(
scopes, redirect_uri=redirect_uri, prompt="select_account", claims_challenge=claims
)
if "auth_uri" not in flow:
raise CredentialUnavailableError("Failed to begin authentication flow")
if not _open_browser(flow["auth_uri"]):
raise CredentialUnavailableError(message="Failed to open a browser")
# block until the server times out or receives the post-authentication redirect
response = server.wait_for_redirect()
if not response:
raise ClientAuthenticationError(
message="Timed out after waiting {} seconds for the user to authenticate".format(self._timeout)
)
# redeem the authorization code for a token
return app.acquire_token_by_auth_code_flow(flow, response, scopes=scopes, claims_challenge=claims)
def _open_browser(url):
opened = webbrowser.open(url)
if not opened:
uname = platform.uname()
system = uname[0].lower()
release = uname[2].lower()
if "microsoft" in release and system == "linux":
kwargs = {}
if platform.python_version() >= "3.3":
kwargs["timeout"] = 5
try:
exit_code = subprocess.call(
["powershell.exe", "-NoProfile", "-Command", 'Start-Process "{}"'.format(url)], **kwargs
)
opened = exit_code == 0
except Exception: # pylint:disable=broad-except
# powershell.exe isn't available, or the subprocess timed out
pass
return opened