Source code for azure.appconfiguration.aio._azure_configuration_client_async

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from requests.structures import CaseInsensitiveDict
from azure.core import MatchConditions
from azure.core.pipeline import AsyncPipeline
from azure.core.pipeline.policies import (
    UserAgentPolicy,
    DistributedTracingPolicy,
    HttpLoggingPolicy
)
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.pipeline.transport import AsyncioRequestsTransport
from azure.core.exceptions import (
    HttpResponseError,
    ClientAuthenticationError,
    ResourceExistsError,
    ResourceModifiedError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
)
from .._azure_appconfiguration_error import ResourceReadOnlyError
from .._utils import (
    get_endpoint_from_connection_string,
    escape_and_tostr,
    prep_if_match,
    prep_if_none_match,
)
from .._generated.aio import AzureAppConfiguration
from .._generated.models import ErrorException
from .._generated.aio._configuration_async import AzureAppConfigurationConfiguration
from .._azure_appconfiguration_requests import AppConfigRequestsCredentialsPolicy
from .._azure_appconfiguration_credential import AppConfigConnectionStringCredential
from .._generated.models import KeyValue
from .._models import ConfigurationSetting
from .._user_agent import USER_AGENT


[docs]class AzureAppConfigurationClient: """Represents an client that calls restful API of Azure App Configuration service. :param str base_url: base url of the service :param credential: An object which can provide secrets for the app configuration service :type credential: azure.AppConfigConnectionStringCredential :keyword Pipeline pipeline: If omitted, the standard pipeline is used. :keyword HttpTransport transport: If omitted, the standard pipeline is used. :keyword list[HTTPPolicy] policies: If omitted, the standard pipeline is used. This is the async version of :class:`azure.appconfiguration.AzureAppConfigurationClient` """ # pylint:disable=protected-access def __init__(self, base_url, credential, **kwargs): # type: (str, AppConfigConnectionStringCredential, dict) -> None self._config = AzureAppConfigurationConfiguration(credential, **kwargs) self._config.user_agent_policy = UserAgentPolicy( base_user_agent=USER_AGENT, **kwargs ) pipeline = kwargs.get("pipeline") if pipeline is None: pipeline = self._create_appconfig_pipeline(**kwargs) self._impl = AzureAppConfiguration( credentials=credential, base_url=base_url, pipeline=pipeline )
[docs] @classmethod def from_connection_string( cls, connection_string, **kwargs ): # type: (string, dict) -> AzureAppConfigurationClient """Create AzureAppConfigurationClient from a Connection String. :param connection_string: Connection String (one of the access keys of the Azure App Configuration resource) used to access the Azure App Configuration. :type connection_string: str This is the async version of :class:`azure.appconfiguration.AzureAppConfigurationClient` Example .. code-block:: python from azure.appconfiguration.aio import AzureAppConfigurationClient connection_str = "<my connection string>" async_client = AzureAppConfigurationClient.from_connection_string(connection_str) """ base_url = "https://" + get_endpoint_from_connection_string(connection_string) return cls( credential=AppConfigConnectionStringCredential(connection_string), base_url=base_url, **kwargs )
def _create_appconfig_pipeline(self, **kwargs): transport = kwargs.get('transport') policies = kwargs.get('policies') if policies is None: # [] is a valid policy list policies = [ self._config.headers_policy, self._config.user_agent_policy, AppConfigRequestsCredentialsPolicy(self._config.credentials), self._config.retry_policy, self._config.logging_policy, # HTTP request/response log DistributedTracingPolicy(**kwargs), HttpLoggingPolicy(**kwargs), ] if not transport: transport = AsyncioRequestsTransport(**kwargs) return AsyncPipeline( transport, policies, )
[docs] @distributed_trace def list_configuration_settings( self, keys=None, labels=None, **kwargs ): # type: (list, list, dict) -> azure.core.paging.ItemPaged[ConfigurationSetting] """List the configuration settings stored in the configuration service, optionally filtered by label and accept_datetime :param keys: filter results based on their keys. '*' can be used as wildcard in the beginning or end of the filter :type keys: list[str] :param labels: filter results based on their label. '*' can be used as wildcard in the beginning or end of the filter :type labels: list[str] :keyword datetime accept_datetime: filter out ConfigurationSetting created after this datetime :keyword list[str] fields: specify which fields to include in the results. Leave None to include all fields :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: An iterator of :class:`ConfigurationSetting` :rtype: :class:`azure.core.paging.ItemPaged[ConfigurationSetting]` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError` Example .. code-block:: python from datetime import datetime, timedelta accept_datetime = datetime.today() + timedelta(days=-1) all_listed = async_client.list_configuration_settings() async for item in all_listed: pass # do something filtered_listed = async_client.list_configuration_settings( labels=["*Labe*"], keys=["*Ke*"], accept_datetime=accept_datetime ) async for item in filtered_listed: pass # do something """ select = kwargs.pop("fields", None) if select: select = ['locked' if x == 'read_only' else x for x in select] encoded_labels = escape_and_tostr(labels) encoded_keys = escape_and_tostr(keys) error_map = { 401: ClientAuthenticationError } try: return self._impl.get_key_values( label=encoded_labels, key=encoded_keys, select=select, cls=lambda objs: [ConfigurationSetting._from_key_value(x) for x in objs], error_map=error_map, **kwargs ) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace_async async def get_configuration_setting( self, key, label=None, etag='*', match_condition=MatchConditions.Unconditionally, **kwargs): # type: (str, Optional[str], Optional[str], Optional[MatchConditions], dict) -> ConfigurationSetting """Get the matched ConfigurationSetting from Azure App Configuration service :param key: key of the ConfigurationSetting :type key: str :param label: label of the ConfigurationSetting :type label: str :param etag: check if the ConfigurationSetting is changed. Set None to skip checking etag :type etag: str or None :param ~azure.core.MatchConditions match_condition: the match condition to use upon the etag :keyword datetime accept_datetime: the retrieved ConfigurationSetting that created no later than this datetime :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: The matched ConfigurationSetting object :rtype: :class:`ConfigurationSetting` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError`, \ :class:`ResourceNotFoundError`, :class:`ResourceModifiedError`, :class:`ResourceExistsError` Example .. code-block:: python # in async function fetched_config_setting = await async_client.get_configuration_setting( key="MyKey", label="MyLabel" ) """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError } if match_condition == MatchConditions.IfNotModified: error_map[412] = ResourceModifiedError if match_condition == MatchConditions.IfModified: error_map[304] = ResourceNotModifiedError if match_condition == MatchConditions.IfPresent: error_map[412] = ResourceNotFoundError if match_condition == MatchConditions.IfMissing: error_map[412] = ResourceExistsError try: key_value = await self._impl.get_key_value( key=key, label=label, if_match=prep_if_match(etag, match_condition), if_none_match=prep_if_none_match(etag, match_condition), error_map=error_map, **kwargs ) return ConfigurationSetting._from_key_value(key_value) except ResourceNotModifiedError: return None except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace_async async def add_configuration_setting(self, configuration_setting, **kwargs): # type: (ConfigurationSetting, dict) -> ConfigurationSetting """Add a ConfigurationSetting into the Azure App Configuration service. :param configuration_setting: the ConfigurationSetting object to be added :type configuration_setting: :class:`ConfigurationSetting<azure.appconfiguration.ConfigurationSetting>` :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: The ConfigurationSetting object returned from the App Configuration service :rtype: :class:`ConfigurationSetting` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError`, :class:`ResourceExistsError` Example .. code-block:: python # in async fuction config_setting = ConfigurationSetting( key="MyKey", label="MyLabel", value="my value", content_type="my content type", tags={"my tag": "my tag value"} ) added_config_setting = await async_client.add_configuration_setting(config_setting) """ key_value = KeyValue( key=configuration_setting.key, label=configuration_setting.label, content_type=configuration_setting.content_type, value=configuration_setting.value, tags=configuration_setting.tags ) custom_headers = CaseInsensitiveDict(kwargs.get("headers")) error_map = { 401: ClientAuthenticationError, 412: ResourceExistsError } try: key_value_added = await self._impl.put_key_value( entity=key_value, key=key_value.key, label=key_value.label, if_none_match="*", headers=custom_headers, error_map=error_map, ) return ConfigurationSetting._from_key_value(key_value_added) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace_async async def set_configuration_setting( self, configuration_setting, match_condition=MatchConditions.Unconditionally, **kwargs ): # type: (ConfigurationSetting, Optional[MatchConditions], dict) -> ConfigurationSetting """Add or update a ConfigurationSetting. If the configuration setting identified by key and label does not exist, this is a create. Otherwise this is an update. :param configuration_setting: the ConfigurationSetting to be added (if not exists) \ or updated (if exists) to the service :type configuration_setting: :class:`ConfigurationSetting` :param ~azure.core.MatchConditions match_condition: the match condition to use upon the etag :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: The ConfigurationSetting returned from the service :rtype: :class:`ConfigurationSetting` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError`, \ :class:`ResourceReadOnlyError`, :class:`ResourceModifiedError`, :class:`ResourceNotModifiedError`, \ :class:`ResourceNotFoundError`, :class:`ResourceExistsError` Example .. code-block:: python # in async function config_setting = ConfigurationSetting( key="MyKey", label="MyLabel", value="my set value", content_type="my set content type", tags={"my set tag": "my set tag value"} ) returned_config_setting = await async_client.set_configuration_setting(config_setting) """ key_value = KeyValue( key=configuration_setting.key, label=configuration_setting.label, content_type=configuration_setting.content_type, value=configuration_setting.value, tags=configuration_setting.tags ) custom_headers = CaseInsensitiveDict(kwargs.get("headers")) error_map = { 401: ClientAuthenticationError, 409: ResourceReadOnlyError } if match_condition == MatchConditions.IfNotModified: error_map[412] = ResourceModifiedError if match_condition == MatchConditions.IfModified: error_map[412] = ResourceNotModifiedError if match_condition == MatchConditions.IfPresent: error_map[412] = ResourceNotFoundError if match_condition == MatchConditions.IfMissing: error_map[412] = ResourceExistsError try: key_value_set = await self._impl.put_key_value( entity=key_value, key=key_value.key, label=key_value.label, if_match=prep_if_match(configuration_setting.etag, match_condition), if_none_match=prep_if_none_match(configuration_setting.etag, match_condition), headers=custom_headers, error_map=error_map, ) return ConfigurationSetting._from_key_value(key_value_set) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace_async async def delete_configuration_setting( self, key, label=None, **kwargs ): # type: (str, Optional[str], dict) -> ConfigurationSetting """Delete a ConfigurationSetting if it exists :param key: key used to identify the ConfigurationSetting :type key: str :param label: label used to identify the ConfigurationSetting :type label: str :keyword str etag: check if the ConfigurationSetting is changed. Set None to skip checking etag :keyword ~azure.core.MatchConditions match_condition: the match condition to use upon the etag :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request :return: The deleted ConfigurationSetting returned from the service, or None if it doesn't exist. :rtype: :class:`ConfigurationSetting` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError`, \ :class:`ResourceReadOnlyError`, :class:`ResourceModifiedError`, :class:`ResourceNotModifiedError`, \ :class:`ResourceNotFoundError`, :class:`ResourceExistsError` Example .. code-block:: python # in async function deleted_config_setting = await async_client.delete_configuration_setting( key="MyKey", label="MyLabel" ) """ etag = kwargs.pop("etag", None) match_condition = kwargs.pop("match_condition", MatchConditions.Unconditionally) custom_headers = CaseInsensitiveDict(kwargs.get("headers")) error_map = { 401: ClientAuthenticationError, 409: ResourceReadOnlyError } if match_condition == MatchConditions.IfNotModified: error_map[412] = ResourceModifiedError if match_condition == MatchConditions.IfModified: error_map[412] = ResourceNotModifiedError if match_condition == MatchConditions.IfPresent: error_map[412] = ResourceNotFoundError if match_condition == MatchConditions.IfMissing: error_map[412] = ResourceExistsError try: key_value_deleted = await self._impl.delete_key_value( key=key, label=label, if_match=prep_if_match(etag, match_condition), headers=custom_headers, error_map=error_map, ) return ConfigurationSetting._from_key_value(key_value_deleted) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace def list_revisions( self, keys=None, labels=None, **kwargs ): # type: (Optional[list], Optional[list], dict) -> azure.core.paging.AsyncItemPaged[ConfigurationSetting] """ Find the ConfigurationSetting revision history. :param keys: filter results based on their keys. '*' can be used as wildcard in the beginning or end of the filter :type keys: list[str] :param labels: filter results based on their label. '*' can be used as wildcard in the beginning or end of the filter :type labels: list[str] :keyword datetime accept_datetime: filter out ConfigurationSetting created after this datetime :keyword list[str] fields: specify which fields to include in the results. Leave None to include all fields :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: An iterator of :class:`ConfigurationSetting` :rtype: :class:`azure.core.paging.ItemPaged[ConfigurationSetting]` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError` Example .. code-block:: python # in async function from datetime import datetime, timedelta accept_datetime = datetime.today() + timedelta(days=-1) all_revisions = async_client.list_revisions() async for item in all_revisions: pass # do something filtered_revisions = async_client.list_revisions( labels=["*Labe*"], keys=["*Ke*"], accept_datetime=accept_datetime ) async for item in filtered_revisions: pass # do something """ select = kwargs.pop("fields", None) if select: select = ['locked' if x == 'read_only' else x for x in select] encoded_labels = escape_and_tostr(labels) encoded_keys = escape_and_tostr(keys) error_map = { 401: ClientAuthenticationError } try: return self._impl.get_revisions( label=encoded_labels, key=encoded_keys, select=select, cls=lambda objs: [ConfigurationSetting._from_key_value(x) for x in objs], error_map=error_map, **kwargs ) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace async def set_read_only( self, configuration_setting, **kwargs ): # type: (ConfigurationSetting, dict) -> ConfigurationSetting """Set a configuration setting read only :param configuration_setting: the ConfigurationSetting to be set read only :type configuration_setting: :class:`ConfigurationSetting` :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: The ConfigurationSetting returned from the service :rtype: :class:`ConfigurationSetting` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError`, :class:`ResourceNotFoundError` Example .. code-block:: python config_setting = await async_client.get_configuration_setting( key="MyKey", label="MyLabel" ) read_only_config_setting = await async_client.set_read_only(config_setting) """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError } try: key_value = await self._impl.put_lock( key=configuration_setting.key, label=configuration_setting.label, error_map=error_map, **kwargs ) return ConfigurationSetting._from_key_value(key_value) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)
[docs] @distributed_trace async def clear_read_only( self, configuration_setting, **kwargs ): # type: (ConfigurationSetting, dict) -> ConfigurationSetting """Clear read only flag for a configuration setting :param configuration_setting: the ConfigurationSetting to be read only clear :type configuration_setting: :class:`ConfigurationSetting` :keyword dict headers: if "headers" exists, its value (a dict) will be added to the http request header :return: The ConfigurationSetting returned from the service :rtype: :class:`ConfigurationSetting` :raises: :class:`HttpResponseError`, :class:`ClientAuthenticationError`, :class:`ResourceNotFoundError` Example .. code-block:: python config_setting = await async_client.get_configuration_setting( key="MyKey", label="MyLabel" ) read_only_config_setting = await async_client.clear_read_only(config_setting) """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError } try: key_value = await self._impl.delete_lock( key=configuration_setting.key, label=configuration_setting.label, error_map=error_map, **kwargs ) return ConfigurationSetting._from_key_value(key_value) except ErrorException as error: raise HttpResponseError(message=error.message, response=error.response)