Source code for azure.keyvault.keys.crypto._client

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from datetime import datetime, timedelta, tzinfo

import six
from azure.core.exceptions import AzureError, HttpResponseError
from azure.core.tracing.decorator import distributed_trace

from . import DecryptResult, EncryptResult, SignResult, VerifyResult, UnwrapResult, WrapResult
from ._internal import EllipticCurveKey, RsaKey, SymmetricKey
from .._models import KeyVaultKey
from .._shared import KeyVaultClientBase, parse_vault_id

try:
    from typing import TYPE_CHECKING
except ImportError:
    TYPE_CHECKING = False

if TYPE_CHECKING:
    # pylint:disable=unused-import
    from typing import Any, Optional, Union
    from azure.core.credentials import TokenCredential
    from . import EncryptionAlgorithm, KeyWrapAlgorithm, SignatureAlgorithm
    from ._internal import Key as _Key


class _UTC_TZ(tzinfo):
    """from https://docs.python.org/2/library/datetime.html#tzinfo-objects"""

    ZERO = timedelta(0)

    def utcoffset(self, dt):
        return self.ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return self.ZERO


_UTC = _UTC_TZ()


def _enforce_nbf_exp(key):
    # type: (KeyVaultKey) -> None
    try:
        nbf = key.properties.not_before
        exp = key.properties.expires_on
    except AttributeError:
        # we consider the key valid because a user must have deliberately created it
        # (if it came from Key Vault, it would have those attributes)
        return

    now = datetime.now(_UTC)
    if (nbf and exp) and not nbf <= now <= exp:
        raise ValueError("This client's key is useable only between {} and {} (UTC)".format(nbf, exp))
    if nbf and nbf >= now:
        raise ValueError("This client's key is not useable until {} (UTC)".format(nbf))
    if exp and exp <= now:
        raise ValueError("This client's key expired at {} (UTC)".format(exp))


