Source code for azure.security.attestation._administration_client

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------

from typing import List, Any, Optional, TYPE_CHECKING

from azure.core import PipelineClient
from six import python_2_unicode_compatible

if TYPE_CHECKING:
    # pylint: disable=unused-import,ungrouped-imports
    from typing import Any

    from azure.core.credentials import TokenCredential
    from azure.core.pipeline.transport import HttpRequest, HttpResponse

from ._generated import AzureAttestationRestClient
from ._generated.models import (
    AttestationType, 
    PolicyResult as GeneratedPolicyResult, 
    PolicyCertificatesResult, 
    JSONWebKey, 
    AttestationCertificateManagementBody, 
    StoredAttestationPolicy as GeneratedStoredAttestationPolicy,
    PolicyCertificatesModificationResult as GeneratedPolicyCertificatesModificationResult
)
from ._configuration import AttestationClientConfiguration
from ._models import (
    AttestationSigner, 
    AttestationToken, 
    AttestationResponse, 
    AttestationSigningKey, 
    PolicyCertificatesModificationResult,
    PolicyResult,
    AttestationTokenValidationException
)
import base64
from azure.core.tracing.decorator import distributed_trace
from threading import Lock, Thread


[docs]class AttestationAdministrationClient(object): """Provides administrative APIs for managing an instance of the Attestation Service. :param instance_url: base url of the service :type instance_url: str :param credential: Credentials for the caller used to interact with the service. :type credential: :class:`~azure.core.credentials.TokenCredential` :keyword Pipeline pipeline: If omitted, the standard pipeline is used. :keyword HttpTransport transport: If omitted, the standard pipeline is used. :keyword list[HTTPPolicy] policies: If omitted, the standard pipeline is used. """ def __init__( self, credential, # type: "TokenCredential" instance_url, # type: str **kwargs # type: Any ): # type: (...) -> None if not credential: raise ValueError("Missing credential.") self._config = AttestationClientConfiguration(credential, instance_url, **kwargs) self._client = AzureAttestationRestClient(credential, instance_url, **kwargs) self._statelock = Lock() self._signing_certificates = None
[docs] @distributed_trace def get_policy(self, attestation_type, **kwargs): #type(AttestationType, **Any) -> AttestationResponse[str]: """ Retrieves the attestation policy for a specified attestation type. :param attestation_type: :class:`azure.security.attestation.AttestationType` for which to retrieve the policy. :type attestation_type: azure.security.attestation.AttestationType :return: Attestation service response encapsulating a string attestation policy. :rtype: azure.security.attestation.AttestationResponse[str] :raises azure.security.attestation.AttestationTokenValidationException: Raised when an attestation token is invalid. """ policyResult = self._client.policy.get(attestation_type, **kwargs) token = AttestationToken[GeneratedPolicyResult](token=policyResult.token, body_type=GeneratedPolicyResult) token_body = token.get_body() stored_policy = AttestationToken[GeneratedStoredAttestationPolicy](token=token_body.policy, body_type=GeneratedStoredAttestationPolicy) actual_policy = stored_policy.get_body().attestation_policy #type: bytes if self._config.token_validation_options.validate_token: if not token.validate_token(self._config.token_validation_options, self._get_signers(**kwargs)): raise AttestationTokenValidationException("Token Validation of get_policy API failed.") return AttestationResponse[str](token, actual_policy.decode('utf-8'))
[docs] @distributed_trace def set_policy(self, attestation_type, attestation_policy, signing_key=None, **kwargs): #type:(AttestationType, str, Optional[AttestationSigningKey], **Any) -> AttestationResponse[PolicyResult] """ Sets the attestation policy for the specified attestation type. :param attestation_type: :class:`azure.security.attestation.AttestationType` for which to set the policy. :type attestation_type: azure.security.attestation.AttestationType :param attestation_policy: Attestation policy to be set. :type attestation_policy: str :param signing_key: Signing key to be used to sign the policy before sending it to the service. :type signing_key: azure.security.attestation.AttestationSigningKey :return: Attestation service response encapsulating a :class:`PolicyResult`. :rtype: azure.security.attestation.AttestationResponse[azure.security.attestation.PolicyResult] :raises azure.security.attestation.AttestationTokenValidationException: Raised when an attestation token is invalid. .. note:: If the attestation instance is in *Isolated* mode, then the `signing_key` parameter MUST be a signing key containing one of the certificates returned by :meth:`get_policy_management_certificates`. If the attestation instance is in *AAD* mode, then the `signing_key` parameter does not need to be provided. """ policy_token = AttestationToken[GeneratedStoredAttestationPolicy]( body=GeneratedStoredAttestationPolicy(attestation_policy = attestation_policy.encode('ascii')), signer=signing_key, body_type=GeneratedStoredAttestationPolicy) policyResult = self._client.policy.set(attestation_type=attestation_type, new_attestation_policy=policy_token.serialize(), **kwargs) token = AttestationToken[GeneratedPolicyResult](token=policyResult.token, body_type=GeneratedPolicyResult) if self._config.token_validation_options.validate_token: if not token.validate_token(self._config.token_validation_options, self._get_signers(**kwargs)): raise AttestationTokenValidationException("Token Validation of set_policy API failed.") return AttestationResponse[PolicyResult](token, PolicyResult._from_generated(token.get_body()))
[docs] @distributed_trace def reset_policy(self, attestation_type, signing_key=None, **kwargs): #type:(AttestationType, Optional[AttestationSigningKey], **dict[str, Any]) -> AttestationResponse[PolicyResult] """ Resets the attestation policy for the specified attestation type to the default value. :param attestation_type: :class:`azure.security.attestation.AttestationType` for which to set the policy. :type attestation_type: azure.security.attestation.AttestationType :param attestation_policy: Attestation policy to be reset. :type attestation_policy: str :param signing_key: Signing key to be used to sign the policy before sending it to the service. :type signing_key: azure.security.attestation.AttestationSigningKey :return: Attestation service response encapsulating a :class:`PolicyResult`. :rtype: azure.security.attestation.AttestationResponse[azure.security.attestation.PolicyResult] :raises azure.security.attestation.AttestationTokenValidationException: Raised when an attestation token is invalid. .. note:: If the attestation instance is in *Isolated* mode, then the `signing_key` parameter MUST be a signing key containing one of the certificates returned by :meth:`get_policy_management_certificates`. If the attestation instance is in *AAD* mode, then the `signing_key` parameter does not need to be provided. """ policy_token = AttestationToken( body=None, signer=signing_key) policyResult = self._client.policy.reset(attestation_type=attestation_type, policy_jws=policy_token.serialize(), **kwargs) token = AttestationToken[GeneratedPolicyResult](token=policyResult.token, body_type=GeneratedPolicyResult) if self._config.token_validation_options.validate_token: if not token.validate_token(self._config.token_validation_options, self._get_signers(**kwargs)): raise AttestationTokenValidationException("Token Validation of reset_policy API failed.") return AttestationResponse[PolicyResult](token, PolicyResult._from_generated(token.get_body()))
[docs] @distributed_trace def get_policy_management_certificates(self, **kwargs): #type:(**Any) -> AttestationResponse[list[list[bytes]]] """ Retrieves the set of policy management certificates for the instance. The list of policy management certificates will only be non-empty if the attestation service instance is in Isolated mode. :return: Attestation service response encapsulating a list of DER encoded X.509 certificate chains. :rtype: azure.security.attestation.AttestationResponse[list[list[bytes]]] """ cert_response = self._client.policy_certificates.get(**kwargs) token = AttestationToken[PolicyCertificatesResult]( token=cert_response.token, body_type=PolicyCertificatesResult) if self._config.token_validation_options.validate_token: if not token.validate_token(self._config.token_validation_options, self._get_signers(**kwargs)): raise Exception("Token Validation of PolicyCertificates API failed.") certificates = [] cert_list = token.get_body() for key in cert_list.policy_certificates.keys: key_certs = [base64.b64decode(cert) for cert in key.x5_c] certificates.append(key_certs) return AttestationResponse(token, certificates)
[docs] @distributed_trace def add_policy_management_certificate(self, certificate_to_add, signing_key, **kwargs): #type:(bytes, AttestationSigningKey, **Any) -> AttestationResponse[PolicyCertificatesModificationResult] """ Adds a new policy management certificate to the set of policy management certificates for the instance. :param bytes certificate_to_add: DER encoded X.509 certificate to add to the list of attestation policy management certificates. :param signing_key: Signing Key representing one of the *existing* attestation signing certificates. :type signing_key: azure.security.attestation.AttestationSigningKey :return: Attestation service response encapsulating the status of the add request. :rtype: azure.security.attestation.AttestationResponse[azure.security.attestation.PolicyCertificatesModificationResult] The :class:`PolicyCertificatesModificationResult` response to the :meth:`add_policy_management_certificate` API contains two attributes of interest. The first is `certificate_resolution`, which indicates whether the certificate in question is present in the set of policy management certificates after the operation has completed, or if it is absent. The second is the `thumbprint` of the certificate added. The `thumbprint` for the certificate is the SHA1 hash of the DER encoding of the certificate. """ key=JSONWebKey(kty='RSA', x5_c = [ base64.b64encode(certificate_to_add).decode('ascii')]) add_body = AttestationCertificateManagementBody(policy_certificate=key) cert_add_token = AttestationToken[AttestationCertificateManagementBody]( body=add_body, signer=signing_key, body_type=AttestationCertificateManagementBody) cert_response = self._client.policy_certificates.add(cert_add_token.serialize(), **kwargs) token = AttestationToken[GeneratedPolicyCertificatesModificationResult](token=cert_response.token, body_type=GeneratedPolicyCertificatesModificationResult) if self._config.token_validation_options.validate_token: if not token.validate_token(self._config.token_validation_options, self._get_signers(**kwargs)): raise Exception("Token Validation of PolicyCertificate Add API failed.") return AttestationResponse[PolicyCertificatesModificationResult](token, PolicyCertificatesModificationResult._from_generated(token.get_body()))
[docs] @distributed_trace def remove_policy_management_certificate(self, certificate_to_add, signing_key, **kwargs): #type:(bytes, AttestationSigningKey, **Any) -> AttestationResponse[PolicyCertificatesModificationResult] """ Removes a new policy management certificate to the set of policy management certificates for the instance. :param bytes certificate_to_add: DER encoded X.509 certificate to add to the list of attestation policy management certificates. :param signing_key: Signing Key representing one of the *existing* attestation signing certificates. :type signing_key: azure.security.attestation.AttestationSigningKey :return: Attestation service response encapsulating a list of DER encoded X.509 certificate chains. :rtype: azure.security.attestation.AttestationResponse[azure.security.attestation.PolicyCertificatesModificationResult] The :class:`PolicyCertificatesModificationResult` response to the :meth:`remove_policy_management_certificate` API contains two attributes of interest. The first is `certificate_resolution`, which indicates whether the certificate in question is present in the set of policy management certificates after the operation has completed, or if it is absent. The second is the `thumbprint` of the certificate added. The `thumbprint` for the certificate is the SHA1 hash of the DER encoding of the certificate. """ key=JSONWebKey(kty='RSA', x5_c = [ base64.b64encode(certificate_to_add).decode('ascii')]) add_body = AttestationCertificateManagementBody(policy_certificate=key) cert_add_token = AttestationToken[AttestationCertificateManagementBody]( body=add_body, signer=signing_key, body_type=AttestationCertificateManagementBody) cert_response = self._client.policy_certificates.remove(cert_add_token.serialize(), **kwargs) token = AttestationToken[GeneratedPolicyCertificatesModificationResult](token=cert_response.token, body_type=GeneratedPolicyCertificatesModificationResult) if self._config.token_validation_options.validate_token: if not token.validate_token(self._config.token_validation_options, self._get_signers(**kwargs)): raise Exception("Token Validation of PolicyCertificate Remove API failed.") return AttestationResponse[PolicyCertificatesModificationResult](token, PolicyCertificatesModificationResult._from_generated(token.get_body()))
def _get_signers(self, **kwargs): #type(**Any) -> List[AttestationSigner] """ Returns the set of signing certificates used to sign attestation tokens. """ with self._statelock: if (self._signing_certificates == None): signing_certificates = self._client.signing_certificates.get(**kwargs) self._signing_certificates = [] for key in signing_certificates.keys: # Convert the returned certificate chain into an array of X.509 Certificates. self._signing_certificates.append(AttestationSigner._from_generated(key)) signers = self._signing_certificates return signers
[docs] def close(self): # type: () -> None self._client.close()
def __enter__(self): # type: () -> AttestationAdministrationClient self._client.__enter__() return self def __exit__(self, *exc_details): # type: (Any) -> None self._client.__exit__(*exc_details)