Source code for azure.keyvault.keys.crypto._client

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
from typing import TYPE_CHECKING

import six
from azure.core.exceptions import HttpResponseError
from azure.core.tracing.decorator import distributed_trace

from . import DecryptResult, EncryptResult, SignResult, VerifyResult, UnwrapResult, WrapResult
from ._key_validity import raise_if_time_invalid
from ._providers import get_local_cryptography_provider, NoLocalCryptography
from .. import KeyOperation
from .._models import KeyVaultKey
from .._shared import KeyVaultClientBase, parse_vault_id

if TYPE_CHECKING:
    # pylint:disable=unused-import
    from typing import Any, Optional, Union
    from azure.core.credentials import TokenCredential
    from . import EncryptionAlgorithm, KeyWrapAlgorithm, SignatureAlgorithm

_LOGGER = logging.getLogger(__name__)


[docs]class CryptographyClient(KeyVaultClientBase): """Performs cryptographic operations using Azure Key Vault keys. :param key: Either a :class:`~azure.keyvault.keys.KeyVaultKey` instance as returned by :func:`~azure.keyvault.keys.KeyClient.get_key`, or a string. If a string, the value must be the full identifier of an Azure Key Vault key with a version. :type key: str or :class:`~azure.keyvault.keys.KeyVaultKey` :param credential: An object which can provide an access token for the vault, such as a credential from :mod:`azure.identity` :keyword api_version: version of the Key Vault API to use. Defaults to the most recent. :paramtype api_version: ~azure.keyvault.keys.ApiVersion :keyword transport: transport to use. Defaults to :class:`~azure.core.pipeline.transport.RequestsTransport`. :paramtype transport: ~azure.core.pipeline.transport.HttpTransport .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START create_client] :end-before: [END create_client] :caption: Create a CryptographyClient :language: python :dedent: 8 """ def __init__(self, key, credential, **kwargs): # type: (Union[KeyVaultKey, str], TokenCredential, **Any) -> None if isinstance(key, KeyVaultKey): self._key = key self._key_id = parse_vault_id(key.id) elif isinstance(key, six.string_types): self._key = None self._key_id = parse_vault_id(key) self._keys_get_forbidden = None # type: Optional[bool] else: raise ValueError("'key' must be a KeyVaultKey instance or a key ID string including a version") if not self._key_id.version: raise ValueError("'key' must include a version") self._local_provider = NoLocalCryptography() self._initialized = False super(CryptographyClient, self).__init__(vault_url=self._key_id.vault_url, credential=credential, **kwargs) @property def key_id(self): # type: () -> str """The full identifier of the client's key. :rtype: str """ return "/".join(self._key_id) @distributed_trace def _initialize(self, **kwargs): # type: (**Any) -> None if self._initialized: return # try to get the key material, if we don't have it and aren't forbidden to do so if not (self._key or self._keys_get_forbidden): try: self._key = self._client.get_key( self._key_id.vault_url, self._key_id.name, self._key_id.version, **kwargs ) except HttpResponseError as ex: # if we got a 403, we don't have keys/get permission and won't try to get the key again # (other errors may be transient) self._keys_get_forbidden = ex.status_code == 403 # if we have the key material, create a local crypto provider with it if self._key: self._local_provider = get_local_cryptography_provider(self._key) self._initialized = True else: # try to get the key again next time unless we know we're forbidden to do so self._initialized = self._keys_get_forbidden
[docs] @distributed_trace def encrypt(self, algorithm, plaintext, **kwargs): # type: (EncryptionAlgorithm, bytes, **Any) -> EncryptResult """Encrypt bytes using the client's key. Requires the keys/encrypt permission. This method encrypts only a single block of data, whose size depends on the key and encryption algorithm. :param algorithm: encryption algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.EncryptionAlgorithm` :param bytes plaintext: bytes to encrypt :rtype: :class:`~azure.keyvault.keys.crypto.EncryptResult` .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START encrypt] :end-before: [END encrypt] :caption: Encrypt bytes :language: python :dedent: 8 """ self._initialize(**kwargs) if self._local_provider.supports(KeyOperation.encrypt, algorithm): raise_if_time_invalid(self._key) try: return self._local_provider.encrypt(algorithm, plaintext) except Exception as ex: # pylint:disable=broad-except _LOGGER.warning("Local encrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)) operation_result = self._client.encrypt( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=self._models.KeyOperationsParameters(algorithm=algorithm, value=plaintext), **kwargs ) return EncryptResult(key_id=self.key_id, algorithm=algorithm, ciphertext=operation_result.result)
[docs] @distributed_trace def decrypt(self, algorithm, ciphertext, **kwargs): # type: (EncryptionAlgorithm, bytes, **Any) -> DecryptResult """Decrypt a single block of encrypted data using the client's key. Requires the keys/decrypt permission. This method decrypts only a single block of data, whose size depends on the key and encryption algorithm. :param algorithm: encryption algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.EncryptionAlgorithm` :param bytes ciphertext: encrypted bytes to decrypt :rtype: :class:`~azure.keyvault.keys.crypto.DecryptResult` .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START decrypt] :end-before: [END decrypt] :caption: Decrypt bytes :language: python :dedent: 8 """ self._initialize(**kwargs) if self._local_provider.supports(KeyOperation.decrypt, algorithm): try: return self._local_provider.decrypt(algorithm, ciphertext) except Exception as ex: # pylint:disable=broad-except _LOGGER.warning("Local decrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)) operation_result = self._client.decrypt( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=self._models.KeyOperationsParameters(algorithm=algorithm, value=ciphertext), **kwargs ) return DecryptResult(key_id=self.key_id, algorithm=algorithm, plaintext=operation_result.result)
[docs] @distributed_trace def wrap_key(self, algorithm, key, **kwargs): # type: (KeyWrapAlgorithm, bytes, **Any) -> WrapResult """Wrap a key with the client's key. Requires the keys/wrapKey permission. :param algorithm: wrapping algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.KeyWrapAlgorithm` :param bytes key: key to wrap :rtype: :class:`~azure.keyvault.keys.crypto.WrapResult` .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START wrap_key] :end-before: [END wrap_key] :caption: Wrap a key :language: python :dedent: 8 """ self._initialize(**kwargs) if self._local_provider.supports(KeyOperation.wrap_key, algorithm): raise_if_time_invalid(self._key) try: return self._local_provider.wrap_key(algorithm, key) except Exception as ex: # pylint:disable=broad-except _LOGGER.warning("Local wrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)) operation_result = self._client.wrap_key( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=self._models.KeyOperationsParameters(algorithm=algorithm, value=key), **kwargs ) return WrapResult(key_id=self.key_id, algorithm=algorithm, encrypted_key=operation_result.result)
[docs] @distributed_trace def unwrap_key(self, algorithm, encrypted_key, **kwargs): # type: (KeyWrapAlgorithm, bytes, **Any) -> UnwrapResult """Unwrap a key previously wrapped with the client's key. Requires the keys/unwrapKey permission. :param algorithm: wrapping algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.KeyWrapAlgorithm` :param bytes encrypted_key: the wrapped key :rtype: :class:`~azure.keyvault.keys.crypto.UnwrapResult` .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START unwrap_key] :end-before: [END unwrap_key] :caption: Unwrap a key :language: python :dedent: 8 """ self._initialize(**kwargs) if self._local_provider.supports(KeyOperation.unwrap_key, algorithm): try: return self._local_provider.unwrap_key(algorithm, encrypted_key) except Exception as ex: # pylint:disable=broad-except _LOGGER.warning("Local unwrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)) operation_result = self._client.unwrap_key( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=self._models.KeyOperationsParameters(algorithm=algorithm, value=encrypted_key), **kwargs ) return UnwrapResult(key_id=self._key_id, algorithm=algorithm, key=operation_result.result)
[docs] @distributed_trace def sign(self, algorithm, digest, **kwargs): # type: (SignatureAlgorithm, bytes, **Any) -> SignResult """Create a signature from a digest using the client's key. Requires the keys/sign permission. :param algorithm: signing algorithm :type algorithm: :class:`~azure.keyvault.keys.crypto.SignatureAlgorithm` :param bytes digest: hashed bytes to sign :rtype: :class:`~azure.keyvault.keys.crypto.SignResult` .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START sign] :end-before: [END sign] :caption: Sign bytes :language: python :dedent: 8 """ self._initialize(**kwargs) if self._local_provider.supports(KeyOperation.sign, algorithm): raise_if_time_invalid(self._key) try: return self._local_provider.sign(algorithm, digest) except Exception as ex: # pylint:disable=broad-except _LOGGER.warning("Local sign operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)) operation_result = self._client.sign( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=self._models.KeySignParameters(algorithm=algorithm, value=digest), **kwargs ) return SignResult(key_id=self.key_id, algorithm=algorithm, signature=operation_result.result)
[docs] @distributed_trace def verify(self, algorithm, digest, signature, **kwargs): # type: (SignatureAlgorithm, bytes, bytes, **Any) -> VerifyResult """Verify a signature using the client's key. Requires the keys/verify permission. :param algorithm: verification algorithm :type algorithm: :class:`~azure.keyvault.keys.crypto.SignatureAlgorithm` :param bytes digest: Pre-hashed digest corresponding to **signature**. The hash algorithm used must be compatible with **algorithm**. :param bytes signature: signature to verify :rtype: :class:`~azure.keyvault.keys.crypto.VerifyResult` .. literalinclude:: ../tests/test_examples_crypto.py :start-after: [START verify] :end-before: [END verify] :caption: Verify a signature :language: python :dedent: 8 """ self._initialize(**kwargs) if self._local_provider.supports(KeyOperation.verify, algorithm): try: return self._local_provider.verify(algorithm, digest, signature) except Exception as ex: # pylint:disable=broad-except _LOGGER.warning("Local verify operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)) operation_result = self._client.verify( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=self._models.KeyVerifyParameters(algorithm=algorithm, digest=digest, signature=signature), **kwargs ) return VerifyResult(key_id=self.key_id, algorithm=algorithm, is_valid=operation_result.value)