Source code for azure.confidentialledger.aio._client

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------

import asyncio
from typing import Any, Optional, TYPE_CHECKING, Union

from azure.core.async_paging import AsyncItemPaged
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async

from ._client_base import AsyncConfidentialLedgerClientBase
from .._enums import LedgerUserRole, TransactionState
from .._generated._generated_ledger.v0_1_preview.models import ConfidentialLedgerQueryState
from .._models import (
    AppendResult,
    Constitution,
    Consortium,
    ConsortiumMember,
    EnclaveQuote,
    LedgerEnclaves,
    LedgerEntry,
    LedgerUser,
    TransactionReceipt,
    TransactionStatus,
)
from .._shared import (
    ConfidentialLedgerCertificateCredential,
)

if TYPE_CHECKING:
    from azure.core.credentials import TokenCredential


[docs]class ConfidentialLedgerClient(AsyncConfidentialLedgerClientBase): """An asynchronous client for putting data into and querying data from the Confidential Ledger service. The `transport` parameter is typically accepted by Azure SDK clients to provide a custom transport stage in the pipeline. Since this client makes modifications to the default transport, using a custom transport will override and remove the following functionality: 1) Authentication using a client certificate. 2) TLS verification using the Confidential Ledger TLS certificate. :param str endpoint: URL of the Confidential Ledger service. :param credential: A credential object for authenticating with the Confidential Ledger. :type credential: ~azure.confidentialledger.ConfidentialLedgerCertificateCredential :param str ledger_certificate_path: The path to the ledger's TLS certificate. :keyword api_version: Version of the Confidential Ledger API to use. Defaults to the most recent. Support API versions: - 0.1-preview :type api_version: str """ def __init__( self, endpoint: str, credential: Union[ConfidentialLedgerCertificateCredential, "TokenCredential"], ledger_certificate_path: str, **kwargs: Any, ) -> None: super().__init__( endpoint=endpoint, credential=credential, ledger_certificate_path=ledger_certificate_path, **kwargs )
[docs] @distributed_trace_async async def append_to_ledger( self, entry_contents: str, *, sub_ledger_id: Optional[str] = None, wait_for_commit: bool = False, **kwargs: Any, ) -> AppendResult: """Appends an entry to the Confidential Ledger. :param entry_contents: Text to write to the ledger. :type entry_contents: str :param sub_ledger_id: Identifies the sub-ledger to append to, defaults to None. If none is specified, the service will use the service-default sub-ledger id. :type sub_ledger_id: Optional[str] :param wait_for_commit: If True, this method will not return until the write is durably saved to the ledger, defaults to False. :type wait_for_commit: bool, defaults to False. :return: Details about the write transaction. :rtype: ~azure.confidentialledger.AppendResult :raises: ~azure.core.exceptions.HttpResponseError """ if entry_contents is None: raise ValueError("entry_contents must not be None") # pylint: disable=protected-access result = await self._client.confidential_ledger.post_ledger_entry( contents=entry_contents, sub_ledger_id=sub_ledger_id, cls=kwargs.pop("cls", AppendResult._from_pipeline_result), **kwargs ) if wait_for_commit: await self.wait_until_durable(result.transaction_id, **kwargs) return result
[docs] @distributed_trace_async async def create_or_update_user( self, user_id: str, role: Union[str, LedgerUserRole], **kwargs: Any ) -> LedgerUser: """Creates a new Confidential Ledger user, or updates an existing one. :param user_id: Identifies the user to delete. This should be an AAD object id or certificate fingerprint. :type user_id: str :param role: Role to assigned to the user. :type role: str or LedgerUserRole :return: Details of the updated ledger user. :rtype: ~azure.confidentialledger.LedgerUser :raises: ~azure.core.exceptions.HttpResponseError """ if user_id is None or role is None: raise ValueError("user_id or role cannot be None") result = await self._client.confidential_ledger.create_or_update_user( user_id=user_id, assigned_role=role.value if isinstance(role, LedgerUserRole) else role, **kwargs ) return LedgerUser( user_id=result.user_id, role=LedgerUserRole(result.assigned_role) )
[docs] @distributed_trace_async async def delete_user(self, user_id: str, **kwargs: Any) -> None: """Deletes a user from the Confidential Ledger. :param user_id: Identifies the user to delete. This should be an AAD object id or certificate fingerprint. :type user_id: str :return: None :rtype: None :raises: ~azure.core.exceptions.HttpResponseError """ if user_id is None: raise ValueError("user_id cannot be None") await self._client.confidential_ledger.delete_user(user_id=user_id, **kwargs)
[docs] @distributed_trace_async async def get_constitution(self, **kwargs: Any) -> Constitution: """Gets the constitution used for governance. The constitution is a script that assesses and applies proposals from consortium members. :return: The contents of the constitution and its digest. :rtype: ~azure.confidentialledger.Constitution :raises: ~azure.core.exceptions.HttpResponseError """ result = await self._client.confidential_ledger.get_constitution(**kwargs) return Constitution(script=result.script, digest=result.digest)
[docs] @distributed_trace_async async def get_consortium(self, **kwargs: Any) -> Consortium: """Gets the consortium members. Consortium members can manage the Confidential Ledger. :return: Details about the consortium. :rtype: ~azure.confidentialledger.Consortium :raises: ~azure.core.exceptions.HttpResponseError """ result = await self._client.confidential_ledger.get_consortium_members(**kwargs) return Consortium( members=[ ConsortiumMember(certificate=member.certificate, member_id=member.id) for member in result.members ] )
[docs] @distributed_trace_async async def get_enclave_quotes(self, **kwargs: Any) -> LedgerEnclaves: """Gets enclave quotes from all nodes in the Confidential Ledger network. :return: Enclave quotes for nodes in the Confidential Ledger. :rtype: ~azure.confidentialledger.LedgerEnclaves :raises: ~azure.core.exceptions.HttpResponseError """ result = await self._client.confidential_ledger.get_enclave_quotes(**kwargs) return LedgerEnclaves( { quote.node_id: EnclaveQuote( node_id=quote.node_id, mrenclave=quote.mrenclave, raw_quote=quote.raw, version=quote.quote_version, ) for quote in result.enclave_quotes.values() }, result.current_node_id, )
[docs] @distributed_trace def get_ledger_entries( self, *, from_transaction_id=None, # type: Optional[str] to_transaction_id=None, # type: Optional[str] sub_ledger_id=None, # type: Optional[str] **kwargs # type: Any ) -> AsyncItemPaged[LedgerEntry]: """Gets a range of entries in the ledger. :param from_transaction_id: Transaction identifier from which to start the query, defaults to None. If this is None, the query begins from the first transaction. :type from_transaction_id: Optional[str] :param to_transaction_id: Transaction identifier at which to end the query (inclusive), defaults to None. If this is None, the query ends at the end of the ledger. :type from_transaction_id: Optional[str] :param sub_ledger_id: Identifies the sub-ledger to fetch the ledger entry from, defaults to None. :type sub_ledger_id: Optional[str] :return: An iterable for iterating over the entries in the range. :rtype: ~azure.core.async_paging.AsyncItemPaged[LedgerEntry] :raises: ~azure.core.exceptions.HttpResponseError """ if from_transaction_id is not None: if not from_transaction_id: raise ValueError( "If not None, from_transaction_id must be a non-empty string" ) if to_transaction_id is not None: if not to_transaction_id: raise ValueError( "If not None, to_transaction_id must be a non-empty string" ) # pylint: disable=protected-access return self._client.confidential_ledger.get_ledger_entries( from_transaction_id=from_transaction_id, to_transaction_id=to_transaction_id, sub_ledger_id=sub_ledger_id, cls=kwargs.pop( "cls", lambda entries: [ LedgerEntry._from_pipeline_result(entry) for entry in entries ] if entries is not None else [], ), **kwargs )
[docs] @distributed_trace_async async def get_ledger_entry( self, *, transaction_id: Optional[str] = None, sub_ledger_id: Optional[str] = None, interval: float = 0.5, max_tries: int = 6, **kwargs: Any, ) -> LedgerEntry: """Gets an entry in the ledger. :param transaction_id: Transaction identifier, defaults to None. If this is None, the latest transaction is fetched. :type transaction_id: Optional[str] :param sub_ledger_id: Identifies the sub-ledger to fetch the ledger entry from, defaults to None. :type sub_ledger_id: Optional[str] :param interval: Interval, in seconds, between retries while waiting for results. :type interval: float :param max_tries: Maximum number of times to try the query. Retries are attempted if the result is not Ready. :type max_tries: int :return: The corresponding ledger entry. :rtype: ~azure.confidentialledger.LedgerEntry :raises: ~azure.core.exceptions.HttpResponseError """ if transaction_id is not None: if not transaction_id: raise ValueError( "If not None, transaction_id must be a non-empty string" ) if transaction_id is None: result = await self._client.confidential_ledger.get_current_ledger_entry( sub_ledger_id=sub_ledger_id, **kwargs ) return LedgerEntry( transaction_id=result.transaction_id, contents=result.contents, sub_ledger_id=result.sub_ledger_id, ) ready = False result = None state = None for _ in range(max_tries): result = await self._client.confidential_ledger.get_ledger_entry( transaction_id=transaction_id, sub_ledger_id=sub_ledger_id, **kwargs ) ready = result.state == ConfidentialLedgerQueryState.READY if not ready: state = result.state await asyncio.sleep(interval) else: break if not ready: raise TimeoutError( "After {} attempts, the query still had state {}, not {}".format( max_tries, state, ConfidentialLedgerQueryState.READY ) ) return LedgerEntry( transaction_id=result.entry.transaction_id, contents=result.entry.contents, sub_ledger_id=result.entry.sub_ledger_id, )
[docs] @distributed_trace_async async def get_transaction_receipt( self, transaction_id: str, *, interval: float = 0.5, max_tries: int = 6, **kwargs: Any, ) -> TransactionReceipt: """Get a receipt for a specific transaction. :param transaction_id: Transaction identifier. :type transaction_id: str :param interval: Interval, in seconds, between retries while waiting for results. :type interval: float :param max_tries: Maximum number of times to try the query. Retries are attempted if the result is not Ready. :type max_tries: int :return: Receipt certifying the specified transaction. :rtype: ~azure.confidentialledger.TransactionReceipt :raises: ~azure.core.exceptions.HttpResponseError """ if transaction_id is None: raise ValueError("transaction_id cannot be None") ready = False result = None state = None for _ in range(max_tries): result = await self._client.confidential_ledger.get_receipt( transaction_id=transaction_id, **kwargs ) ready = result.state == ConfidentialLedgerQueryState.READY if not ready: state = result.state await asyncio.sleep(interval) else: break if not ready: raise TimeoutError( "After {} attempts, the query still had state {}, not {}".format( max_tries, state, ConfidentialLedgerQueryState.READY ) ) return TransactionReceipt( transaction_id=result.transaction_id, receipt=result.receipt )
[docs] @distributed_trace_async async def get_transaction_status( self, transaction_id: str, **kwargs: Any ) -> TransactionStatus: """Gets the status of a transaction. :param transaction_id: Identifier for the transaction to get the status of. :type transaction_id: str :return: Status object describing the transaction status. :rtype: ~azure.confidentialledger.TransactionStatus :raises: ~azure.core.exceptions.HttpResponseError """ if transaction_id is None: raise ValueError("transaction_id cannot be None") result = await self._client.confidential_ledger.get_transaction_status( transaction_id=transaction_id, **kwargs ) return TransactionStatus( transaction_id=result.transaction_id, state=TransactionState(result.state) )
[docs] @distributed_trace_async async def get_user(self, user_id: str, **kwargs: Any) -> LedgerUser: """Gets a Confidential Ledger user. :param user_id: Identifies the user to delete. This should be an AAD object id or certificate fingerprint. :type user_id: str :return: Details about the user. :rtype: ~azure.confidentialledger.LedgerUser :raises: ~azure.core.exceptions.HttpResponseError """ if user_id is None: raise ValueError("user_id cannot be None") result = await self._client.confidential_ledger.get_user(user_id=user_id, **kwargs) return LedgerUser( user_id=result.user_id, role=LedgerUserRole(result.assigned_role) )
[docs] @distributed_trace_async async def wait_until_durable( self, transaction_id, # type: str *, interval=0.5, # type: float max_queries=3, # type: int **kwargs # type: Any ): # type: (...) -> None """Queries the status of the specified transaction until it is Committed, indicating that the transaction is durably stored in the Confidential Ledger. If this state is not reached by `max_queries`, a TimeoutError is raised. :param transaction_id: Identifies the transaction to wait for. :type transaction_id: str :param interval: Time, in seconds, to wait between queries. :type interval: float :param max_queries: The maximum amount of queries to make before raising an exception. :type max_queries: int :return: None. :rtype: None :raises: ~azure.core.exceptions.HttpResponseError """ for attempt_num in range(max_queries): transaction_status = await self.get_transaction_status( transaction_id=transaction_id, **kwargs ) if transaction_status.state is TransactionState.COMMITTED: return if attempt_num < max_queries - 1: await asyncio.sleep(interval) raise TimeoutError( "Transaction {} is not {} yet".format( transaction_id, TransactionState.COMMITTED ) )