Source code for azure.containerregistry.aio._async_container_registry_client

# coding=utf-8
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from typing import Any, Dict, TYPE_CHECKING

from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import (
    ClientAuthenticationError,
    ResourceExistsError,
    ResourceNotFoundError,
    HttpResponseError,
    map_error,
)
from azure.core.pipeline import AsyncPipeline
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async

from ._async_base_client import ContainerRegistryBaseClient, AsyncTransportWrapper
from ._async_container_repository_client import ContainerRepositoryClient
from .._generated.models import AcrErrors
from .._helpers import _parse_next_link
from .._models import RepositoryProperties, DeletedRepositoryResult

if TYPE_CHECKING:
    from azure.core.credentials_async import AsyncTokenCredential


class ContainerRegistryClient(ContainerRegistryBaseClient):
    """Asynchronous client for registry-level operations.

    Supports deleting a repository, paging over the repositories in the
    registry, and creating a :class:`ContainerRepositoryClient` for a single
    repository that reuses this client's transport and pipeline policies.
    """

    def __init__(self, endpoint: str, credential: "AsyncTokenCredential", **kwargs: Any) -> None:
        """Create a ContainerRegistryClient from an endpoint and a credential

        :param endpoint: An ACR endpoint. If it does not already start with
            ``https://`` or ``http://``, ``https://`` is prepended.
        :type endpoint: str
        :param credential: The credential with which to authenticate
        :type credential: :class:`~azure.core.credentials_async.AsyncTokenCredential`
        :returns: None
        :raises: None
        """
        # A bare host name is accepted; default it to HTTPS.
        if not endpoint.startswith(("https://", "http://")):
            endpoint = "https://" + endpoint
        # Kept so that get_repository_client can build child clients later.
        self._endpoint = endpoint
        self._credential = credential
        super().__init__(endpoint=endpoint, credential=credential, **kwargs)

    @distributed_trace_async
    async def delete_repository(self, repository: str, **kwargs: Any) -> DeletedRepositoryResult:
        """Delete a repository

        :param repository: The repository to delete
        :type repository: str
        :returns: Object containing information about the deleted repository
        :rtype: :class:`~azure.containerregistry.DeletedRepositoryResult`
        :raises: :class:`~azure.core.exceptions.ResourceNotFoundError`
        """
        result = await self._client.container_registry.delete_repository(repository, **kwargs)
        # Convert the generated-layer model into the public model type.
        return DeletedRepositoryResult._from_generated(result)  # pylint: disable=protected-access

    @distributed_trace
    def list_repositories(self, **kwargs: Any) -> AsyncItemPaged[str]:
        """List all repositories

        :keyword last: Query parameter for the last item in the previous call. Ensuing
            call will return values after last lexically
        :paramtype last: str
        :keyword max: Maximum number of repositories to return. Note: this method does
            not read this keyword itself; it is forwarded unchanged to the pipeline
            together with the remaining keyword arguments.
        :paramtype max: int
        :keyword results_per_page: Number of repositories to return per page
        :paramtype results_per_page: int
        :keyword cls: Optional callable applied to each page's list of repositories
            before it is handed to the pager
        :keyword error_map: Optional mapping of HTTP status code to exception type that
            extends/overrides the default 401/404/409 mapping
        :paramtype error_map: dict
        :return: An async pager over repository names
        :rtype: :class:`~azure.core.async_paging.AsyncItemPaged`
        :raises: :class:`~azure.core.exceptions.ResourceNotFoundError`
        """
        n = kwargs.pop("results_per_page", None)
        last = kwargs.pop("last", None)
        cls = kwargs.pop("cls", None)
        error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError}
        error_map.update(kwargs.pop("error_map", {}))
        accept = "application/json"

        def prepare_request(next_link=None):
            """Build the GET request for the first page or for ``next_link``."""
            # Construct headers
            header_parameters = {}  # type: Dict[str, Any]
            header_parameters["Accept"] = self._client._serialize.header(  # pylint: disable=protected-access
                "accept", accept, "str"
            )

            # Only the first request carries the paging query parameters; a
            # next link is requested as-is with an empty query.
            query_parameters = {}  # type: Dict[str, Any]
            if not next_link:
                url = "/acr/v1/_catalog"
                if last is not None:
                    query_parameters["last"] = self._client._serialize.query(  # pylint: disable=protected-access
                        "last", last, "str"
                    )
                if n is not None:
                    query_parameters["n"] = self._client._serialize.query(  # pylint: disable=protected-access
                        "n", n, "int"
                    )
            else:
                url = next_link

            # Both cases resolve the URL against the configured registry URL
            # and issue the same GET, so that tail is shared.
            path_format_arguments = {
                "url": self._client._serialize.url(  # pylint: disable=protected-access
                    "self._config.url",
                    self._client._config.url,  # pylint: disable=protected-access
                    "str",
                    skip_quote=True,
                ),
            }
            url = self._client._client.format_url(url, **path_format_arguments)  # pylint: disable=protected-access
            return self._client._client.get(  # pylint: disable=protected-access
                url, query_parameters, header_parameters
            )

        async def extract_data(pipeline_response):
            """Return ``(next_link, items)`` for one page of results."""
            deserialized = self._client._deserialize(  # pylint: disable=protected-access
                "Repositories", pipeline_response
            )
            list_of_elem = deserialized.repositories
            if cls:
                list_of_elem = cls(list_of_elem)
            # The continuation link, when present, arrives in the "Link" header.
            link = None
            if "Link" in pipeline_response.http_response.headers.keys():
                link = _parse_next_link(pipeline_response.http_response.headers["Link"])
            return link, AsyncList(list_of_elem)

        async def get_next(next_link=None):
            """Run the request for one page and raise on any non-200 status."""
            request = prepare_request(next_link)

            pipeline_response = await self._client._client._pipeline.run(  # pylint: disable=protected-access
                request, stream=False, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code != 200:
                error = self._client._deserialize.failsafe_deserialize(  # pylint: disable=protected-access
                    AcrErrors, response
                )
                # map_error raises the mapped exception for known status
                # codes; anything else falls through to HttpResponseError.
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response, model=error)

            return pipeline_response

        return AsyncItemPaged(get_next, extract_data)

    @distributed_trace
    def get_repository_client(self, repository: str, **kwargs: Any) -> ContainerRepositoryClient:
        """Get a repository client

        The returned client is built on a new pipeline that wraps this client's
        transport and reuses this client's pipeline policies.

        :param repository: The repository to create a client for
        :type repository: str
        :returns: :class:`~azure.containerregistry.aio.ContainerRepositoryClient`
        """
        _pipeline = AsyncPipeline(
            transport=AsyncTransportWrapper(
                self._client._client._pipeline._transport  # pylint: disable=protected-access
            ),
            policies=self._client._client._pipeline._impl_policies,  # pylint: disable=protected-access
        )
        return ContainerRepositoryClient(
            self._endpoint, repository, credential=self._credential, pipeline=_pipeline, **kwargs
        )