Source code for azure.identity.aio._credentials.azure_powershell

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import sys
from typing import Any, cast, List, Optional
from azure.core.credentials import AccessToken

from .._internal import AsyncContextManager
from .._internal.decorators import log_get_token_async
from ... import CredentialUnavailableError
from ..._credentials.azure_powershell import (
    AzurePowerShellCredential as _SyncCredential,
    get_command_line,
    get_safe_working_dir,
    raise_for_error,
    parse_token,
)
from ..._internal import resolve_tenant, validate_tenant_id, validate_scope


class AzurePowerShellCredential(AsyncContextManager):
    """Authenticates by requesting a token from Azure PowerShell.

    This requires previously logging in to Azure via "Connect-AzAccount", and will use the currently logged in
    identity.

    :keyword str tenant_id: Optional tenant to include in the token request.
    :keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id"
        for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to
        acquire tokens for any tenant the application can access.
    :keyword int process_timeout: Seconds to wait for the Azure PowerShell process to respond. Defaults to 10
        seconds.

    .. admonition:: Example:

        .. literalinclude:: ../samples/credential_creation_code_snippets.py
            :start-after: [START create_azure_power_shell_credential_async]
            :end-before: [END create_azure_power_shell_credential_async]
            :language: python
            :dedent: 4
            :caption: Create an AzurePowerShellCredential.
    """

    def __init__(
        self,
        *,
        tenant_id: str = "",
        additionally_allowed_tenants: Optional[List[str]] = None,
        process_timeout: int = 10,
    ) -> None:
        # An empty tenant_id means "no default tenant" and is therefore not validated.
        if tenant_id:
            validate_tenant_id(tenant_id)
        self.tenant_id = tenant_id
        self._additionally_allowed_tenants = additionally_allowed_tenants or []
        self._process_timeout = process_timeout

    @log_get_token_async
    async def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,  # pylint:disable=unused-argument
        tenant_id: Optional[str] = None,
        **kwargs: Any,
    ) -> AccessToken:
        """Request an access token for `scopes`.

        This method is called automatically by Azure SDK clients. Applications calling this method directly must
        also handle token caching because this credential doesn't cache the tokens it acquires.

        :param str scopes: desired scope for the access token. This credential allows only one scope per request.
            For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword str claims: not used by this credential; any value provided will be ignored.
        :keyword str tenant_id: optional tenant to include in the token request.

        :return: An access token with the desired scopes.
        :rtype: ~azure.core.credentials.AccessToken

        :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke Azure PowerShell,
            or no account is authenticated
        :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked Azure PowerShell but
            didn't receive an access token
        """
        # only ProactorEventLoop supports subprocesses on Windows (and it isn't the default loop on Python < 3.8)
        if sys.platform.startswith("win") and not isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop):
            # Fall back to the synchronous credential. Forward this instance's configuration so the fallback
            # honors the same default tenant, allowed tenants and process timeout as the async code path.
            # NOTE: this call is synchronous, so it runs to completion before control returns to the event loop.
            sync_credential = _SyncCredential(
                tenant_id=self.tenant_id,
                additionally_allowed_tenants=self._additionally_allowed_tenants,
                process_timeout=self._process_timeout,
            )
            return sync_credential.get_token(*scopes, tenant_id=tenant_id, **kwargs)

        if tenant_id:
            validate_tenant_id(tenant_id)
        for scope in scopes:
            validate_scope(scope)

        # Choose between the per-request tenant and this credential's default tenant.
        tenant_id = resolve_tenant(
            default_tenant=self.tenant_id,
            tenant_id=tenant_id,
            additionally_allowed_tenants=self._additionally_allowed_tenants,
            **kwargs,
        )
        command_line = get_command_line(scopes, tenant_id)
        output = await run_command_line(command_line, self._process_timeout)
        token = parse_token(output)
        return token

    async def close(self) -> None:
        """Calling this method is unnecessary"""
async def run_command_line(command_line: List[str], timeout: int) -> str:
    """Run an Azure PowerShell command line and return its decoded standard output.

    On Windows, when the first attempt reports that the executable isn't recognized, the command is retried
    once with the first occurrence of "pwsh" in its last element replaced by "powershell". Each attempt is
    given the full ``timeout``.

    :param command_line: the command to execute, as a list of arguments. The list is not modified.
    :type command_line: List[str]
    :param int timeout: seconds to wait for a started process to finish.

    :return: the process's standard output, decoded.
    :rtype: str

    :raises ~azure.identity.CredentialUnavailableError: waiting for the process timed out, or the process
        could not be started. ``raise_for_error`` is also called with the return code and decoded output
        before returning and may raise as well.
    """
    try:
        proc = await start_process(command_line)
        # Honor the caller's timeout on the first attempt too, rather than a fixed value.
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout)
        if sys.platform.startswith("win") and (b"' is not recognized" in stderr or proc.returncode == 9009):
            # pwsh.exe isn't on the path; try powershell.exe
            # Build a new list so the caller's command_line is left untouched.
            fallback_command_line = command_line[:-1] + [command_line[-1].replace("pwsh", "powershell", 1)]
            proc = await start_process(fallback_command_line)
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError as ex:
        # Only wait_for raises TimeoutError here, so proc is always bound at this point.
        proc.kill()
        raise CredentialUnavailableError(
            message="Timed out waiting for Azure PowerShell.\n"
            "To mitigate this issue, please refer to the troubleshooting guidelines here at "
            "https://aka.ms/azsdk/python/identity/powershellcredential/troubleshoot."
        ) from ex
    except OSError as ex:
        # failed to execute "cmd" or "/bin/sh"; Azure PowerShell may or may not be installed
        error = CredentialUnavailableError(
            message='Failed to execute "{}".\n'
            "To mitigate this issue, please refer to the troubleshooting guidelines here at "
            "https://aka.ms/azsdk/python/identity/powershellcredential/troubleshoot.".format(command_line[0])
        )
        raise error from ex

    decoded_stdout = stdout.decode()

    # casting because mypy infers Optional[int]; however, when proc.returncode is None,
    # we handled TimeoutError above and therefore don't execute this line
    raise_for_error(cast(int, proc.returncode), decoded_stdout, stderr.decode())
    return decoded_stdout


async def start_process(command_line: List[str]) -> asyncio.subprocess.Process:
    """Start ``command_line`` as a subprocess with piped stdout/stderr and no stdin.

    :param command_line: the program to execute followed by its arguments.
    :type command_line: List[str]

    :return: the started process.
    :rtype: asyncio.subprocess.Process
    """
    # The working directory comes from get_safe_working_dir rather than the caller's current directory.
    working_directory = get_safe_working_dir()
    proc = await asyncio.create_subprocess_exec(
        *command_line,
        cwd=working_directory,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.DEVNULL,
    )
    return proc