Source code for azure.confidentialledger.receipt._claims_digest_computation

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Algorithm implementation for computing Azure Confidential Ledger application claims."""

from base64 import b64decode
from hashlib import sha256
import hmac
from typing import Any, Dict, List, cast

from azure.confidentialledger.receipt._claims_models import (
    ApplicationClaim,
    LedgerEntryClaim,
    ClaimDigest,
)

LEDGER_ENTRY_CLAIM_TYPE = "LedgerEntry"
DIGEST_CLAIM_TYPE = "ClaimDigest"
LEDGER_ENTRY_V1_CLAIM_PROTOCOL = "LedgerEntryV1"


[docs]def compute_claims_digest(application_claims: List[Dict[str, Any]]) -> str: """ Compute the claims digest from a list of Azure Confidential Ledger application claims. :param application_claims: List of application claims to be verified against the receipt. :type application_claims: List[Dict[str, Any]] :return: The claims digest of the application claims. :rtype: str :raises ValueError: If the claims digest computation has failed. """ # The logic is structured in three distinct steps: # 1. Claim dictionary validation # 2. Claim conversion to object # 3. Claims digest computation # For every step, we iterate over the list of application claims to process them one by one. # While it could be slightly inefficient to have three sets of iterations instead of a single one, # the main idea is to have a clean separation of concerns between the different steps # (e.g., not mix syntax/formatting errors with logical errors). # From a performance perspective, it is not expected to have a large number of application # claims in a single receipt (at the time of writing, we only support a single claim per receipt), # so the overhead of having three iterations should be negligible. # Validate application claims provided by the user _validate_application_claims(application_claims) # Convert application claims JSON objects to ApplicationClaim model application_claims_obj = [] for claim_dict in application_claims: claim = ApplicationClaim.from_dict(claim_dict) application_claims_obj.append(claim) # Compute claims digest from application claims return _compute_claims_hexdigest(application_claims_obj)
def _validate_application_claims(application_claims: List[Dict[str, Any]]): """Validate the application claims in a write transaction receipt. :param list[dict[str, any]] application_claims: List of application claims to be verified against the receipt. """ assert isinstance(application_claims, list) assert len(application_claims) > 0, "Application claims list cannot be empty" # Assert on each application claim object in the list for application_claim_object in application_claims: assert isinstance(application_claim_object, dict) # Assert on the kind of the claim assert "kind" in application_claim_object claim_kind = application_claim_object["kind"] assert isinstance(claim_kind, str) # Assert on the ledger entry claim if claim_kind == "LedgerEntry": ledger_entry_claim = application_claim_object.get("ledgerEntry") assert isinstance(ledger_entry_claim, dict) # Assert on the collection id assert "collectionId" in ledger_entry_claim assert isinstance(ledger_entry_claim["collectionId"], str) # Assert on the contents id assert "contents" in ledger_entry_claim assert isinstance(ledger_entry_claim["contents"], str) # Assert on the protocol assert "protocol" in ledger_entry_claim assert isinstance(ledger_entry_claim["protocol"], str) # Assert on the secret key assert "secretKey" in ledger_entry_claim assert isinstance(ledger_entry_claim["secretKey"], str) # Assert on the digest claim elif claim_kind == "ClaimDigest": assert "digest" in application_claim_object digest_claim = application_claim_object["digest"] assert isinstance(digest_claim, dict) # Assert on the digest value assert "value" in digest_claim assert isinstance(digest_claim["value"], str) # Assert on the protocol assert "protocol" in digest_claim assert isinstance(digest_claim["protocol"], str) else: assert False, f"Unknown claim kind: {claim_kind}" def _compute_ledger_entry_v1_claim_digest( ledger_entry_claim: LedgerEntryClaim, ) -> bytes: """Compute the digest of a LedgerEntryV1 claim. It returns the digest in bytes. :param LedgerEntryClaim ledger_entry_claim: LedgerEntry claim to be digested. :return: The digest of the LedgerEntry claim. :rtype: bytes """ # Decode the secret key secret_key = b64decode(ledger_entry_claim.secretKey, validate=True) # HMAC the collection ID with the secret key collection_id_digest = hmac.new( secret_key, ledger_entry_claim.collectionId.encode(), sha256, ).digest() # HMAC the ledger contents with the secret key contents_digest = hmac.new( secret_key, ledger_entry_claim.contents.encode(), sha256, ).digest() # Compute the SHA-256 of the concatenation of the collection ID and contents digests return sha256(collection_id_digest + contents_digest).digest() def _compute_ledger_entry_claim_digest(ledger_entry_claim: LedgerEntryClaim) -> bytes: """Compute the digest of a LedgerEntry claim. It returns the digest in bytes. :param LedgerEntryClaim ledger_entry_claim: LedgerEntry claim to be digested. :return: The digest of the LedgerEntry claim. :rtype: bytes """ claim_protocol = ledger_entry_claim.protocol # Compute the digest based on the specified protocol if claim_protocol == LEDGER_ENTRY_V1_CLAIM_PROTOCOL: # Compute the digest of the LedgerEntryV1 claim ledger_entry_digest = _compute_ledger_entry_v1_claim_digest(ledger_entry_claim) else: raise ValueError(f"Unsupported claim protocol: {claim_protocol}") # Compute the SHA-256 of the concatenation of the protocol and the ledger entry digest return sha256(claim_protocol.encode() + ledger_entry_digest).digest() def _compute_claim_digest_from_object(claim_digest_object: ClaimDigest) -> bytes: # Compute the SHA-256 of the concatenation of the protocol and the digest value return sha256( claim_digest_object.protocol.encode() + bytes.fromhex(claim_digest_object.value) ).digest() def _compute_claims_hexdigest(application_claims_list: List[ApplicationClaim]) -> str: """Compute the CCF claims digest from the provided list of application claims objects. It returns the hexdigest of the claims digest. :param list[ApplicationClaim] application_claims_list: List of application claims to be digested. :return: The hexdigest of the claims digest. :rtype: str """ # Initialize the claims digest claims_digests_concatenation = b"" # Iterate through all the application claims objects to compute their single digest. # We assume that the order of the application objects is valid # and the digests will be concatenated in the same order. for application_claim_object in application_claims_list: # Get the kind of the claim claim_kind = application_claim_object.kind if claim_kind == LEDGER_ENTRY_CLAIM_TYPE: # Compute the digest of the LedgerEntry claim claim_digest = _compute_ledger_entry_claim_digest( cast(LedgerEntryClaim, application_claim_object.ledgerEntry) ) elif claim_kind == DIGEST_CLAIM_TYPE: # Compute the digest of the ClaimDigest claim claim_digest = _compute_claim_digest_from_object( cast(ClaimDigest, application_claim_object.digest) ) else: raise ValueError(f"Unsupported claim kind: {claim_kind}") # Append the computed digest to the result claims_digests_concatenation += claim_digest # Prepend the size of application claims to the concatenation of the digests claims_digests_concatenation = ( len(application_claims_list).to_bytes(length=4, byteorder="little") + claims_digests_concatenation ) # Hash the concatenation of application claims and return the digest in hexadecimal form return sha256(claims_digests_concatenation).hexdigest()