# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
# pylint:disable=too-many-lines,too-many-public-methods
import base64
from typing import Any, AsyncIterable, Optional, Iterable, List, Dict, Union
from functools import partial
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.polling import async_poller
from azure.keyvault.certificates.models import (
KeyVaultCertificate,
CertificateOperation,
CertificatePolicy,
DeletedCertificate,
CertificateProperties,
CertificateContact,
CertificateIssuer,
IssuerProperties,
)
from ._polling_async import CreateCertificatePollerAsync
from .._shared import AsyncKeyVaultClientBase
from .._shared.exceptions import error_map as _error_map
[docs]class CertificateClient(AsyncKeyVaultClientBase):
"""A high-level asynchronous interface for managing a vault's certificates.
:param str vault_url: URL of the vault the client will access
:param credential: An object which can provide an access token for the vault, such as a credential from
:mod:`azure.identity.aio`
Keyword arguments
- **api_version**: version of the Key Vault API to use. Defaults to the most recent.
- **transport**: :class:`~azure.core.pipeline.transport.AsyncHttpTransport` to use. Defaults to
:class:`~azure.core.pipeline.transport.AioHttpTransport`.
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START create_certificate_client]
:end-before: [END create_certificate_client]
:language: python
:dedent: 4
:caption: Creates a new instance of the Certificate client
"""
# pylint:disable=protected-access
[docs] @distributed_trace_async
async def create_certificate(
self, name: str, policy: CertificatePolicy, **kwargs: "**Any"
) -> Union[KeyVaultCertificate, CertificateOperation]:
"""Creates a new certificate.
If this is the first version, the certificate resource is created. This
operation requires the certificates/create permission.
:param str name: The name of the certificate.
:param policy: The management policy for the certificate.
:type policy:
~azure.keyvault.certificates.models.CertificatePolicy
:returns: A coroutine for the creation of the certificate. Awaiting the coroutine
returns the created KeyVaultCertificate if creation is successful, the CertificateOperation if not.
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate or
~azure.keyvault.certificates.models.CertificateOperation
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *enabled (bool)* - Determines whether the object is enabled.
- *tags (dict[str, str])* - Application specific metadata in the form of key-value pairs.
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START create_certificate]
:end-before: [END create_certificate]
:language: python
:caption: Create a certificate
:dedent: 8
"""
enabled = kwargs.pop("enabled", None)
tags = kwargs.pop("tags", None)
if enabled is not None:
attributes = self._client.models.CertificateAttributes(enabled=enabled)
else:
attributes = None
cert_bundle = await self._client.create_certificate(
vault_base_url=self.vault_url,
certificate_name=name,
certificate_policy=policy._to_certificate_policy_bundle(),
certificate_attributes=attributes,
tags=tags,
**kwargs
)
create_certificate_operation = CertificateOperation._from_certificate_operation_bundle(cert_bundle)
command = partial(self.get_certificate_operation, name=name, **kwargs)
get_certificate_command = partial(self.get_certificate, name=name, **kwargs)
create_certificate_polling = CreateCertificatePollerAsync(get_certificate_command=get_certificate_command)
return await async_poller(command, create_certificate_operation, None, create_certificate_polling)
[docs] @distributed_trace_async
async def get_certificate(self, name: str, **kwargs: "**Any") -> KeyVaultCertificate:
"""Gets a certificate with its management policy attached.
This operation requires the certificates/get permission. Does not accept the
version of the certificate as a parameter. If you wish to specify version, use
the get_certificate_version function and specify the desired version.
:param str name: The name of the certificate in the given vault.
:returns: An instance of KeyVaultCertificate
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the certificate doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START get_certificate]
:end-before: [END get_certificate]
:language: python
:caption: Get a certificate
:dedent: 8
"""
bundle = await self._client.get_certificate(
vault_base_url=self.vault_url,
certificate_name=name,
certificate_version="",
error_map=_error_map,
**kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace_async
async def get_certificate_version(self, name: str, version: str, **kwargs: "**Any") -> KeyVaultCertificate:
"""Gets a specific version of a certificate without returning its management policy.
If you wish to get the latest version of your certificate, or to get the certificate's policy as well,
use the get_certificate function.
:param str name: The name of the certificate in the given vault.
:param str version: The version of the certificate.
:returns: An instance of KeyVaultCertificate
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the certificate doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START get_certificate]
:end-before: [END get_certificate]
:language: python
:caption: Get a certificate
:dedent: 8
"""
bundle = await self._client.get_certificate(
vault_base_url=self.vault_url,
certificate_name=name,
certificate_version=version,
error_map=_error_map,
**kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace_async
async def delete_certificate(self, name: str, **kwargs: "**Any") -> DeletedCertificate:
"""Deletes a certificate from the key vault.
Deletes all versions of a certificate object along with its associated
policy. Delete certificate cannot be used to remove individual versions
of a certificate object. This operation requires the
certificates/delete permission.
:param str name: The name of the certificate.
:returns: The deleted certificate
:rtype: ~azure.keyvault.certificates.models.DeletedCertificate
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the certificate doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START delete_certificate]
:end-before: [END delete_certificate]
:language: python
:caption: Delete a certificate
:dedent: 8
"""
bundle = await self._client.delete_certificate(
vault_base_url=self.vault_url, certificate_name=name, error_map=_error_map, **kwargs
)
return DeletedCertificate._from_deleted_certificate_bundle(deleted_certificate_bundle=bundle)
[docs] @distributed_trace_async
async def get_deleted_certificate(self, name: str, **kwargs: "**Any") -> DeletedCertificate:
"""Retrieves information about the specified deleted certificate.
Retrieves the deleted certificate information plus its attributes,
such as retention interval, scheduled permanent deletion, and the
current deletion recovery level. This operation requires the certificates/
get permission.
:param str name: The name of the certificate.
:return: The deleted certificate
:rtype: ~azure.keyvault.certificates.models.DeletedCertificate
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the certificate doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START get_deleted_certificate]
:end-before: [END get_deleted_certificate]
:language: python
:caption: Get a deleted certificate
:dedent: 8
"""
bundle = await self._client.get_deleted_certificate(
vault_base_url=self.vault_url, certificate_name=name, error_map=_error_map, **kwargs
)
return DeletedCertificate._from_deleted_certificate_bundle(deleted_certificate_bundle=bundle)
[docs] @distributed_trace_async
async def purge_deleted_certificate(self, name: str, **kwargs: "**Any") -> None:
"""Permanently deletes the specified deleted certificate.
Performs an irreversible deletion of the specified certificate, without
possibility for recovery. The operation is not available if the recovery
level does not specified 'Purgeable'. This operation requires the
certificate/purge permission.
:param str name: The name of the certificate
:return: None
:rtype: None
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
await self._client.purge_deleted_certificate(
vault_base_url=self.vault_url, certificate_name=name, **kwargs
)
[docs] @distributed_trace_async
async def recover_deleted_certificate(self, name: str, **kwargs: "**Any") -> KeyVaultCertificate:
"""Recovers the deleted certificate back to its current version under
/certificates.
Performs the reversal of the Delete operation. THe operation is applicable
in vaults enabled for soft-delete, and must be issued during the retention
interval (available in the deleted certificate's attributes). This operation
requires the certificates/recover permission.
:param str name: The name of the deleted certificate
:return: The recovered certificate
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START recover_deleted_certificate]
:end-before: [END recover_deleted_certificate]
:language: python
:caption: Recover a deleted certificate
:dedent: 8
"""
bundle = await self._client.recover_deleted_certificate(
vault_base_url=self.vault_url, certificate_name=name, **kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace_async
async def import_certificate(
self, name: str, certificate_bytes: bytes, **kwargs: "**Any"
) -> KeyVaultCertificate:
"""Imports a certificate into a specified key vault.
Imports an existing valid certificate, containing a private key, into
Azure Key Vault. The certificate to be imported can be in either PFX or
PEM format. If the certificate is in PEM format the PEM file must
contain the key as well as x509 certificates. This operation requires
the certificates/import permission.
:param str name: The name of the certificate.
:param bytes certificate_bytes: Bytes of the certificate object to import.
This certificate needs to contain the private key.
:returns: The imported KeyVaultCertificate
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *enabled (bool)* - Determines whether the object is enabled.
- *tags (dict[str, str])* - Application specific metadata in the form of key-value pairs.
- *password (str)* - If the private key in the passed in certificate is encrypted,
it is the password used for encryption.
- *policy (~azure.keyvault.certificates.models.CertificatePolicy)* - The management policy
for the certificate.
"""
enabled = kwargs.pop("enabled", None)
password = kwargs.pop("password", None)
policy = kwargs.pop("policy", None)
if enabled is not None:
attributes = self._client.models.CertificateAttributes(enabled=enabled)
else:
attributes = None
base64_encoded_certificate = base64.b64encode(certificate_bytes).decode("utf-8")
bundle = await self._client.import_certificate(
vault_base_url=self.vault_url,
certificate_name=name,
base64_encoded_certificate=base64_encoded_certificate,
password=password,
certificate_policy=CertificatePolicy._to_certificate_policy_bundle(policy),
certificate_attributes=attributes,
**kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace_async
async def get_policy(self, certificate_name: str, **kwargs: "**Any") -> CertificatePolicy:
"""Gets the policy for a certificate.
Returns the specified certificate policy resources in the key
vault. This operation requires the certificates/get permission.
:param str certificate_name: The name of the certificate in a given key vault.
:return: The certificate policy
:rtype: ~azure.keyvault.certificates.models.CertificatePolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = await self._client.get_certificate_policy(
vault_base_url=self.vault_url, certificate_name=certificate_name, **kwargs
)
return CertificatePolicy._from_certificate_policy_bundle(certificate_policy_bundle=bundle)
[docs] @distributed_trace_async
async def update_policy(
self, certificate_name: str, policy: CertificatePolicy, **kwargs: "**Any"
) -> CertificatePolicy:
"""Updates the policy for a certificate.
Set specified members in the certificate policy. Leaves others as null.
This operation requries the certificates/update permission.
:param str certificate_name: The name of the certificate in the given vault.
:param policy: The policy for the certificate.
:type policy: ~azure.keyvault.certificates.models.CertificatePolicy
:return: The certificate policy
:rtype: ~azure.keyvault.certificates.models.CertificatePolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = await self._client.update_certificate_policy(
vault_base_url=self.vault_url,
certificate_name=certificate_name,
certificate_policy=policy._to_certificate_policy_bundle(),
**kwargs
)
return CertificatePolicy._from_certificate_policy_bundle(certificate_policy_bundle=bundle)
[docs] @distributed_trace_async
async def update_certificate_properties(
self, name: str, version: Optional[str] = None, **kwargs: "**Any"
) -> KeyVaultCertificate:
"""Updates the specified attributes associated with the given certificate.
The UpdateCertificate operation applies the specified update on the
given certificate; the only elements updated are the certificate's
attributes. This operation requires the certificates/update permission.
:param str name: The name of the certificate in the given key vault.
:param str version: The version of the certificate.
:returns: The updated KeyVaultCertificate
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *enabled (bool)* - Determines whether the object is enabled.
- *tags (dict[str, str])* - Application specific metadata in the form of key-value pairs.
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START update_certificate]
:end-before: [END update_certificate]
:language: python
:caption: Update a certificate's attributes
:dedent: 8
"""
enabled = kwargs.pop("enabled", None)
if enabled is not None:
attributes = self._client.models.CertificateAttributes(enabled=enabled)
else:
attributes = None
bundle = await self._client.update_certificate(
vault_base_url=self.vault_url,
certificate_name=name,
certificate_version=version or "",
certificate_attributes=attributes,
**kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace_async
async def backup_certificate(self, name: str, **kwargs: "**Any") -> bytes:
"""Backs up the specified certificate.
Requests that a backup of the specified certificate be downloaded
to the client. All versions of the certificate will be downloaded.
This operation requires the certificates/backup permission.
:param str name: The name of the certificate.
:return: the backup blob containing the backed up certificate.
:rtype: bytes
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the certificate doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START backup_certificate]
:end-before: [END backup_certificate]
:language: python
:caption: Get a certificate backup
:dedent: 8
"""
backup_result = await self._client.backup_certificate(
vault_base_url=self.vault_url, certificate_name=name, error_map=_error_map, **kwargs
)
return backup_result.value
[docs] @distributed_trace_async
async def restore_certificate_backup(self, backup: bytes, **kwargs: "**Any") -> KeyVaultCertificate:
"""Restores a backed up certificate to a vault.
Restores a backed up certificate, and all its versions, to a vault.
this operation requires the certificates/restore permission.
:param bytes backup: The backup blob associated with a certificate bundle.
:return: The restored KeyVaultCertificate
:rtype: ~azure.keyvault.certificates.models.KeyVaultCertificate
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START restore_certificate]
:end-before: [END restore_certificate]
:language: python
:caption: Restore a certificate backup
:dedent: 8
"""
bundle = await self._client.restore_certificate(
vault_base_url=self.vault_url, certificate_bundle_backup=backup, **kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace
def list_deleted_certificates(self, **kwargs: "**Any") -> AsyncIterable[DeletedCertificate]:
"""Lists the deleted certificates in the specified vault currently
available for recovery.
Retrieves the certificates in the current vault which are in a deleted
state and ready for recovery or purging. This operation includes
deletion-specific information. This operation requires the certificates/get/list
permission. This operation can only be enabled on soft-delete enabled vaults.
:return: An iterator like instance of DeletedCertificate
:rtype:
~azure.core.paging.ItemPaged[~azure.keyvault.certificates.models.DeletedCertificate]
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *include_pending (bool)* - Specifies whether to include certificates which are
not completely deleted.
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START list_deleted_certificates]
:end-before: [END list_deleted_certificates]
:language: python
:caption: List all the deleted certificates
:dedent: 8
"""
max_page_size = kwargs.pop("max_page_size", None)
return self._client.get_deleted_certificates(
vault_base_url=self._vault_url,
maxresults=max_page_size,
cls=lambda objs: [DeletedCertificate._from_deleted_certificate_item(x) for x in objs],
**kwargs
)
[docs] @distributed_trace
def list_certificates(self, **kwargs: "**Any") -> AsyncIterable[CertificateProperties]:
"""List certificates in the key vault.
The GetCertificates operation returns the set of certificates resources
in the key vault. This operation requires the
certificates/list permission.
:returns: An iterator like instance of CertificateProperties
:rtype:
~azure.core.paging.ItemPaged[~azure.keyvault.certificates.models.CertificateProperties]
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *include_pending (bool)* - Specifies whether to include certificates which are
not completely provisioned.
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START list_certificates]
:end-before: [END list_certificates]
:language: python
:caption: List all certificates
:dedent: 8
"""
max_page_size = kwargs.pop("max_page_size", None)
return self._client.get_certificates(
vault_base_url=self._vault_url,
maxresults=max_page_size,
cls=lambda objs: [CertificateProperties._from_certificate_item(x) for x in objs],
**kwargs
)
[docs] @distributed_trace
def list_certificate_versions(self, name: str, **kwargs: "**Any") -> AsyncIterable[CertificateProperties]:
"""List the versions of a certificate.
The GetCertificateVersions operation returns the versions of a
certificate in the key vault. This operation requires the
certificates/list permission.
:param str name: The name of the certificate.
:returns: An iterator like instance of CertificateProperties
:rtype:
~azure.core.paging.ItemPaged[~azure.keyvault.certificates.models.CertificateProperties]
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START list_certificate_versions]
:end-before: [END list_certificate_versions]
:language: python
:caption: List all versions of a certificate
:dedent: 8
"""
max_page_size = kwargs.pop("max_page_size", None)
return self._client.get_certificate_versions(
vault_base_url=self._vault_url,
certificate_name=name,
maxresults=max_page_size,
cls=lambda objs: [CertificateProperties._from_certificate_item(x) for x in objs],
**kwargs
)
[docs] @distributed_trace_async
async def get_certificate_operation(self, name: str, **kwargs: "**Any") -> CertificateOperation:
"""Gets the creation operation of a certificate.
Gets the creation operation associated with a specified certificate.
This operation requires the certificates/get permission.
:param str name: The name of the certificate.
:returns: The created CertificateOperation
:rtype: ~azure.keyvault.certificates.models.CertificateOperation
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the certificate doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
"""
bundle = await self._client.get_certificate_operation(
vault_base_url=self.vault_url, certificate_name=name, error_map=_error_map, **kwargs
)
return CertificateOperation._from_certificate_operation_bundle(certificate_operation_bundle=bundle)
[docs] @distributed_trace_async
async def delete_certificate_operation(self, name: str, **kwargs: "**Any") -> CertificateOperation:
"""Deletes the creation operation for a specific certificate.
Deletes the creation operation for a specified certificate that is in
the process of being created. The certificate is no longer created.
This operation requires the certificates/update permission.
:param str name: The name of the certificate.
:return: The deleted CertificateOperation
:rtype: ~azure.keyvault.certificates.models.CertificateOperation
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the operation doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
"""
bundle = await self._client.delete_certificate_operation(
vault_base_url=self.vault_url, certificate_name=name, error_map=_error_map, **kwargs
)
return CertificateOperation._from_certificate_operation_bundle(certificate_operation_bundle=bundle)
[docs] @distributed_trace_async
async def cancel_certificate_operation(self, name: str, **kwargs: "**Any") -> CertificateOperation:
"""Cancels a certificate operation.
Cancels a certificate creation operation that is already in progress.
This operation requires the certificates/update permission.
:param str name: The name of the certificate.
:returns: The cancelled certificate operation
:rtype: ~azure.keyvault.certificates.models.CertificateOperation
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = await self._client.update_certificate_operation(
vault_base_url=self.vault_url, certificate_name=name, cancellation_requested=True, **kwargs
)
return CertificateOperation._from_certificate_operation_bundle(certificate_operation_bundle=bundle)
[docs] @distributed_trace_async
async def merge_certificate(
self, name: str, x509_certificates: List[bytearray], **kwargs: "**Any"
) -> KeyVaultCertificate:
"""Merges a certificate or a certificate chain with a key pair existing on the server.
Performs the merging of a certificate or certificate chain with a key pair currently
available in the service. This operation requires the certificates/create permission.
Make sure when creating the certificate to merge using create_certificate that you set
its issuer to 'Unknown'. This way Key Vault knows that the certificate will not be signed
by an issuer known to it.
:param str name: The name of the certificate
:param x509_certificates: The certificate or the certificate chain to merge.
:type x509_certificates: list[bytearray]
:return: The merged certificate operation
:rtype: ~azure.keyvault.certificates.models.CertificateOperation
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *enabled (bool)* - Determines whether the object is enabled.
- *tags (dict[str, str])* - Application specific metadata in the form of key-value pairs.
"""
enabled = kwargs.pop("enabled", None)
if enabled is not None:
attributes = self._client.models.CertificateAttributes(enabled=enabled)
else:
attributes = None
bundle = await self._client.merge_certificate(
vault_base_url=self.vault_url,
certificate_name=name,
x509_certificates=x509_certificates,
certificate_attributes=attributes,
**kwargs
)
return KeyVaultCertificate._from_certificate_bundle(certificate_bundle=bundle)
[docs] @distributed_trace_async
async def get_issuer(self, name: str, **kwargs: "**Any") -> CertificateIssuer:
"""Gets the specified certificate issuer.
Returns the specified certificate issuer resources in the key vault.
This operation requires the certificates/manageissuers/getissuers permission.
:param str name: The name of the issuer.
:return: The specified certificate issuer.
:rtype: ~azure.keyvault.certificates.models.CertificateIssuer
:raises:
:class:`~azure.core.exceptions.ResourceNotFoundError` if the issuer doesn't exist,
:class:`~azure.core.exceptions.HttpResponseError` for other errors
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START get_issuer]
:end-before: [END get_issuer]
:language: python
:caption: Get an issuer
:dedent: 8
"""
issuer_bundle = await self._client.get_certificate_issuer(
vault_base_url=self.vault_url, issuer_name=name, error_map=_error_map, **kwargs
)
return CertificateIssuer._from_issuer_bundle(issuer_bundle=issuer_bundle)
[docs] @distributed_trace_async
async def create_issuer(
self, name: str, provider: str, **kwargs: "**Any"
) -> CertificateIssuer:
"""Sets the specified certificate issuer.
The SetCertificateIssuer operation adds or updates the specified
certificate issuer. This operation requires the certificates/setissuers
permission.
:param str name: The name of the issuer.
:param str provider: The issuer provider.
:returns: The created CertificateIssuer
:rtype: ~azure.keyvault.certificates.models.CertificateIssuer
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *enabled (bool)* - Determines whether the object is enabled.
- *account_id (str)* - The user name/account name/account id.
- *password (str)* - The password/secret/account key.
- *organization_id (str)* - Id of the organization.
- *admin_details (list[~azure.keyvault.certificates.models.AdministratorDetails])*
- Details of the organization administrators of the certificate issuer.
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START create_issuer]
:end-before: [END create_issuer]
:language: python
:caption: Create an issuer
:dedent: 8
"""
enabled = kwargs.pop("enabled", None)
account_id = kwargs.pop("account_id", None)
password = kwargs.pop("password", None)
organization_id = kwargs.pop("organization_id", None)
admin_details = kwargs.pop("admin_details", None)
if account_id or password:
issuer_credentials = self._client.models.IssuerCredentials(account_id=account_id, password=password)
else:
issuer_credentials = None
if admin_details and admin_details[0]:
admin_details_to_pass = list(
self._client.models.AdministratorDetails(
first_name=admin_detail.first_name,
last_name=admin_detail.last_name,
email_address=admin_detail.email,
phone=admin_detail.phone,
)
for admin_detail in admin_details
)
else:
admin_details_to_pass = admin_details
if organization_id or admin_details:
organization_details = self._client.models.OrganizationDetails(
id=organization_id, admin_details=admin_details_to_pass
)
else:
organization_details = None
if enabled is not None:
issuer_attributes = self._client.models.IssuerAttributes(enabled=enabled)
else:
issuer_attributes = None
issuer_bundle = await self._client.set_certificate_issuer(
vault_base_url=self.vault_url,
issuer_name=name,
provider=provider,
credentials=issuer_credentials,
organization_details=organization_details,
attributes=issuer_attributes,
**kwargs
)
return CertificateIssuer._from_issuer_bundle(issuer_bundle=issuer_bundle)
[docs] @distributed_trace_async
async def update_issuer(self, name: str, **kwargs: "**Any") -> CertificateIssuer:
"""Updates the specified certificate issuer.
Performs an update on the specified certificate issuer entity.
This operation requires the certificates/setissuers permission.
:param str name: The name of the issuer.
:param str provider: The issuer provider.
:return: The updated issuer
:rtype: ~azure.keyvault.certificates.models.CertificateIssuer
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Keyword arguments
- *enabled (bool)* - Determines whether the object is enabled.
- *provider (str)* - The issuer provider.
- *account_id (str)* - The user name/account name/account id.
- *password (str)* - The password/secret/account key.
- *organization_id (str)* - Id of the organization.
- *admin_details (list[~azure.keyvault.certificates.models.AdministratorDetails])*
- Details of the organization administrators of the certificate issuer.
"""
enabled = kwargs.pop("enabled", None)
provider = kwargs.pop("provider", None)
account_id = kwargs.pop("account_id", None)
password = kwargs.pop("password", None)
organization_id = kwargs.pop("organization_id", None)
admin_details = kwargs.pop("admin_details", None)
if account_id or password:
issuer_credentials = self._client.models.IssuerCredentials(account_id=account_id, password=password)
else:
issuer_credentials = None
if admin_details and admin_details[0]:
admin_details_to_pass = list(
self._client.models.AdministratorDetails(
first_name=admin_detail.first_name,
last_name=admin_detail.last_name,
email_address=admin_detail.email,
phone=admin_detail.phone,
)
for admin_detail in admin_details
)
else:
admin_details_to_pass = admin_details
if organization_id or admin_details:
organization_details = self._client.models.OrganizationDetails(
id=organization_id, admin_details=admin_details_to_pass
)
else:
organization_details = None
if enabled is not None:
issuer_attributes = self._client.models.IssuerAttributes(enabled=enabled)
else:
issuer_attributes = None
issuer_bundle = await self._client.update_certificate_issuer(
vault_base_url=self.vault_url,
issuer_name=name,
provider=provider,
credentials=issuer_credentials,
organization_details=organization_details,
attributes=issuer_attributes,
**kwargs
)
return CertificateIssuer._from_issuer_bundle(issuer_bundle=issuer_bundle)
[docs] @distributed_trace_async
async def delete_issuer(self, name: str, **kwargs: "**Any") -> CertificateIssuer:
"""Deletes the specified certificate issuer.
Permanently removes the specified certificate issuer from the vault.
This operation requires the certificates/manageissuers/deleteissuers permission.
:param str name: The name of the issuer.
:return: CertificateIssuer
:rtype: ~azure.keyvault.certificates.models.CertificateIssuer
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START delete_issuer]
:end-before: [END delete_issuer]
:language: python
:caption: Delete an issuer
:dedent: 8
"""
issuer_bundle = await self._client.delete_certificate_issuer(
vault_base_url=self.vault_url, issuer_name=name, **kwargs
)
return CertificateIssuer._from_issuer_bundle(issuer_bundle=issuer_bundle)
[docs] @distributed_trace
def list_issuers(self, **kwargs: "**Any") -> AsyncIterable[IssuerProperties]:
"""List certificate issuers for the key vault.
Returns the set of certificate issuer resources in the key
vault. This operation requires the certificates/manageissuers/getissuers
permission.
:return: An iterator like instance of Issuers
:rtype: ~azure.core.paging.ItemPaged[~azure.keyvault.certificates.models.CertificateIssuer]
:raises: :class:`~azure.core.exceptions.HttpResponseError`
Example:
.. literalinclude:: ../tests/test_examples_certificates_async.py
:start-after: [START list_issuers]
:end-before: [END list_issuers]
:language: python
:caption: List issuers of a vault
:dedent: 8
"""
max_page_size = kwargs.pop("max_page_size", None)
return self._client.get_certificate_issuers(
vault_base_url=self.vault_url,
maxresults=max_page_size,
cls=lambda objs: [IssuerProperties._from_issuer_item(x) for x in objs],
**kwargs
)