Source code for azure.identity._credentials.azure_cli

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from datetime import datetime
import json
import os
import platform
import re
import subprocess
import sys
import time
from typing import TYPE_CHECKING

import six

from azure.core.credentials import AccessToken
from azure.core.exceptions import ClientAuthenticationError

from .. import CredentialUnavailableError
from .._internal import _scopes_to_resource, resolve_tenant
from .._internal.decorators import log_get_token

if TYPE_CHECKING:
    # pylint:disable=ungrouped-imports
    from typing import Any


# Error message used by _run_command when the shell reports that 'az' can't be found
CLI_NOT_FOUND = "Azure CLI not found on path"
# Template for the CLI invocation; get_token fills the placeholder with the resource derived from the requested scopes
COMMAND_LINE = "az account get-access-token --output json --resource {}"
# Error message used by _run_command when the CLI's output mentions "az login" or "az account set"
NOT_LOGGED_IN = "Please run 'az login' to set up an account"


class AzureCliCredential(object):
    """Authenticates by requesting a token from the Azure CLI.

    This requires previously logging in to Azure via "az login", and will use the CLI's currently logged in identity.
    """

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def close(self):
        # type: () -> None
        """Calling this method is unnecessary."""

    @log_get_token("AzureCliCredential")
    def get_token(self, *scopes, **kwargs):  # pylint: disable=no-self-use
        # type: (*str, **Any) -> AccessToken
        """Request an access token for `scopes`.

        This method is called automatically by Azure SDK clients. Applications calling this method directly must
        also handle token caching because this credential doesn't cache the tokens it acquires.

        :param str scopes: desired scope for the access token. This credential allows only one scope per request.
        :keyword str tenant_id: optional tenant to include in the token request.

        :rtype: :class:`azure.core.credentials.AccessToken`

        :raises ValueError: a scope or the tenant ID contains characters that are not allowed on the command line.
        :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI.
        :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't
          receive an access token.
        """
        # The command below is assembled by string concatenation and executed by a shell, so every value that
        # ends up in it is restricted to a conservative character set. "\Z" (rather than "$") rejects a trailing
        # newline as well.
        for scope in scopes:
            if not re.match(r"[0-9a-zA-Z_.:/-]+\Z", scope):
                raise ValueError(
                    "An invalid scope was provided. Only alphanumeric characters, '.', '-', ':', '/', and '_' "
                    "are allowed."
                )

        resource = _scopes_to_resource(*scopes)
        command = COMMAND_LINE.format(resource)

        tenant = resolve_tenant("", **kwargs)
        if tenant:
            if not re.match(r"[0-9a-zA-Z.-]+\Z", tenant):
                raise ValueError(
                    "An invalid tenant ID was provided. Only alphanumeric characters, '.', and '-' are allowed."
                )
            command += " --tenant " + tenant

        output = _run_command(command)

        token = parse_token(output)
        if not token:
            # never include the raw CLI output in an error message: it may contain an access token
            sanitized_output = sanitize_output(output)
            raise ClientAuthenticationError(
                message="Unexpected output from Azure CLI: '{}'. \n"
                "To mitigate this issue, please refer to the troubleshooting guidelines here at "
                "https://aka.ms/azsdk/python/identity/azclicredential/troubleshoot.".format(sanitized_output)
            )

        return token
def parse_token(output):
    """Parse output of 'az account get-access-token' to an AccessToken.

    In particular, convert the "expiresOn" value to epoch seconds. This value is a naive local datetime as returned by
    datetime.fromtimestamp.

    :param str output: text printed by the CLI, expected to be a JSON object with "accessToken" and "expiresOn"
    :returns: an AccessToken, or None when `output` isn't JSON of the expected shape
    """
    try:
        token = json.loads(output)
        dt = datetime.strptime(token["expiresOn"], "%Y-%m-%d %H:%M:%S.%f")
        if hasattr(dt, "timestamp"):
            # Python >= 3.3
            expires_on = dt.timestamp()
        else:
            # taken from Python 3.5's datetime.timestamp()
            expires_on = time.mktime((dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, -1, -1, -1))
        return AccessToken(token["accessToken"], int(expires_on))
    except (KeyError, TypeError, ValueError):
        # KeyError: an expected key is missing
        # TypeError: output is valid JSON but not an object (e.g. "[]" or "null"), or "expiresOn" isn't a string
        # ValueError: output isn't JSON, or "expiresOn" doesn't match the expected format
        return None


def get_safe_working_dir():
    """Invoke 'az' from a directory controlled by the OS, not the executing program's directory

    :returns: the value of SYSTEMROOT on Windows, "/bin" elsewhere
    :raises CredentialUnavailableError: running on Windows and SYSTEMROOT is unset or empty
    """
    if sys.platform.startswith("win"):
        path = os.environ.get("SYSTEMROOT")
        if not path:
            raise CredentialUnavailableError(message="Environment variable 'SYSTEMROOT' has no value")
        return path

    return "/bin"


def sanitize_output(output):
    """Redact access tokens from CLI output to prevent error messages revealing them

    :param str output: text printed by the CLI
    :returns: `output` with each '"accessToken": "<value>"' pair replaced by '****'
    """
    return re.sub(r"\"accessToken\": \"(.*?)(\"|$)", "****", output)


def _run_command(command):
    """Run `command` through the platform shell and return what it printed.

    stderr is redirected to stdout, so the returned text (and the output attached to a failure) contains both.

    :param str command: the complete command line to hand to 'cmd /c' (Windows) or '/bin/sh -c' (elsewhere)
    :returns: the command's output as text
    :raises CredentialUnavailableError: the CLI wasn't found, no account is logged in, or the shell couldn't be run
    :raises ClientAuthenticationError: the CLI ran but exited with an error
    """
    if sys.platform.startswith("win"):
        args = ["cmd", "/c", command]
    else:
        args = ["/bin/sh", "-c", command]

    # Resolved before entering the try block: the catch-all handler below would otherwise replace this function's
    # specific CredentialUnavailableError (missing SYSTEMROOT) with a generic "failed to invoke" message.
    working_directory = get_safe_working_dir()

    try:
        kwargs = {
            "stderr": subprocess.STDOUT,
            "cwd": working_directory,
            "universal_newlines": True,
            "env": dict(os.environ, AZURE_CORE_NO_COLOR="true"),
        }
        # Compare version tuples, not strings: as text, "3.10" sorts before "3.3", which would silently drop
        # the timeout on Python 3.10 and later.
        if sys.version_info >= (3, 3):
            kwargs["timeout"] = 10

        return subprocess.check_output(args, **kwargs)
    except subprocess.CalledProcessError as ex:
        # non-zero return from shell
        if ex.returncode == 127 or ex.output.startswith("'az' is not recognized"):
            raise CredentialUnavailableError(message=CLI_NOT_FOUND)
        if "az login" in ex.output or "az account set" in ex.output:
            raise CredentialUnavailableError(message=NOT_LOGGED_IN)

        # return code is from the CLI -> propagate its output
        if ex.output:
            message = sanitize_output(ex.output)
        else:
            message = "Failed to invoke Azure CLI"
        raise ClientAuthenticationError(message=message)
    except OSError as ex:
        # failed to execute 'cmd' or '/bin/sh'; CLI may or may not be installed
        error = CredentialUnavailableError(message="Failed to execute '{}'".format(args[0]))
        six.raise_from(error, ex)
    except Exception as ex:  # pylint:disable=broad-except
        # could be a timeout, for example
        error = CredentialUnavailableError(message="Failed to invoke the Azure CLI")
        six.raise_from(error, ex)