Source code for azure.purview.scanning.operations._operations

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import functools
from json import loads as _loads
from typing import TYPE_CHECKING
import warnings

from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from msrest import Serializer

from .._vendor import _convert_request, _format_url_section

if TYPE_CHECKING:
    # pylint: disable=unused-import,ungrouped-imports
    from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union

    T = TypeVar('T')
    ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
# fmt: off

def build_key_vault_connections_get_request(
    key_vault_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/azureKeyVaults/{keyVaultName}')
    path_format_arguments = {
        "keyVaultName": _SERIALIZER.url("key_vault_name", key_vault_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_key_vault_connections_create_request(
    key_vault_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/azureKeyVaults/{keyVaultName}')
    path_format_arguments = {
        "keyVaultName": _SERIALIZER.url("key_vault_name", key_vault_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_key_vault_connections_delete_request(
    key_vault_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/azureKeyVaults/{keyVaultName}')
    path_format_arguments = {
        "keyVaultName": _SERIALIZER.url("key_vault_name", key_vault_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_key_vault_connections_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/azureKeyVaults')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_classification_rules_get_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}')
    path_format_arguments = {
        "classificationRuleName": _SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_classification_rules_create_or_update_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}')
    path_format_arguments = {
        "classificationRuleName": _SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_classification_rules_delete_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}')
    path_format_arguments = {
        "classificationRuleName": _SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_classification_rules_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/classificationrules')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_classification_rules_list_versions_by_classification_rule_name_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}/versions')
    path_format_arguments = {
        "classificationRuleName": _SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_classification_rules_tag_classification_version_request(
    classification_rule_name,  # type: str
    classification_rule_version,  # type: int
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    action = kwargs.pop('action')  # type: str

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag')
    path_format_arguments = {
        "classificationRuleName": _SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
        "classificationRuleVersion": _SERIALIZER.url("classification_rule_version", classification_rule_version, 'int'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['action'] = _SERIALIZER.query("action", action, 'str')
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="POST",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_data_sources_create_or_update_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_data_sources_get_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_data_sources_delete_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_data_sources_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_filters_get_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/filters/custom')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_filters_create_or_update_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/filters/custom')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scans_create_or_update_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scans_get_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scans_delete_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scans_list_by_data_source_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_result_run_scan_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    run_id,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    scan_level = kwargs.pop('scan_level', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
        "runId": _SERIALIZER.url("run_id", run_id, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    if scan_level is not None:
        query_parameters['scanLevel'] = _SERIALIZER.query("scan_level", scan_level, 'str')
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_result_cancel_scan_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    run_id,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
        "runId": _SERIALIZER.url("run_id", run_id, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="POST",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_result_list_scan_history_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/runs')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_rulesets_get_request(
    scan_ruleset_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/scanrulesets/{scanRulesetName}')
    path_format_arguments = {
        "scanRulesetName": _SERIALIZER.url("scan_ruleset_name", scan_ruleset_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_rulesets_create_or_update_request(
    scan_ruleset_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/scanrulesets/{scanRulesetName}')
    path_format_arguments = {
        "scanRulesetName": _SERIALIZER.url("scan_ruleset_name", scan_ruleset_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_rulesets_delete_request(
    scan_ruleset_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/scanrulesets/{scanRulesetName}')
    path_format_arguments = {
        "scanRulesetName": _SERIALIZER.url("scan_ruleset_name", scan_ruleset_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_scan_rulesets_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/scanrulesets')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_system_scan_rulesets_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/systemScanRulesets')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_system_scan_rulesets_get_request(
    data_source_type,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/systemScanRulesets/datasources/{dataSourceType}')
    path_format_arguments = {
        "dataSourceType": _SERIALIZER.url("data_source_type", data_source_type, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_system_scan_rulesets_get_by_version_request(
    version,  # type: int
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    data_source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/systemScanRulesets/versions/{version}')
    path_format_arguments = {
        "version": _SERIALIZER.url("version", version, 'int'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    if data_source_type is not None:
        query_parameters['dataSourceType'] = _SERIALIZER.query("data_source_type", data_source_type, 'str')
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_system_scan_rulesets_get_latest_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    data_source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/systemScanRulesets/versions/latest')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    if data_source_type is not None:
        query_parameters['dataSourceType'] = _SERIALIZER.query("data_source_type", data_source_type, 'str')
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_system_scan_rulesets_list_versions_by_data_source_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    data_source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/systemScanRulesets/versions')

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    if data_source_type is not None:
        query_parameters['dataSourceType'] = _SERIALIZER.query("data_source_type", data_source_type, 'str')
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_triggers_get_trigger_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/triggers/default')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_triggers_create_trigger_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/triggers/default')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )


def build_triggers_delete_trigger_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    api_version = "2018-12-01-preview"
    accept = "application/json"
    # Construct URL
    url = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/triggers/default')
    path_format_arguments = {
        "dataSourceName": _SERIALIZER.url("data_source_name", data_source_name, 'str'),
        "scanName": _SERIALIZER.url("scan_name", scan_name, 'str'),
    }

    url = _format_url_section(url, **path_format_arguments)

    # Construct parameters
    query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=url,
        params=query_parameters,
        headers=header_parameters,
        **kwargs
    )

# fmt: on
[docs]class KeyVaultConnectionsOperations(object): """KeyVaultConnectionsOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def get( self, key_vault_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Gets key vault information. :param key_vault_name: :type key_vault_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_key_vault_connections_get_request( key_vault_name=key_vault_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace def create( self, key_vault_name, # type: str body, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates an instance of a key vault connection. :param key_vault_name: :type key_vault_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] json = body request = build_key_vault_connections_create_request( key_vault_name=key_vault_name, content_type=content_type, json=json, template_url=self.create.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace def delete( self, key_vault_name, # type: str **kwargs # type: Any ): # type: (...) -> Optional[Any] """Deletes the key vault connection associated with the account. :param key_vault_name: :type key_vault_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_key_vault_connections_delete_request( key_vault_name=key_vault_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List key vault connections in account. :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_key_vault_connections_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_key_vault_connections_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/azureKeyVaults'} # type: ignore
[docs]class ClassificationRulesOperations(object): """ClassificationRulesOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def get( self, classification_rule_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Get a classification rule. :param classification_rule_name: :type classification_rule_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_classification_rules_get_request( classification_rule_name=classification_rule_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace def create_or_update( self, classification_rule_name, # type: str body=None, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates or Updates a classification rule. :param classification_rule_name: :type classification_rule_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'CustomClassificationRule' or 'SystemClassificationRule' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_classification_rules_create_or_update_request( classification_rule_name=classification_rule_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace def delete( self, classification_rule_name, # type: str **kwargs # type: Any ): # type: (...) -> Optional[Any] """Deletes a classification rule. :param classification_rule_name: :type classification_rule_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_classification_rules_delete_request( classification_rule_name=classification_rule_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List classification rules in Account. :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_classification_rules_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_classification_rules_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/classificationrules'} # type: ignore
[docs] @distributed_trace def list_versions_by_classification_rule_name( self, classification_rule_name, # type: str **kwargs # type: Any ): # type: (...) -> Iterable[Any] """Lists the rule versions of a classification rule. :param classification_rule_name: :type classification_rule_name: str :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_classification_rules_list_versions_by_classification_rule_name_request( classification_rule_name=classification_rule_name, template_url=self.list_versions_by_classification_rule_name.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_classification_rules_list_versions_by_classification_rule_name_request( classification_rule_name=classification_rule_name, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_versions_by_classification_rule_name.metadata = {'url': '/classificationrules/{classificationRuleName}/versions'} # type: ignore
[docs] @distributed_trace def tag_classification_version( self, classification_rule_name, # type: str classification_rule_version, # type: int **kwargs # type: Any ): # type: (...) -> Any """Sets Classification Action on a specific classification rule version. :param classification_rule_name: :type classification_rule_name: str :param classification_rule_version: :type classification_rule_version: int :keyword action: Possible values are: "Keep" or "Delete". :paramtype action: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 202 response.json() == { "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "scanResultId": str, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled". } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) action = kwargs.pop('action') # type: str request = build_classification_rules_tag_classification_version_request( classification_rule_name=classification_rule_name, classification_rule_version=classification_rule_version, action=action, template_url=self.tag_classification_version.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
tag_classification_version.metadata = {'url': '/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag'} # type: ignore
[docs]class DataSourcesOperations(object): """DataSourcesOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def create_or_update( self, data_source_name, # type: str body=None, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates or Updates a data source. :param data_source_name: :type data_source_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'AdlsGen1DataSource' or 'AdlsGen2DataSource' or 'AmazonAccountDataSource' or 'AmazonPostgreSqlDataSource' or 'AmazonS3DataSource' or 'AmazonSqlDataSource' or 'AzureCosmosDbDataSource' or 'AzureDataExplorerDataSource' or 'AzureFileServiceDataSource' or 'AzureMySqlDataSource' or 'AzurePostgreSqlDataSource' or 'AzureResourceGroupDataSource' or 'AzureSqlDataWarehouseDataSource' or 'AzureSqlDatabaseDataSource' or 'AzureSqlDatabaseManagedInstanceDataSource' or 'AzureStorageDataSource' or 'AzureSubscriptionDataSource' or 'AzureSynapseDataSource' or 'AzureSynapseWorkspaceDataSource' or 'OracleDataSource' or 'PowerBIDataSource' or 'SapEccDataSource' or 'SapS4HanaDataSource' or 'SqlServerDatabaseDataSource' or 'TeradataDataSource' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_data_sources_create_or_update_request( data_source_name=data_source_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace def get( self, data_source_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Get a data source. :param data_source_name: :type data_source_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_data_sources_get_request( data_source_name=data_source_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace def delete( self, data_source_name, # type: str **kwargs # type: Any ): # type: (...) -> Optional[Any] """Deletes a data source. :param data_source_name: :type data_source_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_data_sources_delete_request( data_source_name=data_source_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List data sources in Data catalog. :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_data_sources_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_data_sources_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/datasources'} # type: ignore
[docs]class FiltersOperations(object): """FiltersOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def get( self, data_source_name, # type: str scan_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Get a filter. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "excludeUriPrefixes": [ "str" # Optional. ], "includeUriPrefixes": [ "str" # Optional. ] } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_filters_get_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'} # type: ignore
[docs] @distributed_trace def create_or_update( self, data_source_name, # type: str scan_name, # type: str body=None, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates or updates a filter. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "properties": { "excludeUriPrefixes": [ "str" # Optional. ], "includeUriPrefixes": [ "str" # Optional. ] } } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "excludeUriPrefixes": [ "str" # Optional. ], "includeUriPrefixes": [ "str" # Optional. ] } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_filters_create_or_update_request( data_source_name=data_source_name, scan_name=scan_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'} # type: ignore
[docs]class ScansOperations(object): """ScansOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def create_or_update( self, data_source_name, # type: str scan_name, # type: str body, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates an instance of a scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'AdlsGen1CredentialScan' or 'AdlsGen1MsiScan' or 'AdlsGen2CredentialScan' or 'AdlsGen2MsiScan' or 'AmazonAccountCredentialScan' or 'AmazonPostgreSqlCredentialScan' or 'AmazonS3CredentialScan' or 'AmazonS3RoleARNScan' or 'AmazonSqlCredentialScan' or 'AzureCosmosDbCredentialScan' or 'AzureDataExplorerCredentialScan' or 'AzureDataExplorerMsiScan' or 'AzureFileServiceCredentialScan' or 'AzureMySqlCredentialScan' or 'AzurePostgreSqlCredentialScan' or 'AzureResourceGroupCredentialScan' or 'AzureResourceGroupMsiScan' or 'AzureSqlDataWarehouseCredentialScan' or 'AzureSqlDataWarehouseMsiScan' or 'AzureSqlDatabaseCredentialScan' or 'AzureSqlDatabaseManagedInstanceCredentialScan' or 'AzureSqlDatabaseManagedInstanceMsiScan' or 'AzureSqlDatabaseMsiScan' or 'AzureStorageCredentialScan' or 'AzureStorageMsiScan' or 'AzureSubscriptionCredentialScan' or 'AzureSubscriptionMsiScan' or 'AzureSynapseCredentialScan' or 'AzureSynapseMsiScan' or 'AzureSynapseWorkspaceCredentialScan' or 'AzureSynapseWorkspaceMsiScan' or 'OracleCredentialScan' or 'OracleUserPassScan' or 'PowerBIDelegatedScan' or 'PowerBIMsiScan' or 'SapEccCredentialScan' or 'SapEccUserPassScan' or 'SapS4HanaSapS4HanaCredentialScan' or 'SapS4HanaSapS4HanaUserPassScan' or 'SqlServerDatabaseCredentialScan' or 'TeradataCredentialScan' or 'TeradataUserPassScanAutoGenerated' or 'TeradataUserPassScan' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] json = body request = build_scans_create_or_update_request( data_source_name=data_source_name, scan_name=scan_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace def get( self, data_source_name, # type: str scan_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Gets a scan information. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scans_get_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace def delete( self, data_source_name, # type: str scan_name, # type: str **kwargs # type: Any ): # type: (...) -> Optional[Any] """Deletes the scan associated with the data source. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scans_delete_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace def list_by_data_source( self, data_source_name, # type: str **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List scans in data source. :param data_source_name: :type data_source_name: str :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_scans_list_by_data_source_request( data_source_name=data_source_name, template_url=self.list_by_data_source.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_scans_list_by_data_source_request( data_source_name=data_source_name, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_by_data_source.metadata = {'url': '/datasources/{dataSourceName}/scans'} # type: ignore
[docs]class ScanResultOperations(object): """ScanResultOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def run_scan( self, data_source_name, # type: str scan_name, # type: str run_id, # type: str **kwargs # type: Any ): # type: (...) -> Any """Runs the scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param run_id: :type run_id: str :keyword scan_level: Possible values are: "Full" or "Incremental". :paramtype scan_level: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 202 response.json() == { "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "scanResultId": str, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled". } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) scan_level = kwargs.pop('scan_level', None) # type: Optional[str] request = build_scan_result_run_scan_request( data_source_name=data_source_name, scan_name=scan_name, run_id=run_id, scan_level=scan_level, template_url=self.run_scan.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
run_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}'} # type: ignore
[docs] @distributed_trace def cancel_scan( self, data_source_name, # type: str scan_name, # type: str run_id, # type: str **kwargs # type: Any ): # type: (...) -> Any """Cancels a scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param run_id: :type run_id: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 202 response.json() == { "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "scanResultId": str, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled". } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_result_cancel_scan_request( data_source_name=data_source_name, scan_name=scan_name, run_id=run_id, template_url=self.cancel_scan.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
cancel_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel'} # type: ignore
[docs] @distributed_trace def list_scan_history( self, data_source_name, # type: str scan_name, # type: str **kwargs # type: Any ): # type: (...) -> Iterable[Any] """Lists the scan history of a scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_scan_result_list_scan_history_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.list_scan_history.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_scan_result_list_scan_history_request( data_source_name=data_source_name, scan_name=scan_name, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_scan_history.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs'} # type: ignore
[docs]class ScanRulesetsOperations(object): """ScanRulesetsOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def get( self, scan_ruleset_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Get a scan ruleset. :param scan_ruleset_name: :type scan_ruleset_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_rulesets_get_request( scan_ruleset_name=scan_ruleset_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace def create_or_update( self, scan_ruleset_name, # type: str body=None, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates or Updates a scan ruleset. :param scan_ruleset_name: :type scan_ruleset_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'AdlsGen1ScanRuleset' or 'AdlsGen2ScanRuleset' or 'AmazonAccountScanRuleset' or 'AmazonPostgreSqlScanRuleset' or 'AmazonS3ScanRuleset' or 'AmazonSqlScanRuleset' or 'AzureCosmosDbScanRuleset' or 'AzureDataExplorerScanRuleset' or 'AzureFileServiceScanRuleset' or 'AzureMySqlScanRuleset' or 'AzurePostgreSqlScanRuleset' or 'AzureResourceGroupScanRuleset' or 'AzureSqlDataWarehouseScanRuleset' or 'AzureSqlDatabaseScanRuleset' or 'AzureSqlDatabaseManagedInstanceScanRuleset' or 'AzureStorageScanRuleset' or 'AzureSubscriptionScanRuleset' or 'AzureSynapseScanRuleset' or 'AzureSynapseWorkspaceScanRuleset' or 'OracleScanRuleset' or 'PowerBIScanRuleset' or 'SapEccScanRuleset' or 'SapS4HanaScanRuleset' or 'SqlServerDatabaseScanRuleset' or 'TeradataScanRuleset' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_scan_rulesets_create_or_update_request( scan_ruleset_name=scan_ruleset_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace def delete( self, scan_ruleset_name, # type: str **kwargs # type: Any ): # type: (...) -> Optional[Any] """Deletes a scan ruleset. :param scan_ruleset_name: :type scan_ruleset_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_rulesets_delete_request( scan_ruleset_name=scan_ruleset_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List scan rulesets in Data catalog. :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_scan_rulesets_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_scan_rulesets_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/scanrulesets'} # type: ignore
[docs]class SystemScanRulesetsOperations(object): """SystemScanRulesetsOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def list_all( self, **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List all system scan rulesets for an account. :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_system_scan_rulesets_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_system_scan_rulesets_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/systemScanRulesets'} # type: ignore
[docs] @distributed_trace def get( self, data_source_type, # type: str **kwargs # type: Any ): # type: (...) -> Any """Get a system scan ruleset for a data source. :param data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :type data_source_type: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_system_scan_rulesets_get_request( data_source_type=data_source_type, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/systemScanRulesets/datasources/{dataSourceType}'} # type: ignore
[docs] @distributed_trace def get_by_version( self, version, # type: int **kwargs # type: Any ): # type: (...) -> Any """Get a scan ruleset by version. :param version: :type version: int :keyword data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :paramtype data_source_type: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) data_source_type = kwargs.pop('data_source_type', None) # type: Optional[str] request = build_system_scan_rulesets_get_by_version_request( version=version, data_source_type=data_source_type, template_url=self.get_by_version.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_by_version.metadata = {'url': '/systemScanRulesets/versions/{version}'} # type: ignore
[docs] @distributed_trace def get_latest( self, **kwargs # type: Any ): # type: (...) -> Any """Get the latest version of a system scan ruleset. :keyword data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :paramtype data_source_type: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) data_source_type = kwargs.pop('data_source_type', None) # type: Optional[str] request = build_system_scan_rulesets_get_latest_request( data_source_type=data_source_type, template_url=self.get_latest.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_latest.metadata = {'url': '/systemScanRulesets/versions/latest'} # type: ignore
[docs] @distributed_trace def list_versions_by_data_source( self, **kwargs # type: Any ): # type: (...) -> Iterable[Any] """List system scan ruleset versions in Data catalog. :keyword data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :paramtype data_source_type: str :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } ] } """ data_source_type = kwargs.pop('data_source_type', None) # type: Optional[str] cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_system_scan_rulesets_list_versions_by_data_source_request( data_source_type=data_source_type, template_url=self.list_versions_by_data_source.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_system_scan_rulesets_list_versions_by_data_source_request( data_source_type=data_source_type, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged( get_next, extract_data )
list_versions_by_data_source.metadata = {'url': '/systemScanRulesets/versions'} # type: ignore
[docs]class TriggersOperations(object): """TriggersOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def get_trigger( self, data_source_name, # type: str scan_name, # type: str **kwargs # type: Any ): # type: (...) -> Any """Gets trigger information. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_triggers_get_trigger_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.get_trigger.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore
[docs] @distributed_trace def create_trigger( self, data_source_name, # type: str scan_name, # type: str body, # type: Any **kwargs # type: Any ): # type: (...) -> Any """Creates an instance of a trigger. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] json = body request = build_triggers_create_trigger_request( data_source_name=data_source_name, scan_name=scan_name, content_type=content_type, json=json, template_url=self.create_trigger.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore
[docs] @distributed_trace def delete_trigger( self, data_source_name, # type: str scan_name, # type: str **kwargs # type: Any ): # type: (...) -> Optional[Any] """Deletes the trigger associated with the scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_triggers_delete_trigger_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.delete_trigger.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore