# Source code for azure.confidentialledger.receipt._receipt_verification

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Algorithm implementation for verifying Azure Confidential Ledger write
transaction receipts."""

from base64 import b64decode
from hashlib import sha256
from typing import Dict, List, Any, cast, Optional

from cryptography.x509 import load_pem_x509_certificate, Certificate
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

from azure.confidentialledger.receipt._receipt_models import (
    LeafComponents,
    ProofElement,
    Receipt,
)

from azure.confidentialledger.receipt._utils import (
    _convert_dict_to_camel_case,
)

from azure.confidentialledger.receipt._claims_digest_computation import (
    compute_claims_digest,
)


def verify_receipt(
    receipt: Dict[str, Any],
    service_cert: str,
    *,
    application_claims: Optional[List[Dict[str, Any]]] = None,
) -> None:
    """Verify that a given Azure Confidential Ledger write transaction receipt
    is valid from its content and the Confidential Ledger service identity
    certificate.

    :param receipt: Receipt dictionary containing the content of an Azure
     Confidential Ledger write transaction receipt.
    :type receipt: Dict[str, Any]

    :param service_cert: String containing the PEM-encoded
     certificate of the Confidential Ledger service identity.
    :type service_cert: str

    :keyword application_claims: List of application claims to be verified
     against the receipt. When omitted (or empty) the claims digest in the
     receipt is not compared against any application claims.
    :paramtype application_claims: Optional[List[Dict[str, Any]]]

    :raises ValueError: If the receipt verification has failed.
    """

    # Validate receipt content and convert it into a Receipt model
    receipt_obj = _preprocess_input_receipt(receipt)

    # Validate application claims provided by the user, if any
    if application_claims:
        computed_claims_digest = compute_claims_digest(application_claims)
        if computed_claims_digest != receipt_obj.leafComponents.claimsDigest:
            raise ValueError(
                "The computed claims digest from application claims does not match the receipt claims digest."
            )

    # Load node PEM certificate
    node_cert = _load_and_verify_pem_certificate(receipt_obj.cert)

    # Chain verification gets its own copy of the endorsements so the list
    # held by the receipt model stays exactly as it was parsed.
    endorsements = receipt_obj.serviceEndorsements
    if endorsements is not None:
        endorsements = list(endorsements)

    # Verify node certificate is endorsed by the service certificate
    # through endorsements certificates
    _verify_node_cert_endorsed_by_service_cert(
        node_cert, service_cert, cast(List[str], endorsements)
    )

    # Compute hash of the leaf node in the Merkle Tree corresponding
    # to the transaction associated to the given receipt
    leaf_node_hash = _compute_leaf_node_hash(receipt_obj.leafComponents)

    # Compute root of the Merkle Tree at the time the transaction was committed
    root_node_hash = _compute_root_node_hash(leaf_node_hash, receipt_obj.proof)

    # Verify signature of the signing node over the root of the tree with
    # node certificate public key
    _verify_signature_over_root_node_hash(
        receipt_obj.signature, node_cert, cast(str, receipt_obj.nodeId), root_node_hash
    )


def _require(condition: bool, message: str) -> None:
    """Raise ``ValueError`` carrying ``message`` when ``condition`` is false.

    Used instead of ``assert`` for validation so that the checks still run
    when the interpreter is started with ``-O`` (which strips ``assert``).

    :param bool condition: Condition that must hold
    :param str message: Error message used when the condition does not hold
    """

    if not condition:
        raise ValueError(message)


def _preprocess_input_receipt(receipt_dict: Dict[str, Any]) -> Receipt:
    """Preprocess input receipt dictionary, validate its content, and returns a
    valid Receipt object based on the vetted input data.

    :param dict[str, any] receipt_dict: Receipt dictionary
    :return: Receipt object
    :rtype: Receipt
    """

    # Convert any key in the receipt dictionary to camel case
    # to match the model fields (we do this because customers may
    # provide receipts with snake case keys since they were returned
    # by older ACL instances)
    receipt_dict = _convert_dict_to_camel_case(receipt_dict)

    _validate_receipt_content(receipt_dict)

    # Convert receipt JSON object to Receipt model
    return Receipt.from_dict(receipt_dict)


def _validate_receipt_content(receipt: Dict[str, Any]) -> None:
    """Validate the content of a write transaction receipt.

    Every failure (missing key, wrong type, or any error raised while
    inspecting the dictionary) is reported as the same ``ValueError``; the
    specific reason is available as the chained ``__cause__``.

    :param dict[str, any] receipt: Receipt dictionary
    :raises ValueError: If the receipt content is invalid.
    """

    try:
        _require("cert" in receipt, "Receipt is missing 'cert'.")
        _require(isinstance(receipt["cert"], str), "'cert' must be a string.")

        _require("leafComponents" in receipt, "Receipt is missing 'leafComponents'.")
        leaf_components = receipt["leafComponents"]
        _require(
            isinstance(leaf_components, dict), "'leafComponents' must be a dictionary."
        )
        for field in ("claimsDigest", "commitEvidence", "writeSetDigest"):
            _require(field in leaf_components, f"'leafComponents' is missing '{field}'.")
            _require(
                isinstance(leaf_components[field], str),
                f"'leafComponents.{field}' must be a string.",
            )

        _require("proof" in receipt, "Receipt is missing 'proof'.")
        _require(isinstance(receipt["proof"], list), "'proof' must be a list.")

        # Validate elements in proof
        for elem in receipt["proof"]:
            _require(
                "left" in elem or "right" in elem,
                "Proof element must contain 'left' or 'right'.",
            )
            if "left" in elem:
                _require(
                    isinstance(elem["left"], str), "Proof 'left' must be a string."
                )
            if "right" in elem:
                _require(
                    isinstance(elem["right"], str), "Proof 'right' must be a string."
                )

        _require("signature" in receipt, "Receipt is missing 'signature'.")
        _require(
            isinstance(receipt["signature"], str), "'signature' must be a string."
        )

        # Validate nodeId, if present
        if "nodeId" in receipt:
            _require(isinstance(receipt["nodeId"], str), "'nodeId' must be a string.")

        # Validate serviceEndorsements, if present
        if "serviceEndorsements" in receipt:
            _require(
                isinstance(receipt["serviceEndorsements"], list),
                "'serviceEndorsements' must be a list.",
            )

            # Validate elements in serviceEndorsements
            for elem in receipt["serviceEndorsements"]:
                _require(
                    isinstance(elem, str),
                    "'serviceEndorsements' elements must be strings.",
                )

    except Exception as exception:
        raise ValueError("The receipt content is invalid.") from exception


def _verify_signature_over_root_node_hash(
    signature: str, node_cert: Certificate, node_id: str, root_node_hash: bytes
) -> None:
    """Verify signature over root node hash of the Merkle Tree using node
    certificate public key.

    :param str signature: Signature (base64-encoded)
    :param Certificate node_cert: Node certificate
    :param str node_id: Node ID (hex string); the check against the
     certificate public key is skipped when this is None
    :param bytes root_node_hash: Root node hash
    :raises ValueError: If the node ID does not match the certificate public
     key or the signature does not verify.
    """

    try:
        # Verify public key contained in the node certificate is equal to the node_id
        public_key_bytes = node_cert.public_key().public_bytes(
            Encoding.DER, PublicFormat.SubjectPublicKeyInfo
        )

        if node_id is not None:
            # Explicit check (not ``assert``) so it cannot be optimized away.
            _require(
                sha256(public_key_bytes).digest() == bytes.fromhex(node_id),
                "Node ID does not match the digest of the node certificate public key.",
            )

        # Verify signature over root node hash using node certificate public key
        _verify_ec_signature(
            node_cert,
            b64decode(signature, validate=True),
            root_node_hash,
            hashes.SHA256(),
        )

    except Exception as exception:
        raise ValueError(
            f"Encountered exception when verifying signature {signature} over root node hash."
        ) from exception


def _compute_leaf_node_hash(leaf_components: LeafComponents) -> bytes:
    """Compute the hash of the leaf node associated to a transaction given the
    leaf components from a write transaction receipt.

    :param LeafComponents leaf_components: Leaf components
    :return: Leaf node hash
    :rtype: bytes
    :raises ValueError: If the leaf components cannot be hashed (for example
     a digest that is not valid hex).
    """

    try:
        # Digest commit evidence string
        commit_evidence_digest = sha256(
            leaf_components.commitEvidence.encode()
        ).digest()

        # Convert write set digest to bytes
        write_set_digest = bytes.fromhex(leaf_components.writeSetDigest)

        # Convert claims digest to bytes
        claims_digest = bytes.fromhex(leaf_components.claimsDigest)

        # Create leaf node hash by hashing the concatenation of its three components
        # as bytes objects in the following order:
        # 1. write_set_digest
        # 2. commit_evidence_digest
        # 3. claims_digest
        return sha256(
            write_set_digest + commit_evidence_digest + claims_digest
        ).digest()

    except Exception as exception:
        raise ValueError(
            f"Encountered exception when computing leaf node hash from leaf components {leaf_components}."
        ) from exception


def _compute_root_node_hash(leaf_hash: bytes, proof: List[ProofElement]) -> bytes:
    """Re-compute the hash of the root of the Merkle tree from a leaf node hash
    and a receipt proof list containing the required nodes hashes for the
    computation.

    :param bytes leaf_hash: Leaf node hash
    :param list[ProofElement] proof: Receipt proof list
    :return: Root node hash
    :rtype: bytes
    :raises ValueError: If a proof element is malformed or the computation
     fails.
    """

    try:
        # Initialize current hash to leaf hash
        current_node_hash = leaf_hash

        # Iterate through all the elements in proof list
        for element in proof:
            # Check that the current element only contains either one left or right node hash
            if (
                element is None
                or (element.left is None and element.right is None)
                or (element.left is not None and element.right is not None)
            ):
                raise ValueError(
                    "Invalid proof element in receipt: element must contain either one left or right node hash."
                )

            parent_node_hash = bytes()

            # If the current element contains a left hash, concatenate the left hash and the current node hash
            if element.left is not None:
                parent_node_hash = bytes.fromhex(element.left) + current_node_hash

            # If the current element contains a right hash, concatenate the current node hash and the right hash
            if element.right is not None:
                parent_node_hash = current_node_hash + bytes.fromhex(element.right)

            # Hash the parent node hash
            current_node_hash = sha256(parent_node_hash).digest()

        return current_node_hash

    except Exception as exception:
        raise ValueError(
            f"Encountered exception when computing root node hash from proof list {proof}."
        ) from exception


def _verify_certificate_endorsement(
    endorsee: Certificate, endorser: Certificate
) -> None:
    """Verify that the endorser certificate has endorsed endorsee certificate
    using ECDSA.

    :param Certificate endorsee: Endorsee certificate
    :param Certificate endorser: Endorser certificate
    :raises ValueError: If the endorsement cannot be verified.
    """

    try:
        # Extract TBS certificate hash from endorsee certificate
        hash_algorithm = cast(hashes.HashAlgorithm, endorsee.signature_hash_algorithm)
        digester = hashes.Hash(hash_algorithm)
        digester.update(endorsee.tbs_certificate_bytes)
        cert_digest = digester.finalize()

        # Verify endorser signature over endorsee certificate digest
        _verify_ec_signature(endorser, endorsee.signature, cert_digest, hash_algorithm)

    except Exception as exception:
        raise ValueError(
            f"Encountered exception when verifying endorsement of certificate {endorsee} by certificate {endorser}."
        ) from exception


def _verify_ec_signature(
    certificate: Certificate,
    signature: bytes,
    data: bytes,
    hash_algorithm: hashes.HashAlgorithm,
) -> None:
    """Verify a signature over data using the certificate public key.

    ``data`` is passed to the verifier as an already-computed digest
    (``utils.Prehashed``), so callers hash the message themselves.

    :param Certificate certificate: Certificate
    :param bytes signature: Signature
    :param bytes data: Data (digest produced with ``hash_algorithm``)
    :param hashes.HashAlgorithm hash_algorithm: Hash algorithm
    """

    public_key = cast(ec.EllipticCurvePublicKey, certificate.public_key())

    public_key.verify(
        signature,
        data,
        ec.ECDSA(utils.Prehashed(hash_algorithm)),
    )


def _verify_node_cert_endorsed_by_service_cert(
    node_cert: Certificate, service_cert_str: str, endorsements_certs: List[str]
) -> None:
    """Check a node certificate is endorsed by a service certificate.

    If a list of endorsements certificates is not empty, check that the
    node certificate is transitively endorsed by the service certificate
    through the endorsement certificates in the list.

    The ``endorsements_certs`` list passed in is not modified.

    :param Certificate node_cert: Node certificate
    :param str service_cert_str: Service certificate string
    :param list[str] endorsements_certs: Endorsements certificates list
     (may be None, which is treated as an empty list)
    :raises ValueError: If any certificate in the chain is invalid or an
     endorsement cannot be verified.
    """

    current_cert = node_cert

    # Build the chain on a fresh list (endorsements first, service certificate
    # last) rather than appending to the caller's list.
    chain = [] if endorsements_certs is None else list(endorsements_certs)
    chain.append(service_cert_str)

    # Iterate through all the endorsements certificates
    for endorsement in chain:
        # Load endorsement PEM certificate
        endorsement_cert = _load_and_verify_pem_certificate(endorsement)

        # Verify endorsement certificate has endorsed current certificate
        _verify_certificate_endorsement(current_cert, endorsement_cert)

        # Set current certificate to endorsement certificate to continue the chain verification
        current_cert = endorsement_cert


def _load_and_verify_pem_certificate(cert_str: str) -> Certificate:
    """Load PEM certificate from a string representation and verify it is a
    valid certificate with expected Elliptic Curve public key.

    :param str cert_str: PEM certificate string
    :return: Certificate
    :rtype: Certificate
    :raises ValueError: If the string cannot be loaded as a PEM certificate
     or its public key is not an Elliptic Curve key.
    """

    try:
        # Load certificate from string
        cert = load_pem_x509_certificate(cert_str.encode())

        # Verify public key is of the correct type (explicit check rather
        # than ``assert`` so it is enforced under ``python -O`` as well)
        _require(
            isinstance(cert.public_key(), ec.EllipticCurvePublicKey),
            "Certificate public key is not an Elliptic Curve public key.",
        )

        return cert

    except Exception as exception:
        raise ValueError(f"PEM certificate {cert_str} is not valid.") from exception