Source code for azure.mgmt.policyinsights.operations._component_policy_states_operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import datetime
import sys
from typing import Any, Callable, Dict, Optional, TypeVar, Union

from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat

from .. import models as _models
from .._serialization import Serializer
from .._vendor import _convert_request, _format_url_section

if sys.version_info >= (3, 8):
    from typing import Literal  # pylint: disable=no-name-in-module, ungrouped-imports
else:
    from typing_extensions import Literal  # type: ignore  # pylint: disable=ungrouped-imports
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_list_query_results_for_subscription_request(
    subscription_id: str,
    component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource],
    *,
    top: Optional[int] = None,
    order_by: Optional[str] = None,
    select: Optional[str] = None,
    from_parameter: Optional[datetime.datetime] = None,
    to: Optional[datetime.datetime] = None,
    filter: Optional[str] = None,
    apply: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "componentPolicyStatesResource": _SERIALIZER.url(
            "component_policy_states_resource", component_policy_states_resource, "str"
        ),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int", minimum=0)
    if order_by is not None:
        _params["$orderby"] = _SERIALIZER.query("order_by", order_by, "str")
    if select is not None:
        _params["$select"] = _SERIALIZER.query("select", select, "str")
    if from_parameter is not None:
        _params["$from"] = _SERIALIZER.query("from_parameter", from_parameter, "iso-8601")
    if to is not None:
        _params["$to"] = _SERIALIZER.query("to", to, "iso-8601")
    if filter is not None:
        _params["$filter"] = _SERIALIZER.query("filter", filter, "str")
    if apply is not None:
        _params["$apply"] = _SERIALIZER.query("apply", apply, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_query_results_for_resource_group_request(
    subscription_id: str,
    resource_group_name: str,
    component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource],
    *,
    top: Optional[int] = None,
    order_by: Optional[str] = None,
    select: Optional[str] = None,
    from_parameter: Optional[datetime.datetime] = None,
    to: Optional[datetime.datetime] = None,
    filter: Optional[str] = None,
    apply: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "componentPolicyStatesResource": _SERIALIZER.url(
            "component_policy_states_resource", component_policy_states_resource, "str"
        ),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int", minimum=0)
    if order_by is not None:
        _params["$orderby"] = _SERIALIZER.query("order_by", order_by, "str")
    if select is not None:
        _params["$select"] = _SERIALIZER.query("select", select, "str")
    if from_parameter is not None:
        _params["$from"] = _SERIALIZER.query("from_parameter", from_parameter, "iso-8601")
    if to is not None:
        _params["$to"] = _SERIALIZER.query("to", to, "iso-8601")
    if filter is not None:
        _params["$filter"] = _SERIALIZER.query("filter", filter, "str")
    if apply is not None:
        _params["$apply"] = _SERIALIZER.query("apply", apply, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_query_results_for_resource_request(
    resource_id: str,
    component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource],
    *,
    top: Optional[int] = None,
    order_by: Optional[str] = None,
    select: Optional[str] = None,
    from_parameter: Optional[datetime.datetime] = None,
    to: Optional[datetime.datetime] = None,
    filter: Optional[str] = None,
    apply: Optional[str] = None,
    expand: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/{resourceId}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "resourceId": _SERIALIZER.url("resource_id", resource_id, "str", skip_quote=True),
        "componentPolicyStatesResource": _SERIALIZER.url(
            "component_policy_states_resource", component_policy_states_resource, "str"
        ),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int", minimum=0)
    if order_by is not None:
        _params["$orderby"] = _SERIALIZER.query("order_by", order_by, "str")
    if select is not None:
        _params["$select"] = _SERIALIZER.query("select", select, "str")
    if from_parameter is not None:
        _params["$from"] = _SERIALIZER.query("from_parameter", from_parameter, "iso-8601")
    if to is not None:
        _params["$to"] = _SERIALIZER.query("to", to, "iso-8601")
    if filter is not None:
        _params["$filter"] = _SERIALIZER.query("filter", filter, "str")
    if apply is not None:
        _params["$apply"] = _SERIALIZER.query("apply", apply, "str")
    if expand is not None:
        _params["$expand"] = _SERIALIZER.query("expand", expand, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_query_results_for_policy_definition_request(
    subscription_id: str,
    policy_definition_name: str,
    component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource],
    *,
    top: Optional[int] = None,
    order_by: Optional[str] = None,
    select: Optional[str] = None,
    from_parameter: Optional[datetime.datetime] = None,
    to: Optional[datetime.datetime] = None,
    filter: Optional[str] = None,
    apply: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    authorization_namespace: Literal["Microsoft.Authorization"] = kwargs.pop(
        "authorization_namespace", "Microsoft.Authorization"
    )
    api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/providers/{authorizationNamespace}/policyDefinitions/{policyDefinitionName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "authorizationNamespace": _SERIALIZER.url("authorization_namespace", authorization_namespace, "str"),
        "policyDefinitionName": _SERIALIZER.url(
            "policy_definition_name",
            policy_definition_name,
            "str",
            max_length=64,
            min_length=1,
            pattern=r"^[^<>%&:\\?/#]*$",
        ),
        "componentPolicyStatesResource": _SERIALIZER.url(
            "component_policy_states_resource", component_policy_states_resource, "str"
        ),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int", minimum=0)
    if order_by is not None:
        _params["$orderby"] = _SERIALIZER.query("order_by", order_by, "str")
    if select is not None:
        _params["$select"] = _SERIALIZER.query("select", select, "str")
    if from_parameter is not None:
        _params["$from"] = _SERIALIZER.query("from_parameter", from_parameter, "iso-8601")
    if to is not None:
        _params["$to"] = _SERIALIZER.query("to", to, "iso-8601")
    if filter is not None:
        _params["$filter"] = _SERIALIZER.query("filter", filter, "str")
    if apply is not None:
        _params["$apply"] = _SERIALIZER.query("apply", apply, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_query_results_for_subscription_level_policy_assignment_request(
    subscription_id: str,
    policy_assignment_name: str,
    component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource],
    *,
    top: Optional[int] = None,
    order_by: Optional[str] = None,
    select: Optional[str] = None,
    from_parameter: Optional[datetime.datetime] = None,
    to: Optional[datetime.datetime] = None,
    filter: Optional[str] = None,
    apply: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    authorization_namespace: Literal["Microsoft.Authorization"] = kwargs.pop(
        "authorization_namespace", "Microsoft.Authorization"
    )
    api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/providers/{authorizationNamespace}/policyAssignments/{policyAssignmentName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "authorizationNamespace": _SERIALIZER.url("authorization_namespace", authorization_namespace, "str"),
        "policyAssignmentName": _SERIALIZER.url(
            "policy_assignment_name",
            policy_assignment_name,
            "str",
            max_length=64,
            min_length=1,
            pattern=r"^[^<>%&:\\?/#]*$",
        ),
        "componentPolicyStatesResource": _SERIALIZER.url(
            "component_policy_states_resource", component_policy_states_resource, "str"
        ),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int", minimum=0)
    if order_by is not None:
        _params["$orderby"] = _SERIALIZER.query("order_by", order_by, "str")
    if select is not None:
        _params["$select"] = _SERIALIZER.query("select", select, "str")
    if from_parameter is not None:
        _params["$from"] = _SERIALIZER.query("from_parameter", from_parameter, "iso-8601")
    if to is not None:
        _params["$to"] = _SERIALIZER.query("to", to, "iso-8601")
    if filter is not None:
        _params["$filter"] = _SERIALIZER.query("filter", filter, "str")
    if apply is not None:
        _params["$apply"] = _SERIALIZER.query("apply", apply, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_query_results_for_resource_group_level_policy_assignment_request(
    subscription_id: str,
    resource_group_name: str,
    policy_assignment_name: str,
    component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource],
    *,
    top: Optional[int] = None,
    order_by: Optional[str] = None,
    select: Optional[str] = None,
    from_parameter: Optional[datetime.datetime] = None,
    to: Optional[datetime.datetime] = None,
    filter: Optional[str] = None,
    apply: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    authorization_namespace: Literal["Microsoft.Authorization"] = kwargs.pop(
        "authorization_namespace", "Microsoft.Authorization"
    )
    api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/{authorizationNamespace}/policyAssignments/{policyAssignmentName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "authorizationNamespace": _SERIALIZER.url("authorization_namespace", authorization_namespace, "str"),
        "policyAssignmentName": _SERIALIZER.url(
            "policy_assignment_name",
            policy_assignment_name,
            "str",
            max_length=64,
            min_length=1,
            pattern=r"^[^<>%&:\\?/#]*$",
        ),
        "componentPolicyStatesResource": _SERIALIZER.url(
            "component_policy_states_resource", component_policy_states_resource, "str"
        ),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int", minimum=0)
    if order_by is not None:
        _params["$orderby"] = _SERIALIZER.query("order_by", order_by, "str")
    if select is not None:
        _params["$select"] = _SERIALIZER.query("select", select, "str")
    if from_parameter is not None:
        _params["$from"] = _SERIALIZER.query("from_parameter", from_parameter, "iso-8601")
    if to is not None:
        _params["$to"] = _SERIALIZER.query("to", to, "iso-8601")
    if filter is not None:
        _params["$filter"] = _SERIALIZER.query("filter", filter, "str")
    if apply is not None:
        _params["$apply"] = _SERIALIZER.query("apply", apply, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


[docs]class ComponentPolicyStatesOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.mgmt.policyinsights.PolicyInsightsClient`'s :attr:`component_policy_states` attribute. """ models = _models def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list_query_results_for_subscription( self, subscription_id: str, component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource], top: Optional[int] = None, order_by: Optional[str] = None, select: Optional[str] = None, from_parameter: Optional[datetime.datetime] = None, to: Optional[datetime.datetime] = None, filter: Optional[str] = None, apply: Optional[str] = None, **kwargs: Any ) -> _models.ComponentPolicyStatesQueryResults: """Queries component policy states under subscription scope. :param subscription_id: Microsoft Azure subscription ID. Required. :type subscription_id: str :param component_policy_states_resource: The virtual resource under ComponentPolicyStates resource type. In a given time range, 'latest' represents the latest component policy state(s). "latest" Required. :type component_policy_states_resource: str or ~azure.mgmt.policyinsights.models.ComponentPolicyStatesResource :param top: Maximum number of records to return. Default value is None. :type top: int :param order_by: Ordering expression using OData notation. One or more comma-separated column names with an optional "desc" (the default) or "asc", e.g. "$orderby=PolicyAssignmentId, ResourceId asc". Default value is None. :type order_by: str :param select: Select expression using OData notation. Limits the columns on each record to just those requested, e.g. "$select=PolicyAssignmentId, ResourceId". Default value is None. :type select: str :param from_parameter: ISO 8601 formatted timestamp specifying the start time of the interval to query. When not specified, the service uses ($to - 1-day). Default value is None. :type from_parameter: ~datetime.datetime :param to: ISO 8601 formatted timestamp specifying the end time of the interval to query. When not specified, the service uses request time. Default value is None. :type to: ~datetime.datetime :param filter: OData filter expression. Default value is None. :type filter: str :param apply: OData apply expression for aggregations. Default value is None. :type apply: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ComponentPolicyStatesQueryResults or the result of cls(response) :rtype: ~azure.mgmt.policyinsights.models.ComponentPolicyStatesQueryResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01")) cls: ClsType[_models.ComponentPolicyStatesQueryResults] = kwargs.pop("cls", None) request = build_list_query_results_for_subscription_request( subscription_id=subscription_id, component_policy_states_resource=component_policy_states_resource, top=top, order_by=order_by, select=select, from_parameter=from_parameter, to=to, filter=filter, apply=apply, api_version=api_version, template_url=self.list_query_results_for_subscription.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ComponentPolicyStatesQueryResults", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list_query_results_for_subscription.metadata = { "url": "/subscriptions/{subscriptionId}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults" }
[docs] @distributed_trace def list_query_results_for_resource_group( self, subscription_id: str, resource_group_name: str, component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource], top: Optional[int] = None, order_by: Optional[str] = None, select: Optional[str] = None, from_parameter: Optional[datetime.datetime] = None, to: Optional[datetime.datetime] = None, filter: Optional[str] = None, apply: Optional[str] = None, **kwargs: Any ) -> _models.ComponentPolicyStatesQueryResults: """Queries component policy states under resource group scope. :param subscription_id: Microsoft Azure subscription ID. Required. :type subscription_id: str :param resource_group_name: Resource group name. Required. :type resource_group_name: str :param component_policy_states_resource: The virtual resource under ComponentPolicyStates resource type. In a given time range, 'latest' represents the latest component policy state(s). "latest" Required. :type component_policy_states_resource: str or ~azure.mgmt.policyinsights.models.ComponentPolicyStatesResource :param top: Maximum number of records to return. Default value is None. :type top: int :param order_by: Ordering expression using OData notation. One or more comma-separated column names with an optional "desc" (the default) or "asc", e.g. "$orderby=PolicyAssignmentId, ResourceId asc". Default value is None. :type order_by: str :param select: Select expression using OData notation. Limits the columns on each record to just those requested, e.g. "$select=PolicyAssignmentId, ResourceId". Default value is None. :type select: str :param from_parameter: ISO 8601 formatted timestamp specifying the start time of the interval to query. When not specified, the service uses ($to - 1-day). Default value is None. :type from_parameter: ~datetime.datetime :param to: ISO 8601 formatted timestamp specifying the end time of the interval to query. When not specified, the service uses request time. Default value is None. :type to: ~datetime.datetime :param filter: OData filter expression. Default value is None. :type filter: str :param apply: OData apply expression for aggregations. Default value is None. :type apply: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ComponentPolicyStatesQueryResults or the result of cls(response) :rtype: ~azure.mgmt.policyinsights.models.ComponentPolicyStatesQueryResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01")) cls: ClsType[_models.ComponentPolicyStatesQueryResults] = kwargs.pop("cls", None) request = build_list_query_results_for_resource_group_request( subscription_id=subscription_id, resource_group_name=resource_group_name, component_policy_states_resource=component_policy_states_resource, top=top, order_by=order_by, select=select, from_parameter=from_parameter, to=to, filter=filter, apply=apply, api_version=api_version, template_url=self.list_query_results_for_resource_group.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ComponentPolicyStatesQueryResults", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list_query_results_for_resource_group.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults" }
[docs] @distributed_trace def list_query_results_for_resource( self, resource_id: str, component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource], top: Optional[int] = None, order_by: Optional[str] = None, select: Optional[str] = None, from_parameter: Optional[datetime.datetime] = None, to: Optional[datetime.datetime] = None, filter: Optional[str] = None, apply: Optional[str] = None, expand: Optional[str] = None, **kwargs: Any ) -> _models.ComponentPolicyStatesQueryResults: """Queries component policy states for the resource. :param resource_id: Resource ID. Required. :type resource_id: str :param component_policy_states_resource: The virtual resource under ComponentPolicyStates resource type. In a given time range, 'latest' represents the latest component policy state(s). "latest" Required. :type component_policy_states_resource: str or ~azure.mgmt.policyinsights.models.ComponentPolicyStatesResource :param top: Maximum number of records to return. Default value is None. :type top: int :param order_by: Ordering expression using OData notation. One or more comma-separated column names with an optional "desc" (the default) or "asc", e.g. "$orderby=PolicyAssignmentId, ResourceId asc". Default value is None. :type order_by: str :param select: Select expression using OData notation. Limits the columns on each record to just those requested, e.g. "$select=PolicyAssignmentId, ResourceId". Default value is None. :type select: str :param from_parameter: ISO 8601 formatted timestamp specifying the start time of the interval to query. When not specified, the service uses ($to - 1-day). Default value is None. :type from_parameter: ~datetime.datetime :param to: ISO 8601 formatted timestamp specifying the end time of the interval to query. When not specified, the service uses request time. Default value is None. :type to: ~datetime.datetime :param filter: OData filter expression. Default value is None. :type filter: str :param apply: OData apply expression for aggregations. Default value is None. :type apply: str :param expand: The $expand query parameter. Default value is None. :type expand: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ComponentPolicyStatesQueryResults or the result of cls(response) :rtype: ~azure.mgmt.policyinsights.models.ComponentPolicyStatesQueryResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01")) cls: ClsType[_models.ComponentPolicyStatesQueryResults] = kwargs.pop("cls", None) request = build_list_query_results_for_resource_request( resource_id=resource_id, component_policy_states_resource=component_policy_states_resource, top=top, order_by=order_by, select=select, from_parameter=from_parameter, to=to, filter=filter, apply=apply, expand=expand, api_version=api_version, template_url=self.list_query_results_for_resource.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ComponentPolicyStatesQueryResults", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list_query_results_for_resource.metadata = { "url": "/{resourceId}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults" }
[docs] @distributed_trace def list_query_results_for_policy_definition( self, subscription_id: str, policy_definition_name: str, component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource], top: Optional[int] = None, order_by: Optional[str] = None, select: Optional[str] = None, from_parameter: Optional[datetime.datetime] = None, to: Optional[datetime.datetime] = None, filter: Optional[str] = None, apply: Optional[str] = None, **kwargs: Any ) -> _models.ComponentPolicyStatesQueryResults: """Queries component policy states for the subscription level policy definition. :param subscription_id: Microsoft Azure subscription ID. Required. :type subscription_id: str :param policy_definition_name: Policy definition name. Required. :type policy_definition_name: str :param component_policy_states_resource: The virtual resource under ComponentPolicyStates resource type. In a given time range, 'latest' represents the latest component policy state(s). "latest" Required. :type component_policy_states_resource: str or ~azure.mgmt.policyinsights.models.ComponentPolicyStatesResource :param top: Maximum number of records to return. Default value is None. :type top: int :param order_by: Ordering expression using OData notation. One or more comma-separated column names with an optional "desc" (the default) or "asc", e.g. "$orderby=PolicyAssignmentId, ResourceId asc". Default value is None. :type order_by: str :param select: Select expression using OData notation. Limits the columns on each record to just those requested, e.g. "$select=PolicyAssignmentId, ResourceId". Default value is None. :type select: str :param from_parameter: ISO 8601 formatted timestamp specifying the start time of the interval to query. When not specified, the service uses ($to - 1-day). Default value is None. :type from_parameter: ~datetime.datetime :param to: ISO 8601 formatted timestamp specifying the end time of the interval to query. When not specified, the service uses request time. Default value is None. :type to: ~datetime.datetime :param filter: OData filter expression. Default value is None. :type filter: str :param apply: OData apply expression for aggregations. Default value is None. :type apply: str :keyword authorization_namespace: The namespace for Microsoft Authorization resource provider; only "Microsoft.Authorization" is allowed. Default value is "Microsoft.Authorization". Note that overriding this default value may result in unsupported behavior. :paramtype authorization_namespace: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ComponentPolicyStatesQueryResults or the result of cls(response) :rtype: ~azure.mgmt.policyinsights.models.ComponentPolicyStatesQueryResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) authorization_namespace: Literal["Microsoft.Authorization"] = kwargs.pop( "authorization_namespace", "Microsoft.Authorization" ) api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01")) cls: ClsType[_models.ComponentPolicyStatesQueryResults] = kwargs.pop("cls", None) request = build_list_query_results_for_policy_definition_request( subscription_id=subscription_id, policy_definition_name=policy_definition_name, component_policy_states_resource=component_policy_states_resource, top=top, order_by=order_by, select=select, from_parameter=from_parameter, to=to, filter=filter, apply=apply, authorization_namespace=authorization_namespace, api_version=api_version, template_url=self.list_query_results_for_policy_definition.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ComponentPolicyStatesQueryResults", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list_query_results_for_policy_definition.metadata = { "url": "/subscriptions/{subscriptionId}/providers/{authorizationNamespace}/policyDefinitions/{policyDefinitionName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults" }
[docs] @distributed_trace def list_query_results_for_subscription_level_policy_assignment( self, subscription_id: str, policy_assignment_name: str, component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource], top: Optional[int] = None, order_by: Optional[str] = None, select: Optional[str] = None, from_parameter: Optional[datetime.datetime] = None, to: Optional[datetime.datetime] = None, filter: Optional[str] = None, apply: Optional[str] = None, **kwargs: Any ) -> _models.ComponentPolicyStatesQueryResults: """Queries component policy states for the subscription level policy assignment. :param subscription_id: Microsoft Azure subscription ID. Required. :type subscription_id: str :param policy_assignment_name: Policy assignment name. Required. :type policy_assignment_name: str :param component_policy_states_resource: The virtual resource under ComponentPolicyStates resource type. In a given time range, 'latest' represents the latest component policy state(s). "latest" Required. :type component_policy_states_resource: str or ~azure.mgmt.policyinsights.models.ComponentPolicyStatesResource :param top: Maximum number of records to return. Default value is None. :type top: int :param order_by: Ordering expression using OData notation. One or more comma-separated column names with an optional "desc" (the default) or "asc", e.g. "$orderby=PolicyAssignmentId, ResourceId asc". Default value is None. :type order_by: str :param select: Select expression using OData notation. Limits the columns on each record to just those requested, e.g. "$select=PolicyAssignmentId, ResourceId". Default value is None. :type select: str :param from_parameter: ISO 8601 formatted timestamp specifying the start time of the interval to query. When not specified, the service uses ($to - 1-day). Default value is None. :type from_parameter: ~datetime.datetime :param to: ISO 8601 formatted timestamp specifying the end time of the interval to query. When not specified, the service uses request time. Default value is None. :type to: ~datetime.datetime :param filter: OData filter expression. Default value is None. :type filter: str :param apply: OData apply expression for aggregations. Default value is None. :type apply: str :keyword authorization_namespace: The namespace for Microsoft Authorization resource provider; only "Microsoft.Authorization" is allowed. Default value is "Microsoft.Authorization". Note that overriding this default value may result in unsupported behavior. :paramtype authorization_namespace: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ComponentPolicyStatesQueryResults or the result of cls(response) :rtype: ~azure.mgmt.policyinsights.models.ComponentPolicyStatesQueryResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) authorization_namespace: Literal["Microsoft.Authorization"] = kwargs.pop( "authorization_namespace", "Microsoft.Authorization" ) api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01")) cls: ClsType[_models.ComponentPolicyStatesQueryResults] = kwargs.pop("cls", None) request = build_list_query_results_for_subscription_level_policy_assignment_request( subscription_id=subscription_id, policy_assignment_name=policy_assignment_name, component_policy_states_resource=component_policy_states_resource, top=top, order_by=order_by, select=select, from_parameter=from_parameter, to=to, filter=filter, apply=apply, authorization_namespace=authorization_namespace, api_version=api_version, template_url=self.list_query_results_for_subscription_level_policy_assignment.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ComponentPolicyStatesQueryResults", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list_query_results_for_subscription_level_policy_assignment.metadata = { "url": "/subscriptions/{subscriptionId}/providers/{authorizationNamespace}/policyAssignments/{policyAssignmentName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults" }
[docs] @distributed_trace def list_query_results_for_resource_group_level_policy_assignment( self, subscription_id: str, resource_group_name: str, policy_assignment_name: str, component_policy_states_resource: Union[str, _models.ComponentPolicyStatesResource], top: Optional[int] = None, order_by: Optional[str] = None, select: Optional[str] = None, from_parameter: Optional[datetime.datetime] = None, to: Optional[datetime.datetime] = None, filter: Optional[str] = None, apply: Optional[str] = None, **kwargs: Any ) -> _models.ComponentPolicyStatesQueryResults: """Queries component policy states for the resource group level policy assignment. :param subscription_id: Microsoft Azure subscription ID. Required. :type subscription_id: str :param resource_group_name: Resource group name. Required. :type resource_group_name: str :param policy_assignment_name: Policy assignment name. Required. :type policy_assignment_name: str :param component_policy_states_resource: The virtual resource under ComponentPolicyStates resource type. In a given time range, 'latest' represents the latest component policy state(s). "latest" Required. :type component_policy_states_resource: str or ~azure.mgmt.policyinsights.models.ComponentPolicyStatesResource :param top: Maximum number of records to return. Default value is None. :type top: int :param order_by: Ordering expression using OData notation. One or more comma-separated column names with an optional "desc" (the default) or "asc", e.g. "$orderby=PolicyAssignmentId, ResourceId asc". Default value is None. :type order_by: str :param select: Select expression using OData notation. Limits the columns on each record to just those requested, e.g. "$select=PolicyAssignmentId, ResourceId". Default value is None. :type select: str :param from_parameter: ISO 8601 formatted timestamp specifying the start time of the interval to query. When not specified, the service uses ($to - 1-day). Default value is None. :type from_parameter: ~datetime.datetime :param to: ISO 8601 formatted timestamp specifying the end time of the interval to query. When not specified, the service uses request time. Default value is None. :type to: ~datetime.datetime :param filter: OData filter expression. Default value is None. :type filter: str :param apply: OData apply expression for aggregations. Default value is None. :type apply: str :keyword authorization_namespace: The namespace for Microsoft Authorization resource provider; only "Microsoft.Authorization" is allowed. Default value is "Microsoft.Authorization". Note that overriding this default value may result in unsupported behavior. :paramtype authorization_namespace: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ComponentPolicyStatesQueryResults or the result of cls(response) :rtype: ~azure.mgmt.policyinsights.models.ComponentPolicyStatesQueryResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) authorization_namespace: Literal["Microsoft.Authorization"] = kwargs.pop( "authorization_namespace", "Microsoft.Authorization" ) api_version: Literal["2022-04-01"] = kwargs.pop("api_version", _params.pop("api-version", "2022-04-01")) cls: ClsType[_models.ComponentPolicyStatesQueryResults] = kwargs.pop("cls", None) request = build_list_query_results_for_resource_group_level_policy_assignment_request( subscription_id=subscription_id, resource_group_name=resource_group_name, policy_assignment_name=policy_assignment_name, component_policy_states_resource=component_policy_states_resource, top=top, order_by=order_by, select=select, from_parameter=from_parameter, to=to, filter=filter, apply=apply, authorization_namespace=authorization_namespace, api_version=api_version, template_url=self.list_query_results_for_resource_group_level_policy_assignment.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ComponentPolicyStatesQueryResults", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list_query_results_for_resource_group_level_policy_assignment.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/{authorizationNamespace}/policyAssignments/{policyAssignmentName}/providers/Microsoft.PolicyInsights/componentPolicyStates/{componentPolicyStatesResource}/queryResults" }