[docs]class CryptographyClient(KeyVaultClientBase): """Performs cryptographic operations using Azure Key Vault keys. :param key: Either a :class:`~azure.keyvault.keys.KeyVaultKey` instance as returned by :func:`~azure.keyvault.keys.KeyClient.get_key`, or a string. If a string, the value must be the full identifier of an Azure Key Vault key with a version. :type key: str or :class:`~azure.keyvault.keys.KeyVaultKey` :param credential: An object which can provide an access token for the vault, such as a credential from :mod:`azure.identity` :keyword api_version: version of the Key Vault API to use. Defaults to the most recent. :paramtype api_version: ~azure.keyvault.keys.ApiVersion :keyword transport: transport to use. Defaults to :class:`~azure.core.pipeline.transport.RequestsTransport`. :paramtype transport: ~azure.core.pipeline.transport.HttpTransport Creating a ``CryptographyClient``: .. code-block:: python from azure.identity import DefaultAzureCredential from azure.keyvault.keys.crypto import CryptographyClient credential = DefaultAzureCredential() # create a CryptographyClient using a KeyVaultKey instance key = key_client.get_key("mykey") crypto_client = CryptographyClient(key, credential) # or a key's id, which must include a version key_id = "https://<your vault>.vault.azure.net/keys/mykey/fe4fdcab688c479a9aa80f01ffeac26" crypto_client = CryptographyClient(key_id, credential) """ def __init__(self, key, credential, **kwargs): # type: (Union[KeyVaultKey, str], TokenCredential, **Any) -> None if isinstance(key, KeyVaultKey): self._key = key self._key_id = parse_vault_id(key.id) self._allowed_ops = frozenset(self._key.key_operations) elif isinstance(key, six.string_types): self._key = None self._key_id = parse_vault_id(key) self._keys_get_forbidden = None # type: Optional[bool] # will be replaced with actual permissions before any local operations are attempted, if we can get the key self._allowed_ops = frozenset() else: raise ValueError("'key' must be a KeyVaultKey instance or a key ID string including a version") if not self._key_id.version: raise ValueError("'key' must include a version") self._internal_key = None # type: Optional[_Key] super(CryptographyClient, self).__init__(vault_url=self._key_id.vault_url, credential=credential, **kwargs) @property def key_id(self): # type: () -> str """The full identifier of the client's key. :rtype: str """ return "/".join(self._key_id) @distributed_trace def _get_key(self, **kwargs): # type: (**Any) -> Optional[KeyVaultKey] """Get the client's :class:`~azure.keyvault.keys.KeyVaultKey`. Can be ``None``, if the client lacks keys/get permission. :rtype: :class:`~azure.keyvault.keys.KeyVaultKey` or ``None`` """ if not (self._key or self._keys_get_forbidden): try: self._key = self._client.get_key( self._key_id.vault_url, self._key_id.name, self._key_id.version, **kwargs ) self._allowed_ops = frozenset(self._key.key_operations) except HttpResponseError as ex: # if we got a 403, we don't have keys/get permission and won't try to get the key again # (other errors may be transient) self._keys_get_forbidden = ex.status_code == 403 return self._key def _get_local_key(self, **kwargs): # type: (**Any) -> Optional[_Key] """Gets an object implementing local operations. Will be ``None``, if the client was instantiated with a key id and lacks keys/get permission.""" if not self._internal_key: key = self._get_key(**kwargs) if not key: return None kty = key.key_type.lower() if kty.startswith("ec"): self._internal_key = EllipticCurveKey.from_jwk(key.key) elif kty.startswith("rsa"): self._internal_key = RsaKey.from_jwk(key.key) elif kty == "oct": self._internal_key = SymmetricKey.from_jwk(key.key) else: raise ValueError("Unsupported key type '{}'".format(key.key_type)) return self._internal_key
[docs] @distributed_trace def encrypt(self, algorithm, plaintext, **kwargs): # type: (EncryptionAlgorithm, bytes, **Any) -> EncryptResult """Encrypt bytes using the client's key. Requires the keys/encrypt permission. This method encrypts only a single block of data, whose size depends on the key and encryption algorithm. :param algorithm: encryption algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.EncryptionAlgorithm` :param bytes plaintext: bytes to encrypt :rtype: :class:`~azure.keyvault.keys.crypto.EncryptResult` Example: .. code-block:: python from azure.keyvault.keys.crypto import EncryptionAlgorithm # the result holds the ciphertext and identifies the encryption key and algorithm used result = client.encrypt(EncryptionAlgorithm.rsa_oaep, b"plaintext") ciphertext = result.ciphertext print(result.key_id) print(result.algorithm) """ local_key = self._get_local_key(**kwargs) if local_key: _enforce_nbf_exp(self._key) if "encrypt" not in self._allowed_ops: raise AzureError("This client doesn't have 'keys/encrypt' permission") result = local_key.encrypt(plaintext, algorithm=algorithm.value) else: parameters = self._models.KeyOperationsParameters( algorithm=algorithm, value=plaintext ) result = self._client.encrypt( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=parameters, **kwargs ).result return EncryptResult(key_id=self.key_id, algorithm=algorithm, ciphertext=result)
[docs] @distributed_trace def decrypt(self, algorithm, ciphertext, **kwargs): # type: (EncryptionAlgorithm, bytes, **Any) -> DecryptResult """Decrypt a single block of encrypted data using the client's key. Requires the keys/decrypt permission. This method decrypts only a single block of data, whose size depends on the key and encryption algorithm. :param algorithm: encryption algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.EncryptionAlgorithm` :param bytes ciphertext: encrypted bytes to decrypt :rtype: :class:`~azure.keyvault.keys.crypto.DecryptResult` Example: .. code-block:: python from azure.keyvault.keys.crypto import EncryptionAlgorithm result = client.decrypt(EncryptionAlgorithm.rsa_oaep, ciphertext) print(result.plaintext) """ parameters = self._models.KeyOperationsParameters( algorithm=algorithm, value=ciphertext ) result = self._client.decrypt( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=parameters, **kwargs ) return DecryptResult(key_id=self.key_id, algorithm=algorithm, plaintext=result.result)
[docs] @distributed_trace def wrap_key(self, algorithm, key, **kwargs): # type: (KeyWrapAlgorithm, bytes, **Any) -> WrapResult """Wrap a key with the client's key. Requires the keys/wrapKey permission. :param algorithm: wrapping algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.KeyWrapAlgorithm` :param bytes key: key to wrap :rtype: :class:`~azure.keyvault.keys.crypto.WrapResult` Example: .. code-block:: python from azure.keyvault.keys.crypto import KeyWrapAlgorithm # the result holds the encrypted key and identifies the encryption key and algorithm used result = client.wrap_key(KeyWrapAlgorithm.rsa_oaep, key_bytes) encrypted_key = result.encrypted_key print(result.key_id) print(result.algorithm) """ local_key = self._get_local_key(**kwargs) if local_key: _enforce_nbf_exp(self._key) if "wrapKey" not in self._allowed_ops: raise AzureError("This client doesn't have 'keys/wrapKey' permission") result = local_key.wrap_key(key, algorithm=algorithm.value) else: parameters = self._models.KeyOperationsParameters( algorithm=algorithm, value=key, ) result = self._client.wrap_key( self._key_id.vault_url, self._key_id.name, self._key_id.version, parameters=parameters ).result return WrapResult(key_id=self.key_id, algorithm=algorithm, encrypted_key=result)
[docs] @distributed_trace def unwrap_key(self, algorithm, encrypted_key, **kwargs): # type: (KeyWrapAlgorithm, bytes, **Any) -> UnwrapResult """Unwrap a key previously wrapped with the client's key. Requires the keys/unwrapKey permission. :param algorithm: wrapping algorithm to use :type algorithm: :class:`~azure.keyvault.keys.crypto.KeyWrapAlgorithm` :param bytes encrypted_key: the wrapped key :rtype: :class:`~azure.keyvault.keys.crypto.UnwrapResult` Example: .. code-block:: python from azure.keyvault.keys.crypto import KeyWrapAlgorithm result = client.unwrap_key(KeyWrapAlgorithm.rsa_oaep, wrapped_bytes) key = result.key """ local_key = self._get_local_key(**kwargs) if local_key and local_key.is_private_key(): if "unwrapKey" not in self._allowed_ops: raise AzureError("This client doesn't have 'keys/unwrapKey' permission") result = local_key.unwrap_key(encrypted_key, **kwargs) else: parameters = self._models.KeyOperationsParameters( algorithm=algorithm, value=encrypted_key ) result = self._client.unwrap_key( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=parameters, **kwargs ).result return UnwrapResult(key_id=self._key_id, algorithm=algorithm, key=result)
[docs] @distributed_trace def sign(self, algorithm, digest, **kwargs): # type: (SignatureAlgorithm, bytes, **Any) -> SignResult """Create a signature from a digest using the client's key. Requires the keys/sign permission. :param algorithm: signing algorithm :type algorithm: :class:`~azure.keyvault.keys.crypto.SignatureAlgorithm` :param bytes digest: hashed bytes to sign :rtype: :class:`~azure.keyvault.keys.crypto.SignResult` Example: .. code-block:: python import hashlib from azure.keyvault.keys.crypto import SignatureAlgorithm digest = hashlib.sha256(b"plaintext").digest() # sign returns a tuple with the signature and the metadata required to verify it result = client.sign(SignatureAlgorithm.rs256, digest) # the result contains the signature and identifies the key and algorithm used signature = result.signature print(result.key_id) print(result.algorithm) """ parameters = self._models.KeySignParameters( algorithm=algorithm, value=digest ) result = self._client.sign( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=parameters, **kwargs ) return SignResult(key_id=self.key_id, algorithm=algorithm, signature=result.result)
[docs] @distributed_trace def verify(self, algorithm, digest, signature, **kwargs): # type: (SignatureAlgorithm, bytes, bytes, **Any) -> VerifyResult """Verify a signature using the client's key. Requires the keys/verify permission. :param algorithm: verification algorithm :type algorithm: :class:`~azure.keyvault.keys.crypto.SignatureAlgorithm` :param bytes digest: Pre-hashed digest corresponding to **signature**. The hash algorithm used must be compatible with **algorithm**. :param bytes signature: signature to verify :rtype: :class:`~azure.keyvault.keys.crypto.VerifyResult` Example: .. code-block:: python from azure.keyvault.keys.crypto import SignatureAlgorithm verified = client.verify(SignatureAlgorithm.rs256, digest, signature) assert verified.is_valid """ local_key = self._get_local_key(**kwargs) if local_key: if "verify" not in self._allowed_ops: raise AzureError("This client doesn't have 'keys/verify' permission") result = local_key.verify(digest, signature, algorithm=algorithm.value) else: parameters = self._models.KeyVerifyParameters( algorithm=algorithm, digest=digest, signature=signature ) result = self._client.verify( vault_base_url=self._key_id.vault_url, key_name=self._key_id.name, key_version=self._key_id.version, parameters=parameters, **kwargs ).value return VerifyResult(key_id=self.key_id, algorithm=algorithm, is_valid=result)