Source code for azure.identity._credentials.browser

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import platform
import socket
import subprocess
import webbrowser

from six.moves.urllib_parse import urlparse

from azure.core.exceptions import ClientAuthenticationError

from .. import CredentialUnavailableError
from .._constants import DEVELOPER_SIGN_ON_CLIENT_ID
from .._internal import AuthCodeRedirectServer, InteractiveCredential, wrap_exceptions

    from typing import TYPE_CHECKING
except ImportError:

    # pylint:disable=unused-import
    from typing import Any

[docs]class InteractiveBrowserCredential(InteractiveCredential): """Opens a browser to interactively authenticate a user. :func:`~get_token` opens a browser to a login URL provided by Azure Active Directory and authenticates a user there with the authorization code flow, using PKCE (Proof Key for Code Exchange) internally to protect the code. :keyword str authority: Authority of an Azure Active Directory endpoint, for example '', the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts` defines authorities for other clouds. :keyword str tenant_id: an Azure Active Directory tenant ID. Defaults to the 'organizations' tenant, which can authenticate work or school accounts. :keyword str client_id: Client ID of the Azure Active Directory application users will sign in to. If unspecified, users will authenticate to an Azure development application. :keyword str redirect_uri: a redirect URI for the application identified by `client_id` as configured in Azure Active Directory, for example "http://localhost:8400". This is only required when passing a value for `client_id`, and must match a redirect URI in the application's registration. The credential must be able to bind a socket to this URI. :keyword AuthenticationRecord authentication_record: :class:`AuthenticationRecord` returned by :func:`authenticate` :keyword bool disable_automatic_authentication: if True, :func:`get_token` will raise :class:`AuthenticationRequiredError` when user interaction is required to acquire a token. Defaults to False. :keyword cache_persistence_options: configuration for persistent token caching. If unspecified, the credential will cache tokens in memory. :paramtype cache_persistence_options: ~azure.identity.TokenCachePersistenceOptions :keyword int timeout: seconds to wait for the user to complete authentication. Defaults to 300 (5 minutes). :raises ValueError: invalid `redirect_uri` """ def __init__(self, **kwargs): # type: (**Any) -> None redirect_uri = kwargs.pop("redirect_uri", None) if redirect_uri: self._parsed_url = urlparse(redirect_uri) if not (self._parsed_url.hostname and self._parsed_url.port): raise ValueError('"redirect_uri" must be a URL with port number, for example "http://localhost:8400"') else: self._parsed_url = None self._timeout = kwargs.pop("timeout", 300) self._server_class = kwargs.pop("_server_class", AuthCodeRedirectServer) client_id = kwargs.pop("client_id", DEVELOPER_SIGN_ON_CLIENT_ID) super(InteractiveBrowserCredential, self).__init__(client_id=client_id, **kwargs) @wrap_exceptions def _request_token(self, *scopes, **kwargs): # type: (*str, **Any) -> dict # start an HTTP server to receive the redirect server = None if self._parsed_url: try: redirect_uri = "http://{}:{}".format(self._parsed_url.hostname, self._parsed_url.port) server = self._server_class(self._parsed_url.hostname, self._parsed_url.port, timeout=self._timeout) except socket.error: raise CredentialUnavailableError(message="Couldn't start an HTTP server on " + redirect_uri) else: for port in range(8400, 9000): try: server = self._server_class("localhost", port, timeout=self._timeout) redirect_uri = "http://localhost:{}".format(port) break except socket.error: continue # keep looking for an open port if not server: raise CredentialUnavailableError(message="Couldn't start an HTTP server on localhost") # get the url the user must visit to authenticate scopes = list(scopes) # type: ignore claims = kwargs.get("claims") app = self._get_app() flow = app.initiate_auth_code_flow( scopes, redirect_uri=redirect_uri, prompt="select_account", claims_challenge=claims ) if "auth_uri" not in flow: raise CredentialUnavailableError("Failed to begin authentication flow") if not _open_browser(flow["auth_uri"]): raise CredentialUnavailableError(message="Failed to open a browser") # block until the server times out or receives the post-authentication redirect response = server.wait_for_redirect() if not response: raise ClientAuthenticationError( message="Timed out after waiting {} seconds for the user to authenticate".format(self._timeout) ) # redeem the authorization code for a token return app.acquire_token_by_auth_code_flow(flow, response, scopes=scopes, claims_challenge=claims)
def _open_browser(url): opened = if not opened: uname = platform.uname() system = uname[0].lower() release = uname[2].lower() if "microsoft" in release and system == "linux": kwargs = {} if platform.python_version() >= "3.3": kwargs["timeout"] = 5 try: exit_code = ["powershell.exe", "-NoProfile", "-Command", 'Start-Process "{}"'.format(url)], **kwargs ) opened = exit_code == 0 except Exception: # pylint:disable=broad-except # powershell.exe isn't available, or the subprocess timed out pass return opened