# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import sys
from typing import cast, TYPE_CHECKING
from .._internal import AsyncContextManager
from .._internal.decorators import log_get_token_async
from ... import CredentialUnavailableError
from ..._credentials.azure_powershell import (
AzurePowerShellCredential as _SyncCredential,
get_command_line,
get_safe_working_dir,
raise_for_error,
parse_token,
)
from ..._internal import resolve_tenant
if TYPE_CHECKING:
# pylint:disable=ungrouped-imports
from typing import Any, List
from azure.core.credentials import AccessToken
[docs]class AzurePowerShellCredential(AsyncContextManager):
"""Authenticates by requesting a token from Azure PowerShell.
This requires previously logging in to Azure via "Connect-AzAccount", and will use the currently logged in identity.
:keyword bool allow_multitenant_authentication: when True, enables the credential to acquire tokens from any tenant
the identity logged in to Azure PowerShell is registered in. When False, which is the default, the credential
will acquire tokens only from the tenant of Azure PowerShell's active subscription.
"""
def __init__(self, **kwargs: "Any") -> None:
self._allow_multitenant = kwargs.get("allow_multitenant_authentication", False)
[docs] @log_get_token_async
async def get_token(
self, *scopes: str, **kwargs: "Any"
) -> "AccessToken": # pylint:disable=no-self-use,unused-argument
"""Request an access token for `scopes`.
This method is called automatically by Azure SDK clients. Applications calling this method directly must
also handle token caching because this credential doesn't cache the tokens it acquires.
:param str scopes: desired scope for the access token. This credential allows only one scope per request.
:keyword str tenant_id: optional tenant to include in the token request. If **allow_multitenant_authentication**
is False, specifying a tenant with this argument may raise an exception.
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke Azure PowerShell, or
no account is authenticated
:raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked Azure PowerShell but didn't
receive an access token
"""
# only ProactorEventLoop supports subprocesses on Windows (and it isn't the default loop on Python < 3.8)
if sys.platform.startswith("win") and not isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop):
return _SyncCredential().get_token(*scopes, **kwargs)
tenant_id = resolve_tenant("", self._allow_multitenant, **kwargs)
command_line = get_command_line(scopes, tenant_id)
output = await run_command_line(command_line)
token = parse_token(output)
return token
[docs] async def close(self) -> None:
"""Calling this method is unnecessary"""
async def run_command_line(command_line: "List[str]") -> str:
try:
proc = await start_process(command_line)
stdout, stderr = await asyncio.wait_for(proc.communicate(), 10)
if sys.platform.startswith("win") and b"' is not recognized" in stderr:
# pwsh.exe isn't on the path; try powershell.exe
command_line[-1] = command_line[-1].replace("pwsh", "powershell", 1)
proc = await start_process(command_line)
stdout, stderr = await asyncio.wait_for(proc.communicate(), 10)
except OSError as ex:
# failed to execute "cmd" or "/bin/sh"; Azure PowerShell may or may not be installed
error = CredentialUnavailableError(message='Failed to execute "{}"'.format(command_line[0]))
raise error from ex
except asyncio.TimeoutError as ex:
proc.kill()
raise CredentialUnavailableError(message="Timed out waiting for Azure PowerShell") from ex
decoded_stdout = stdout.decode()
# casting because mypy infers Optional[int]; however, when proc.returncode is None,
# we handled TimeoutError above and therefore don't execute this line
raise_for_error(cast(int, proc.returncode), decoded_stdout, stderr.decode())
return decoded_stdout
async def start_process(command_line):
working_directory = get_safe_working_dir()
proc = await asyncio.create_subprocess_exec(
*command_line,
cwd=working_directory,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
return proc