Source code for azure.keyvault.administration._backup_client

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import base64
import functools
import pickle
from typing import TYPE_CHECKING

from urllib.parse import urlparse

from ._models import KeyVaultBackupResult
from ._internal import KeyVaultClientBase, parse_folder_url
from ._internal.polling import KeyVaultBackupClientPolling, KeyVaultBackupClientPollingMethod

if TYPE_CHECKING:
    # pylint:disable=unused-import
    from typing import Any
    from azure.core.polling import LROPoller


def _parse_status_url(url):
    parsed = urlparse(url)
    job_id = parsed.path.split("/")[2]
    return job_id


class KeyVaultBackupClient(KeyVaultClientBase):
    """Performs Key Vault backup and restore operations.

    :param str vault_url: URL of the vault on which the client will operate. This is also called the vault's "DNS Name".
        You should validate that this URL references a valid Key Vault or Managed HSM resource.
        See https://aka.ms/azsdk/blog/vault-uri for details.
    :param credential: An object which can provide an access token for the vault, such as a credential from
        :mod:`azure.identity`
    :type credential: :class:`~azure.core.credentials.TokenCredential`

    :keyword api_version: Version of the service API to use. Defaults to the most recent.
    :paramtype api_version: ~azure.keyvault.administration.ApiVersion
    :keyword bool verify_challenge_resource: Whether to verify the authentication challenge resource matches the Key
        Vault or Managed HSM domain. Defaults to True.
    """

    # pylint:disable=protected-access
    def _status_response_from_token(self, continuation_token, get_status):
        # type: (str, Any) -> str
        """Turn a continuation token into the serialized status response used to resume polling.

        The token is a base64-encoded status URL. The job ID parsed from that URL is used to fetch the operation's
        current status, and the resulting pipeline response is pickled and base64-encoded.

        :param str continuation_token: token previously obtained from an operation poller
        :param get_status: generated client method that fetches the operation's status. It is called with
            ``vault_base_url``, ``job_id``, and ``cls`` keyword arguments.
        :returns: the base64-encoded, pickled pipeline response
        :rtype: str
        :raises ValueError: if the token can't be decoded or doesn't contain a job ID
        """
        try:
            # Decoding happens inside the try block so that a token which isn't valid base64/ASCII produces the same
            # descriptive error as a token whose decoded URL has no job ID
            status_url = base64.b64decode(continuation_token.encode()).decode("ascii")
            job_id = _parse_status_url(status_url)
        except Exception as ex:  # pylint: disable=broad-except
            raise ValueError(
                "The provided continuation_token is malformed. A valid token can be obtained from the "
                + "operation poller's continuation_token() method"
            ) from ex

        # `cls` receives the pipeline response first; returning it gives us the raw response instead of a model
        pipeline_response = get_status(
            vault_base_url=self._vault_url, job_id=job_id, cls=lambda response, _, __: response
        )
        # Make sure the status URL travels with the response when the service didn't include it
        if "azure-asyncoperation" not in pipeline_response.http_response.headers:
            pipeline_response.http_response.headers["azure-asyncoperation"] = status_url
        # NOTE(review): this payload is presumably unpickled by the polling machinery when it resumes; it is built
        # here from the service's response rather than from caller-supplied bytes -- confirm before changing the format
        return base64.b64encode(pickle.dumps(pipeline_response)).decode("ascii")

    def begin_backup(self, blob_storage_url, sas_token, **kwargs):
        # type: (str, str, **Any) -> LROPoller[KeyVaultBackupResult]
        """Begin a full backup of the Key Vault.

        :param str blob_storage_url: URL of the blob storage container in which the backup will be stored, for example
            https://<account>.blob.core.windows.net/backup
        :param str sas_token: a Shared Access Signature (SAS) token authorizing access to the blob storage resource
        :keyword str continuation_token: a continuation token to restart polling from a saved state
        :returns: An :class:`~azure.core.polling.LROPoller` instance. Call `result()` on this object to wait for the
            operation to complete and get a :class:`KeyVaultBackupResult`.
        :rtype: ~azure.core.polling.LROPoller[~azure.keyvault.administration.KeyVaultBackupResult]
        :raises ValueError: if ``continuation_token`` is malformed

        Example:
            .. literalinclude:: ../tests/test_examples_administration.py
                :start-after: [START begin_backup]
                :end-before: [END begin_backup]
                :language: python
                :caption: Create a vault backup
                :dedent: 8
        """
        # Private/known keywords are removed from kwargs before kwargs is forwarded to the polling method and client
        polling_interval = kwargs.pop("_polling_interval", 5)
        sas_parameter = self._models.SASTokenParameter(storage_resource_uri=blob_storage_url, token=sas_token)

        continuation_token = kwargs.pop("continuation_token", None)
        status_response = None
        if continuation_token:
            status_response = self._status_response_from_token(continuation_token, self._client.full_backup_status)

        return self._client.begin_full_backup(
            vault_base_url=self._vault_url,
            azure_storage_blob_container_uri=sas_parameter,
            cls=KeyVaultBackupResult._from_generated,
            continuation_token=status_response,
            polling=KeyVaultBackupClientPollingMethod(
                lro_algorithms=[KeyVaultBackupClientPolling()], timeout=polling_interval, **kwargs
            ),
            **kwargs
        )

    def begin_restore(self, folder_url, sas_token, **kwargs):
        # type: (str, str, **Any) -> LROPoller
        """Restore a Key Vault backup.

        This method restores either a complete Key Vault backup or when ``key_name`` has a value, a single key.

        :param str folder_url: URL of the blob holding the backup. This would be the `folder_url` of a
            :class:`KeyVaultBackupResult` returned by :func:`begin_backup`, for example
            https://<account>.blob.core.windows.net/backup/mhsm-account-2020090117323313
        :param str sas_token: a Shared Access Signature (SAS) token authorizing access to the blob storage resource
        :keyword str continuation_token: a continuation token to restart polling from a saved state
        :keyword str key_name: name of a single key in the backup. When set, only this key will be restored.
        :returns: An :class:`~azure.core.polling.LROPoller` instance. Its `result()` returns None.
        :rtype: ~azure.core.polling.LROPoller
        :raises ValueError: if ``continuation_token`` is malformed

        Examples:
            .. literalinclude:: ../tests/test_examples_administration.py
                :start-after: [START begin_restore]
                :end-before: [END begin_restore]
                :language: python
                :caption: Restore a vault backup
                :dedent: 8

            .. literalinclude:: ../tests/test_examples_administration.py
                :start-after: [START begin_selective_restore]
                :end-before: [END begin_selective_restore]
                :language: python
                :caption: Restore a single key
                :dedent: 8
        """
        # LROBasePolling passes its kwargs to pipeline.run(), so we remove unexpected args before constructing it
        continuation_token = kwargs.pop("continuation_token", None)
        key_name = kwargs.pop("key_name", None)

        status_response = None
        if continuation_token:
            status_response = self._status_response_from_token(continuation_token, self._client.restore_status)

        container_url, folder_name = parse_folder_url(folder_url)
        sas_parameter = self._models.SASTokenParameter(storage_resource_uri=container_url, token=sas_token)
        polling = KeyVaultBackupClientPollingMethod(
            lro_algorithms=[KeyVaultBackupClientPolling()], timeout=kwargs.pop("_polling_interval", 5), **kwargs
        )

        if key_name:
            # Selective restore: bind the key name so both restore flavors can be invoked the same way below
            client_method = functools.partial(self._client.begin_selective_key_restore_operation, key_name=key_name)
            restore_details = self._models.SelectiveKeyRestoreOperationParameters(
                sas_token_parameters=sas_parameter, folder=folder_name
            )
        else:
            client_method = self._client.begin_full_restore_operation
            restore_details = self._models.RestoreOperationParameters(
                sas_token_parameters=sas_parameter, folder_to_restore=folder_name
            )

        return client_method(
            vault_base_url=self._vault_url,
            restore_blob_details=restore_details,
            cls=lambda *_: None,  # poller.result() returns None
            continuation_token=status_response,
            polling=polling,
            **kwargs
        )