Source code for azure.identity._credentials.vscode

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import abc
import os
import sys
from typing import cast, TYPE_CHECKING

from .._exceptions import CredentialUnavailableError
from .._constants import AzureAuthorityHosts, AZURE_VSCODE_CLIENT_ID, EnvironmentVariables
from .._internal import normalize_authority, validate_tenant_id
from .._internal.aad_client import AadClient
from .._internal.get_token_mixin import GetTokenMixin
from .._internal.decorators import log_get_token

if sys.platform.startswith("win"):
    from .._internal.win_vscode_adapter import get_refresh_token, get_user_settings
elif sys.platform.startswith("darwin"):
    from .._internal.macos_vscode_adapter import get_refresh_token, get_user_settings
else:
    from .._internal.linux_vscode_adapter import get_refresh_token, get_user_settings

if TYPE_CHECKING:
    # pylint:disable=unused-import,ungrouped-imports
    from typing import Any, Dict, Optional
    from azure.core.credentials import AccessToken
    from .._internal.aad_client import AadClientBase

try:
    ABC = abc.ABC
except AttributeError:  # Python 2.7, abc exists, but not ABC
    ABC = abc.ABCMeta("ABC", (object,), {"__slots__": ()})  # type: ignore


class _VSCodeCredentialBase(ABC):
    def __init__(self, **kwargs):
        # type: (**Any) -> None
        super(_VSCodeCredentialBase, self).__init__()

        user_settings = get_user_settings()
        self._cloud = user_settings.get("azure.cloud", "AzureCloud")
        self._refresh_token = None
        self._unavailable_reason = ""

        self._client = kwargs.get("_client")
        if not self._client:
            self._initialize(user_settings, **kwargs)
        if not (self._client or self._unavailable_reason):
            self._unavailable_reason = "Initialization failed"

    @abc.abstractmethod
    def _get_client(self, **kwargs):
        # type: (**Any) -> AadClientBase
        pass

    def _get_refresh_token(self):
        # type: () -> str
        if not self._refresh_token:
            self._refresh_token = get_refresh_token(self._cloud)
            if not self._refresh_token:
                raise CredentialUnavailableError(message="Failed to get Azure user details from Visual Studio Code.")
        return self._refresh_token

    def _initialize(self, vscode_user_settings, **kwargs):
        # type: (Dict, **Any) -> None
        """Build a client from kwargs merged with VS Code user settings.

        The first stable version of this credential defaulted to Public Cloud and the "organizations"
        tenant when it failed to read VS Code user settings. That behavior is preserved here.
        """

        # Precedence for authority:
        #  1) VisualStudioCodeCredential(authority=...)
        #  2) $AZURE_AUTHORITY_HOST
        #  3) authority matching VS Code's "azure.cloud" setting
        #  4) default: Public Cloud
        authority = kwargs.pop("authority", None) or os.environ.get(EnvironmentVariables.AZURE_AUTHORITY_HOST)
        if not authority:
            # the application didn't specify an authority, so we figure it out from VS Code settings
            if self._cloud == "AzureCloud":
                authority = AzureAuthorityHosts.AZURE_PUBLIC_CLOUD
            elif self._cloud == "AzureChinaCloud":
                authority = AzureAuthorityHosts.AZURE_CHINA
            elif self._cloud == "AzureGermanCloud":
                authority = AzureAuthorityHosts.AZURE_GERMANY
            elif self._cloud == "AzureUSGovernment":
                authority = AzureAuthorityHosts.AZURE_GOVERNMENT
            else:
                # If the value is anything else ("AzureCustomCloud" is the only other known value),
                # we need the user to provide the authority because VS Code has no setting for it and
                # we can't guess confidently.
                self._unavailable_reason = (
                    'VS Code is configured to use a custom cloud. Set keyword argument "authority"'
                    + ' with the Azure Active Directory endpoint for cloud "{}"'.format(self._cloud)
                )
                return

        # Precedence for tenant ID:
        #  1) VisualStudioCodeCredential(tenant_id=...)
        #  2) "azure.tenant" in VS Code user settings
        #  3) default: organizations
        tenant_id = kwargs.pop("tenant_id", None) or vscode_user_settings.get("azure.tenant", "organizations")
        validate_tenant_id(tenant_id)
        if tenant_id.lower() == "adfs":
            self._unavailable_reason = "VisualStudioCodeCredential authentication unavailable. ADFS is not supported."
            return

        self._client = self._get_client(
            authority=normalize_authority(authority), client_id=AZURE_VSCODE_CLIENT_ID, tenant_id=tenant_id, **kwargs
        )


[docs]class VisualStudioCodeCredential(_VSCodeCredentialBase, GetTokenMixin): """Authenticates as the Azure user signed in to Visual Studio Code. :keyword str authority: authority of an Azure Active Directory endpoint, for example "login.microsoftonline.com". This argument is required for a custom cloud and usually unnecessary otherwise. Defaults to the authority matching the "Azure: Cloud" setting in VS Code's user settings or, when that setting has no value, the authority for Azure Public Cloud. :keyword str tenant_id: ID of the tenant the credential should authenticate in. Defaults to the "Azure: Tenant" setting in VS Code's user settings or, when that setting has no value, the "organizations" tenant, which supports only Azure Active Directory work or school accounts. """ def __enter__(self): if self._client: self._client.__enter__() return self def __exit__(self, *args): if self._client: self._client.__exit__(*args)
[docs] def close(self): # type: () -> None """Close the credential's transport session.""" self.__exit__()
[docs] @log_get_token("VSCodeCredential") def get_token(self, *scopes, **kwargs): # type: (*str, **Any) -> AccessToken """Request an access token for `scopes` as the user currently signed in to Visual Studio Code. This method is called automatically by Azure SDK clients. :param str scopes: desired scopes for the access token. This method requires at least one scope. :rtype: :class:`azure.core.credentials.AccessToken` :raises ~azure.identity.CredentialUnavailableError: the credential cannot retrieve user details from Visual Studio Code """ if self._unavailable_reason: error_message = self._unavailable_reason \ + '\n' \ "Visit https://aka.ms/azsdk/python/identity/vscodecredential/troubleshoot" \ " to troubleshoot this issue." raise CredentialUnavailableError(message=error_message) return super(VisualStudioCodeCredential, self).get_token(*scopes, **kwargs)
def _acquire_token_silently(self, *scopes, **kwargs): # type: (*str, **Any) -> Optional[AccessToken] self._client = cast(AadClient, self._client) return self._client.get_cached_access_token(scopes, **kwargs) def _request_token(self, *scopes, **kwargs): # type: (*str, **Any) -> AccessToken refresh_token = self._get_refresh_token() self._client = cast(AadClient, self._client) return self._client.obtain_token_by_refresh_token(scopes, refresh_token, **kwargs) def _get_client(self, **kwargs): # type: (**Any) -> AadClient return AadClient(**kwargs)