# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------
import base64
import json
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.hashes import SHA256
from ._common import Base64Url
from ._generated.models import (
PolicyResult as GeneratedPolicyResult,
AttestationResult as GeneratedAttestationResult,
StoredAttestationPolicy as GeneratedStoredAttestationPolicy,
JSONWebKey,
CertificateModification,
AttestationType,
PolicyModification
)
from typing import Any, Callable, Dict, List, Type, TypeVar, Generic, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509 import Certificate, load_der_x509_certificate
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from json import JSONDecoder, JSONEncoder
from datetime import datetime
T = TypeVar('T')
[docs]class AttestationSigner(object):
""" Represents a signing certificate returned by the Attestation Service.
:param certificates: A list of Base64 encoded X.509
Certificates representing an X.509 certificate chain. The first of these
certificates will be used to sign an :class:`AttestationToken`.
:type certificates: list[bytes]
:param str key_id: A string which identifies a signing key, See
`RFC 7517 Section 4.5 <https://tools.ietf.org/html/rfc7517#section-4.5>`_
"""
def __init__(self, certificates, key_id, **kwargs):
# type: (list[bytes], str, Any) -> None
self.certificates = [base64.b64decode(cert) for cert in certificates]
self.key_id = key_id
@classmethod
def _from_generated(cls, generated):
#type:(JSONWebKey)->AttestationSigner
if not generated:
return cls
return cls(generated.x5_c, generated.kid)
[docs]class PolicyCertificatesModificationResult(object):
"""The result of a policy certificate modification.
:param str certificate_thumbprint: Hex encoded SHA1 Hash of the binary representation certificate
which was added or removed.
:param certificate_resolution: The result of the operation. Possible values include:
"IsPresent", "IsAbsent".
:type certificate_resolution: str or
~azure.security.attestation._generated.models.CertificateModification
"""
def __init__(self, certificate_thumbprint, certificate_resolution):
#type:(str, CertificateModification)->None
self.certificate_thumbprint = certificate_thumbprint
self.certificate_resolution = certificate_resolution
@classmethod
def _from_generated(cls, generated):
if not generated:
return cls
return cls(generated.certificate_thumbprint, generated.certificate_resolution)
[docs]class PolicyResult(object):
""" PolicyResult represents the result of a :meth:`azure.security.attestation.AttestationAdministrationClient.set_policy`
or :meth:`azure.security.attestation.AttestationAdministrationClient.reset_policy` API call.
The `PolicyResult` class is returned as the body of an attestation token from
the attestation service. It can be used to ensure that the attestation service
received the policy object sent from the client without alteration.
:param policy_resolution: The result of the policy set or
reset call.
:type policy_resolution: azure.security.attestation.PolicyModification
:param policy_signer: If the call to `set_policy` or `reset_policy`
had a :class:`AttestationSigningKey` parameter, this will be the certificate
which was specified in this parameter.
:type policy_signer: azure.security.attestation.AttestationSigner
:param str policy_token_hash: The hash of the complete JSON Web Signature
presented to the `set_policy` or `reset_policy` API.
"""
def __init__(self, policy_resolution, policy_signer, policy_token_hash):
#type:(PolicyModification, JSONWebKey, str) -> None
self.policy_resolution = policy_resolution
self.policy_signer = AttestationSigner._from_generated(policy_signer)
self.policy_token_hash = policy_token_hash
@classmethod
def _from_generated(cls, generated):
#type:(GeneratedPolicyResult)->PolicyResult
if not generated:
return cls(None, None, None)
return cls(generated.policy_resolution, generated.policy_signer, generated.policy_token_hash)
[docs]class AttestationResult(object):
""" An AttestationResult represents the claims returned from the attestation
service as a result of a call to
:meth:`azure.security.attestation.AttestationClient.attest_sgx`, or :meth:`AttestationClient.attest_open_enclave`.
"""
def __init__(self, **kwargs):
#type:(Dict[str,Any])->None
"""
:keyword issuer: Entity which issued the attestation token.
:paramtype issuer: str
:keyword confirmation: Confirmation claim for the token.
:paramtype confirmation: dict
:keyword unique_identifier: Unique identifier for the token.
:paramtype unique_identifier: str
:keyword nonce: Returns the input `nonce` attribute passed to the `attest` API.
:paramtype nonce: str
:keyword version: Version of the token. Must be "1.0"
:paramtype version: str
:keyword runtime_claims: Runtime claims passed in from the caller of the attest API.
:paramtype runtime_claims: dict
:keyword inittime_claims: Inittime claims passed in from the caller of the attest API.
:paramtype inittime_claims: dict
:keyword enclave_held_data: Runtime data passed in from the caller of the attest API.
:paramtype enclave_held_data: bytes
:keyword policy_claims: Attestation claims issued by policies.
:paramtype policy_claims: dict
:keyword verifier_type: Verifier which generated this token.
:paramtype verifier_type: str
:keyword policy_signer: If the policy which processed the request is signed,
this will be the certificate which signed the policy.
:paramtype policy_signer: azure.security.attestation.AttestationSigner
:keyword policy_hash: The hash of the policy which processed the attestation
evidence.
:paramtype policy_hash: str
:keyword is_debuggable: True if the SGX enclave being attested is debuggable.
:paramtype is_debuggable: bool
:keyword product_id: Product ID for the SGX enclave being attested.
:paramtype product_id: int
:keyword mr_enclave: MRENCLAVE value for the SGX enclave being attested.
:paramtype mr_enclave: str
:keyword mr_signer: MRSIGNER value for the SGX enclave being attested.
:paramtype mr_signer: str
:keyword svn: Security version number for the SGX enclave being attested.
:paramtype svn: int
:keyword sgx_collateral: Collateral which identifies the collateral used to
create the token.
:paramtype sgx_collateral: dict
"""
self._issuer = kwargs.pop("issuer", None) #type:Union[str, None]
self._confirmation = kwargs.pop("confirmation", None) #type:Union[dict, None]
self._unique_identifier = kwargs.pop("unique_identifier", None) #type:Union[str, None]
self._nonce = kwargs.pop("nonce", None) #type:Union[str, None]
self._version = kwargs.pop("version", None) #type:Union[str, None]
self._runtime_claims = kwargs.pop("runtime_claims", None) #type:Union[dict, None]
self._inittime_claims = kwargs.pop("inittime_claims", None) #type:Union[dict, None]
self._policy_claims = kwargs.pop("policy_claims", None) #type:Union[dict, None]
self._verifier_type = kwargs.pop("verifier_type", None) #type:Union[str. None]
self._policy_signer = kwargs.pop("policy_signer", None) #type:Union[AttestationSigner, None]
self._policy_hash = kwargs.pop("policy_hash", None) #type:Union[str, None]
self._is_debuggable = kwargs.pop("is_debuggable", None) #type:Union[bool, None]
self._product_id = kwargs.pop("product_id", None) #type:Union[int, None]
self._mr_enclave = kwargs.pop("mr_enclave", None) #type:Union[str, None]
self._mr_signer = kwargs.pop("mr_signer", None) #type:Union[str, None]
self._svn = kwargs.pop("svn", None) #type:Union[int, None]
self._enclave_held_data = kwargs.pop("enclave_held_data", None) #type:Union[bytes, None]
self._sgx_collateral = kwargs.pop("sgx_collateral", None) #type:Union[dict, None]
@classmethod
def _from_generated(cls, generated):
#type:(GeneratedAttestationResult) -> AttestationResult
return AttestationResult(
issuer=generated.iss,
confirmation=generated.cnf,
unique_identifier=generated.jti,
nonce=generated.nonce,
version=generated.version,
runtime_claims=generated.runtime_claims,
inittime_claims=generated.inittime_claims,
policy_claims=generated.policy_claims,
verifier_type=generated.verifier_type,
policy_signer=AttestationSigner._from_generated(generated.policy_signer) if generated.policy_signer else None,
policy_hash=generated.policy_hash,
is_debuggable=generated.is_debuggable,
product_id=generated.product_id,
mr_enclave=generated.mr_enclave,
mr_signer=generated.mr_signer,
svn=generated.svn,
enclave_held_data=generated.enclave_held_data,
sgx_collateral=generated.sgx_collateral)
@property
def issuer(self):
#type:() -> Union[str, None]
""" Returns the issuer of the attestation token.
The issuer for the token MUST be the same as the `instance_uri` associated
with the :class:`azure.security.attestation.AttestationClient` object.
If it is not, then the token should be rejected.
See `RFC 7519 Section 4.1.1 <https://www.rfc-editor.org/rfc/rfc7519.html#section-4.1.1>`_ for details.
:rtype: str or None
"""
return self._issuer
@property
def confirmation(self):
#type:() -> Union[str, None]
""" Returns the confirmation claim for the attestation token.
If present, the confirmation property can be used to identify a proof of
possession of a key.
See `RFC 7800 Section 3.1 <https://www.rfc-editor.org/rfc/rfc7800.html#section-3.1>`_ for details.
:rtype: str or None
"""
return self._confirmation
@property
def unique_id(self):
#type:() -> Union[str, None]
""" Returns a unique ID claim for the attestation token.
If present, the unique_id property can be used to distinguish between
different attestation tokens.
See `RFC 7519 Section 4.1.7 <https://rfc-editor.org/rfc/rfc7519.html#section-4.1.7>`_ for details.
:rtype: str or None
"""
return self._confirmation
@property
def nonce(self):
#type:() -> Union[str, None]
""" Returns the value of the "nonce" input to the attestation request.
:rtype: str or None
"""
return self._nonce
@property
def version(self):
#type:() -> Union[str, None]
""" Returns the version of the information returned in the token.
:rtype: str or None
"""
return self._version
@property
def runtime_claims(self):
#type:() -> Dict[str, Any]
""" Returns the runtime claims in the token.
This value will match the input `runtime_data` property to the
:meth:`azure.security.attestation.AttestationClient.attest_sgx` or
:meth:`azure.security.attestation.AttestationClient.attest_open_enclave` API.
:rtype: dict[str, Any] or None
.. note:: The `runtime_claims` property will only be populated if the
`runtime_data` parameter to the `Attest` API is marked as being JSON.
"""
return self._runtime_claims
@property
def inittime_claims(self):
#type:() -> Dict[str, Any]
""" Returns the inittime claims in the token.
This value will match the input `inittime_data` property to the
:meth:`azure.security.attestation.AttestationClient.attest_sgx` or
:meth:`azure.security.attestation.AttestationClient.attest_open_enclave` API.
:rtype: dict[str, Any] or None
.. note:: The `inittime_claims` property will only be populated if the
`inittime_data` parameter to the `Attest` API is marked as being JSON.
"""
return self._inittime_claims
@property
def policy_claims(self):
#type:() -> Dict[str, Any]
""" Returns the claims for the token generated by attestation policy.
:rtype: dict[str, Any] or None
"""
return self._policy_claims
@property
def verifier_type(self):
#type:() -> Union[str, None]
""" Returns the verifier which generated this attestation token.
:rtype: str or None
"""
return self._verifier_type
@property
def policy_signer(self):
#type:() -> Union[AttestationSigner, None]
""" Returns the signing certificate which was used to sign the policy
which was applied when the token was generated.
:rtype: azure.security.attestation.AttestationSigner or None
"""
if self._policy_signer:
return AttestationSigner._from_generated(self._policy_signer)
return None
@property
def policy_hash(self):
#type:() -> Union[str, None]
""" Returns the base64url encoded SHA256 hash of the Base64Url encoded
attestation policy which was applied when generating this token.
:rtype: str or None
"""
return self._policy_hash
@property
def is_debuggable(self):
#type:() -> Union[bool, None]
""" Returns "True" if the source evidence being attested indicates
that the TEE has debugging enabled.
:rtype: bool or None
"""
return self._is_debuggable
@property
def product_id(self):
#type:() -> Union[float, None]
""" Returns the product id associated with the SGX enclave being attested.
:rtype: float or None
"""
return self._product_id
@property
def mr_enclave(self):
#type:() -> Union[str, None]
""" Returns HEX encoded `mr-enclave` value of the SGX enclave being attested.
:rtype: str or None
"""
return self._mr_enclave
@property
def mr_signer(self):
#type:() -> Union[str, None]
""" Returns HEX encoded `mr-signer` value of the SGX enclave being attested.
:rtype: str or None
"""
return self._mr_signer
@property
def svn(self):
#type:() -> Union[int, None]
""" Returns the `svn` value of the SGX enclave being attested.
:rtype: int or None
"""
return self._svn
@property
def enclave_held_data(self):
#type:() -> Union[bytes, None]
""" Returns the value of the runtime_data field specified as an input
to the :meth:`azure.security.attestation.AttestationClient.attest_sgx` or
:meth:`azure.security.attestation.AttestationClient.attest_open_enclave` API.
.. note:: The enclave_held_data prperty will only be populated if the
`runtime_data` parameter to the `Attest` API is marked as not being
JSON.
:rtype: bytes or None
"""
return self._enclave_held_data
@property
def sgx_collateral(self):
#type:() -> Union[Dict[str, Any], None]
""" Returns a set of information describing the complete set of inputs
to the `oe_verify_evidence`
:rtype: dict[str, Any] or None
"""
return self._sgx_collateral
# Deprecated fields.
[docs]class StoredAttestationPolicy(object):
""" Represents an attestation policy in storage.
When serialized, the `StoredAttestationPolicy` object will Base64Url encode the
UTF-8 representation of the `policy` value.
"""
def __init__(self, policy):
#type:(str) -> None
"""
:param str policy: Policy to be saved.
"""
self._policy = policy.encode("ascii")
[docs] def serialize(self, **kwargs):
#type:(Any) -> str
return GeneratedStoredAttestationPolicy(attestation_policy=self._policy).serialize(**kwargs)
@classmethod
def _from_generated(cls, generated):
#type:(GeneratedStoredAttestationPolicy) -> StoredAttestationPolicy
if generated is None:
return StoredAttestationPolicy("")
return StoredAttestationPolicy(generated.attestation_policy)
[docs]class AttestationData(object):
""" AttestationData represents an object passed as an input to the Attestation Service.
AttestationData comes in two forms: Binary and JSON. To distinguish between the two, when an :class:`AttestationData`
object is created, the caller provides an indication that the input binary data will be treated as either JSON or Binary.
If the `is_json` parameter is not provided, then the AttestationData
constructor will probe the `data` parameter to determine whether the data
should be treated as JSON.
The AttestationData is reflected in the generated :class:`AttestationResult` in two possible ways.
If the `AttestationData` is Binary, then the `AttestationData` is reflected in the `AttestationResult.enclave_held_data` claim.
If the `AttestationData` is JSON, then the `AttestationData` is expressed as JSON in the `AttestationResult.runtime_claims` or AttestationResult.inittime_claims claim.
:param bytes data: Input data to be sent to the attestation service.
:param bool is_json: True if the attestation service should treat the input data as JSON.
"""
def __init__(self, data, is_json=None):
# type:(bytes, bool) -> None
self._data = data
# If the caller thought that the input data is JSON, then respect their
# choice (this allows a caller to specify JSON data as if it was not JSON).
if is_json is not None:
self._is_json = is_json
else:
# The caller didn't say if the parameter is JSON or not, try parsing it,
# and if it parses, assume it's JSON.
try:
json.loads(data)
self._is_json = True
except Exception:
self._is_json = False
[docs]class TokenValidationOptions(object):
""" Validation options for an Attestation Token object.
:keyword bool validate_token: if True, validate the token, otherwise return the token unvalidated.
:keyword validation_callback: Callback to allow clients to perform custom validation of the token.
:paramtype validation_callback: Callable[[AttestationToken, AttestationSigner], bool]
:keyword bool validate_signature: if True, validate the signature of the token being validated.
:keyword bool validate_expiration: If True, validate the expiration time of the token being validated.
:keyword str issuer: Expected issuer, used if validate_issuer is true.
:keyword bool validate_issuer: If True, validate that the issuer of the token matches the expected issuer.
:keyword bool validate_not_before_time: If true, validate the "Not Before" time in the token.
"""
def __init__(self, **kwargs):
#type: (**Any) -> None
self.validate_token = kwargs.get('validate_token', True) # type: bool
self.validation_callback = kwargs.get('validation_callback') # type:Callable[[AttestationToken, AttestationSigner], bool]
self.validate_signature = kwargs.get('validate_signature', True) # type:bool
self.validate_expiration = kwargs.get('validate_expiration', True) # type:bool
self.validate_not_before = kwargs.get('validate_not_before', True) # type:bool
self.validate_issuer = kwargs.get('validate_issuer', False) # type:bool
self.issuer = kwargs.get('issuer') # type:str
# We assume a default validation slack of a half second to allow for a small
# amount of timer drift.
self.validation_slack = kwargs.get('validation_slack', 0.5) # type:float
[docs]class AttestationSigningKey(object):
""" Represents a signing key used by the attestation service.
Typically the signing key used by the service consists of two components: An RSA or ECDS private key and an X.509 Certificate wrapped around
the public key portion of the private key.
:param bytes signing_key_der: The RSA or ECDS signing key to sign the token supplied to the customer DER encoded.
:param bytes certificate_der: A DER encoded X.509 Certificate whose public key matches the signing_key's public key.
"""
def __init__(self, signing_key_der, certificate_der):
# type: (bytes, bytes) -> None
signing_key = serialization.load_der_private_key(signing_key_der, password=None, backend=default_backend())
certificate = load_der_x509_certificate(certificate_der, backend=default_backend())
self._signing_key = signing_key
self._certificate = certificate
# We only support ECDS and RSA keys in the MAA service.
if (not isinstance(signing_key, RSAPrivateKey) and not isinstance(signing_key, EllipticCurvePrivateKey)):
raise ValueError("Signing keys must be either ECDS or RSA keys.")
# Ensure that the public key in the certificate matches the public key of the key.
cert_public_key = certificate.public_key().public_bytes(
Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
key_public_key = signing_key.public_key().public_bytes(
Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
if cert_public_key != key_public_key:
raise ValueError("Signing key must match certificate public key")
[docs]class AttestationToken(Generic[T]):
""" Represents a token returned from the attestation service.
:keyword Any body: The body of the newly created token, if provided.
:keyword signer: If specified, the key used to sign the token.
If the `signer` property is not specified, the token created is unsecured.
:paramtype signer: azure.security.attestation.AttestationSigningKey
:keyword str token: If no body or signer is provided, the string representation of the token.
:keyword Type body_type: The underlying type of the body of the 'token' parameter, used to deserialize the underlying body when parsing the token.
"""
def __init__(self, **kwargs):
token = kwargs.get('token')
if token is None:
body = kwargs.get('body') # type: Any
signer = kwargs.get('signer') # type: AttestationSigningKey
if signer:
token = self._create_secured_jwt(body, signer)
else:
token = self._create_unsecured_jwt(body)
self._token = token
self._body_type = kwargs.get('body_type') #type: Type
token_parts = token.split('.')
if len(token_parts) != 3:
raise ValueError("Malformed JSON Web Token")
self.header_bytes = Base64Url.decode(token_parts[0])
self.body_bytes = Base64Url.decode(token_parts[1])
self.signature_bytes = Base64Url.decode(token_parts[2])
if len(self.body_bytes) != 0:
self._body = JSONDecoder().decode(self.body_bytes.decode('ascii'))
else:
self._body = None
self._header = JSONDecoder().decode(self.header_bytes.decode('ascii'))
def __str__(self):
return self._token
@property
def algorithm(self):
#type:() -> Union[str, None]
""" Json Web Token Header "alg".
See `RFC 7515 Section 4.1.1 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.1>`_ for details.
If the value of algorithm is "none" it indicates that the token is unsecured.
"""
return self._header.get('alg')
@property
def key_id(self):
#type:() -> Union[str, None]
""" Json Web Token Header "kid".
See `RFC 7515 Section 4.1.4 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.4>`_ for details.
"""
return self._header.get('kid')
@property
def expiration_time(self):
#type:() -> Union[datetime, None]
""" Expiration time for the token.
"""
exp = self._body.get('exp')
if exp:
return datetime.fromtimestamp(exp)
return None
@property
def not_before_time(self):
#type:() -> Union[datetime, None]
""" Time before which the token is invalid.
"""
nbf = self._body.get('nbf')
if nbf:
return datetime.fromtimestamp(nbf)
return None
@property
def issuance_time(self):
#type:() -> Union[datetime, None]
""" Time when the token was issued.
"""
iat = self._body.get('iat')
if iat:
return datetime.fromtimestamp(iat)
return None
@property
def content_type(self):
#type:() -> Union[str, None]
""" Json Web Token Header "content type".
See `RFC 7515 Section 4.1.10 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.10>`_ for details.
"""
return self._header.get('cty')
@property
def critical(self):
#type() -> # type: Optional[bool]
""" Json Web Token Header "Critical".
See `RFC 7515 Section 4.1.11 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.11>`_ for details.
"""
return self._header.get('crit')
@property
def key_url(self):
#type:() -> Union[str, None]
""" Json Web Token Header "Key URL".
See `RFC 7515 Section 4.1.2 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.2>`_ for details.
"""
return self._header.get('jku')
@property
def x509_url(self):
#type:() -> Union[str, None]
""" Json Web Token Header "X509 URL".
See `RFC 7515 Section 4.1.5 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.5>`_ for details.
"""
return self._header.get('x5u')
@property
def type(self):
#type:() -> Union[str, None]
""" Json Web Token Header "typ".
`RFC 7515 Section 4.1.9 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.9>`_ for details.
"""
return self._header.get('typ')
@property
def certificate_thumbprint(self):
#type:() -> Union[str, None]
""" The "thumbprint" of the certificate used to sign the request.
`RFC 7515 Section 4.1.7 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.7>`_ for details.
"""
return self._header.get('x5t')
@property
def certificate_sha256_thumbprint(self):
#type:() -> Union[str, None]
""" The "thumbprint" of the certificate used to sign the request generated using the SHA256 algorithm.
`RFC 7515 Section 4.1.8 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.8>`_ for details.
"""
return self._header.get('x5t#256')
@property
def issuer(self):
#type:() -> Union[str, None]
""" Json Web Token "iss" claim.
`RFC 7519 Section 4.1.1 <https://www.rfc-editor.org/rfc/rfc7519.html#section-4.1.1>`_ for details.
"""
return self._body.get('iss')
@property
def x509_certificate_chain(self):
#type:() -> Union[list[str], None]
""" An array of Base64 encoded X.509 certificates which represent a certificate chain used to sign the token.
See `RFC 7515 Section 4.1.6 <https://www.rfc-editor.org/rfc/rfc7515.html#section-4.1.6>`_ for details.
"""
x5c = self._header.get('x5c')
if x5c is not None:
return x5c
return None
def _json_web_key(self):
#type:() -> Union[JSONWebKey, None]
jwk = self._header.get('jwk')
return JSONWebKey.deserialize(jwk)
[docs] def serialize(self):
#type:() -> str
""" Returns a string serializing the JSON Web Token
:rtype: str
"""
return self._token
[docs] def validate_token(self, options=None, signers=None):
# type: (TokenValidationOptions, list[AttestationSigner]) -> bool
""" Validate the attestation token based on the options specified in the
:class:`TokenValidationOptions`.
:param azure.security.attestation.TokenValidationOptions options: Options to be used when validating
the token.
:param list[azure.security.attestation.AttestationSigner] signers: Potential signers for the token.
If the signers parameter is specified, validate_token will only
consider the signers as potential signatories for the token, otherwise
it will consider attributes in the header of the token.
:return bool: Returns True if the token successfully validated, False
otherwise.
:raises: azure.security.attestation.AttestationTokenValidationException
"""
if (options is None):
options = TokenValidationOptions(
validate_token=True, validate_signature=True, validate_expiration=True)
if not options.validate_token:
self._validate_static_properties(options)
if (options.validation_callback is not None):
options.validation_callback(self, None)
return True
signer = None
if self.algorithm != 'none' and options.validate_signature:
# validate the signature for the token.
candidate_certificates = self._get_candidate_signing_certificates(
signers)
signer = self._validate_signature(candidate_certificates)
if (signer is None):
raise AttestationTokenValidationException(
"Could not find the certificate used to sign the token.")
self._validate_static_properties(options)
if (options.validation_callback is not None):
if options.validation_callback(self, signer):
return True
raise AttestationTokenValidationException("User validation callback failed the validation request.")
return True
[docs] def get_body(self):
# type: () -> T
""" Returns the body of the attestation token as an object.
:rtype: T
"""
try:
return self._body_type.deserialize(self._body)
except AttributeError:
return self._body
def _get_candidate_signing_certificates(self, signing_certificates):
# type: (list[AttestationSigner]) -> list[AttestationSigner]
candidates = []
desired_key_id = self.key_id
if desired_key_id is not None:
for signer in signing_certificates:
if (signer.key_id == desired_key_id):
candidates.append(signer)
break
# If we didn't find a matching key ID in the supplied certificates,
# try the JWS header to see if there might be a corresponding key.
if (len(candidates) == 0):
jwk = self._json_web_key()
if jwk is not None:
if jwk.kid == desired_key_id:
if (jwk.x5_c):
signers = jwk.x5_c
candidates.append(AttestationSigner(
signers, desired_key_id))
else:
# We don't have a signer, so we need to try every possible signer.
# If the caller provided a list of certificates, use that as the exclusive source,
# otherwise iterate through the possible certificates.
if signing_certificates is not None:
for signer in signing_certificates:
candidates.append(signer)
else:
jwk = self._json_web_key()
if jwk.x5_c is not None:
signers = self._json_web_key().x5_c
candidates.append(AttestationSigner(signers, None))
candidates.append(self.x509_certificate_chain)
return candidates
def _get_certificates_from_x5c(self, x5clist):
# type:(list[str]) -> list[Certificate]
return [base64.b64decode(b64cert) for b64cert in x5clist]
def _validate_signature(self, candidate_certificates):
# type:(list[AttestationSigner]) -> AttestationSigner
signed_data = Base64Url.encode(
self.header_bytes)+'.'+Base64Url.encode(self.body_bytes)
for signer in candidate_certificates:
cert = load_der_x509_certificate(signer.certificates[0], backend=default_backend())
signer_key = cert.public_key()
# Try to verify the signature with this candidate.
# If it doesn't work, try the next signer.
try:
if isinstance(signer_key, RSAPublicKey):
signer_key.verify(
self.signature_bytes,
signed_data.encode('utf-8'),
padding.PKCS1v15(),
SHA256())
else:
signer_key.verify(
self.signature_bytes,
signed_data.encode('utf-8'),
SHA256())
return signer
except:
raise AttestationTokenValidationException("Could not verify signature of attestation token.")
return None
def _validate_static_properties(self, options):
# type:(TokenValidationOptions) -> bool
""" Validate the static properties in the attestation token.
"""
if self._body:
time_now = datetime.now()
if options.validate_expiration and self.expiration_time is not None:
if (time_now > self.expiration_time):
delta = time_now - self.expiration_time
if delta.total_seconds() > options.validation_slack:
raise AttestationTokenValidationException(u'Token is expired. Now: {}, Not Before: {}'.format(time_now.isoformat(), self.not_before_time.isoformat()))
if options.validate_not_before and hasattr(self, 'not_before_time') and self.not_before_time is not None:
if (time_now < self.not_before_time):
delta = self.not_before_time - time_now
if delta.total_seconds() > options.validation_slack:
raise AttestationTokenValidationException(u'Token is not yet valid. Now: {}, Not Before: {}'.format(time_now.isoformat(), self.not_before_time.isoformat()))
if options.validate_issuer and hasattr(self, 'issuer') and self.issuer is not None:
if (options.issuer != self.issuer):
raise AttestationTokenValidationException(u'Issuer in token: {} is not the expected issuer: {}.'.format(self.issuer, options.issuer))
return True
@staticmethod
def _create_unsecured_jwt(body):
# type: (Any) -> str
""" Return an unsecured JWT expressing the body.
"""
# Base64Url encoded '{"alg":"none"}'. See https://www.rfc-editor.org/rfc/rfc7515.html#appendix-A.5 for more information.
return_value = "eyJhbGciOiJub25lIn0."
# Try to serialize the body by asking the body object to serialize itself.
# This normalizes the attributes in the body object to conform to the serialized attributes used
# for transmission to the service.
try:
body = body.serialize()
except AttributeError:
pass
json_body = ''
if body is not None:
json_body = JSONEncoder().encode(body)
return_value += Base64Url.encode(json_body.encode('utf-8'))
return_value += '.'
return return_value
@staticmethod
def _create_secured_jwt(body, signer):
# type: (Any, AttestationSigningKey) -> str
""" Return a secured JWT expressing the body, secured with the specified signing key.
:param Any body: The body of the token to be serialized.
:param signer: the certificate and key to sign the token.
:type signer: AttestationSigningKey
"""
header = {
"alg": "RSA256" if isinstance(signer._signing_key, RSAPrivateKey) else "ECDH256",
"jwk": {
"x5c": [
base64.b64encode(signer._certificate.public_bytes(
Encoding.DER)).decode('utf-8')
]
}
}
json_header = JSONEncoder().encode(header)
return_value = Base64Url.encode(json_header.encode('utf-8'))
try:
body = body.serialize()
except AttributeError:
pass
json_body = ''
if body is not None:
json_body = JSONEncoder().encode(body)
return_value += '.'
return_value += Base64Url.encode(json_body.encode('utf-8'))
# Now we want to sign the return_value.
if isinstance(signer._signing_key, RSAPrivateKey):
signature = signer._signing_key.sign(
return_value.encode('utf-8'),
algorithm=SHA256(),
padding=padding.PKCS1v15())
else:
signature = signer._signing_key.sign(
return_value.encode('utf-8'),
algorithm=SHA256())
# And finally append the base64url encoded signature.
return_value += '.'
return_value += Base64Url.encode(signature)
return return_value
[docs]class AttestationTokenValidationException(Exception):
""" Thrown when an attestation token validation fails.
:param str message: Message for caller describing the reason for the failure.
"""
def __init__(self, message):
self.message = message
super(AttestationTokenValidationException, self).__init__(self.message)
[docs]class AttestationResponse(Generic[T]):
""" Represents a response from the attestation service.
:param token: Attestation Token returned from the service.
:type token: azure.security.attestation.AttestationToken
:param value: Value of the body of the attestation token.
:type value: T
"""
def __init__(self, token, value):
# type (AttestationToken, T) -> None
self.token = token #type: AttestationToken
self.value = value #type: T
[docs]class TpmAttestationRequest(object):
""" Represents a request for TPM attestation.
:param bytes data: The data sent to the Attestation Service in
the parameter to :meth:`azure.security.attestation.AttestationClient.attest_tpm`.
"""
def __init__(self, data):
#type (bytes) -> None
self.data = data
[docs]class TpmAttestationResponse(object):
""" Represents a request for TPM attestation.
:param bytes data: The data received from the Attestation Service in
response to a call to :meth:`azure.security.attestation.AttestationClient.attest_tpm`.
"""
def __init__(self, data):
#type (bytes) -> None
self.data = data