# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import functools
from json import loads as _loads
from typing import TYPE_CHECKING
import warnings
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from msrest import Serializer
from .._vendor import _convert_request, _format_url_section
if TYPE_CHECKING:
    # Names in this guard are only needed by the ``# type:`` comments in this
    # module, so they are imported/defined for static type checkers only.
    # pylint: disable=unused-import,ungrouped-imports
    from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union
    # Generic placeholder referenced by ClsType below.
    T = TypeVar('T')
    # Optional callable taking (pipeline response, T, dict of str -> Any).
    ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
# Single msrest Serializer instance shared by every request builder below for
# serializing URL path segments, query values and header values.
_SERIALIZER = Serializer()
# fmt: off
def build_key_vault_connections_get_request(
    key_vault_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /azureKeyVaults/{keyVaultName}`` request.

    :param key_vault_name: Value serialized as a string into the
     ``{keyVaultName}`` path segment.
    :type key_vault_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/azureKeyVaults/{keyVaultName}')
    request_url = _format_url_section(
        url_template,
        keyVaultName=_SERIALIZER.url("key_vault_name", key_vault_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_key_vault_connections_create_request(
    key_vault_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /azureKeyVaults/{keyVaultName}`` request.

    :param key_vault_name: Value serialized as a string into the
     ``{keyVaultName}`` path segment.
    :type key_vault_name: str
    :keyword str content_type: When not ``None``, sent as the
     ``Content-Type`` header.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Content-Type`` (if given) and
     ``Accept`` are written into this mapping in place.
    :return: The assembled request. Keyword arguments not consumed here
     (for example the body) are forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    body_content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/azureKeyVaults/{keyVaultName}')
    request_url = _format_url_section(
        url_template,
        keyVaultName=_SERIALIZER.url("key_vault_name", key_vault_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: Content-Type is optional, Accept is always set.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if body_content_type is not None:
        headers['Content-Type'] = _SERIALIZER.header("content_type", body_content_type, 'str')
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_key_vault_connections_delete_request(
    key_vault_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``DELETE /azureKeyVaults/{keyVaultName}`` request.

    :param key_vault_name: Value serialized as a string into the
     ``{keyVaultName}`` path segment.
    :type key_vault_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/azureKeyVaults/{keyVaultName}')
    request_url = _format_url_section(
        url_template,
        keyVaultName=_SERIALIZER.url("key_vault_name", key_vault_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="DELETE", url=request_url, params=query, headers=headers, **kwargs)
def build_key_vault_connections_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /azureKeyVaults`` request.

    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/azureKeyVaults')

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_classification_rules_get_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /classificationrules/{classificationRuleName}`` request.

    :param classification_rule_name: Value serialized as a string into the
     ``{classificationRuleName}`` path segment.
    :type classification_rule_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}')
    request_url = _format_url_section(
        url_template,
        classificationRuleName=_SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_classification_rules_create_or_update_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /classificationrules/{classificationRuleName}`` request.

    :param classification_rule_name: Value serialized as a string into the
     ``{classificationRuleName}`` path segment.
    :type classification_rule_name: str
    :keyword str content_type: When not ``None``, sent as the
     ``Content-Type`` header.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Content-Type`` (if given) and
     ``Accept`` are written into this mapping in place.
    :return: The assembled request. Keyword arguments not consumed here
     (for example the body) are forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    body_content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}')
    request_url = _format_url_section(
        url_template,
        classificationRuleName=_SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: Content-Type is optional, Accept is always set.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if body_content_type is not None:
        headers['Content-Type'] = _SERIALIZER.header("content_type", body_content_type, 'str')
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_classification_rules_delete_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``DELETE /classificationrules/{classificationRuleName}`` request.

    :param classification_rule_name: Value serialized as a string into the
     ``{classificationRuleName}`` path segment.
    :type classification_rule_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}')
    request_url = _format_url_section(
        url_template,
        classificationRuleName=_SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="DELETE", url=request_url, params=query, headers=headers, **kwargs)
def build_classification_rules_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /classificationrules`` request.

    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/classificationrules')

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_classification_rules_list_versions_by_classification_rule_name_request(
    classification_rule_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /classificationrules/{classificationRuleName}/versions`` request.

    :param classification_rule_name: Value serialized as a string into the
     ``{classificationRuleName}`` path segment.
    :type classification_rule_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/classificationrules/{classificationRuleName}/versions')
    request_url = _format_url_section(
        url_template,
        classificationRuleName=_SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_classification_rules_tag_classification_version_request(
    classification_rule_name,  # type: str
    classification_rule_version,  # type: int
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``POST .../versions/{classificationRuleVersion}/:tag`` request.

    The default URL template is
    ``/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag``.

    :param classification_rule_name: Value serialized as a string into the
     ``{classificationRuleName}`` path segment.
    :type classification_rule_name: str
    :param classification_rule_version: Value serialized as an int into the
     ``{classificationRuleVersion}`` path segment.
    :type classification_rule_version: int
    :keyword str action: Required. Sent as the ``action`` query parameter.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``action`` and ``api-version``
     are written into this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    :raises KeyError: If the ``action`` keyword is not supplied.
    """
    # 'action' has no default: a missing keyword fails here, before any other work.
    tag_action = kwargs.pop('action')  # type: str

    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop(
        "template_url",
        '/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag',
    )
    request_url = _format_url_section(
        url_template,
        classificationRuleName=_SERIALIZER.url("classification_rule_name", classification_rule_name, 'str'),
        classificationRuleVersion=_SERIALIZER.url("classification_rule_version", classification_rule_version, 'int'),
    )

    # Query string: the action first, then the pinned API version.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['action'] = _SERIALIZER.query("action", tag_action, 'str')
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="POST", url=request_url, params=query, headers=headers, **kwargs)
def build_data_sources_create_or_update_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /datasources/{dataSourceName}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :keyword str content_type: When not ``None``, sent as the
     ``Content-Type`` header.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Content-Type`` (if given) and
     ``Accept`` are written into this mapping in place.
    :return: The assembled request. Keyword arguments not consumed here
     (for example the body) are forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    body_content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: Content-Type is optional, Accept is always set.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if body_content_type is not None:
        headers['Content-Type'] = _SERIALIZER.header("content_type", body_content_type, 'str')
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_data_sources_get_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /datasources/{dataSourceName}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_data_sources_delete_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``DELETE /datasources/{dataSourceName}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="DELETE", url=request_url, params=query, headers=headers, **kwargs)
def build_data_sources_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /datasources`` request.

    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/datasources')

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_filters_get_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /datasources/{dataSourceName}/scans/{scanName}/filters/custom`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/filters/custom')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_filters_create_or_update_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /datasources/{dataSourceName}/scans/{scanName}/filters/custom`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :keyword str content_type: When not ``None``, sent as the
     ``Content-Type`` header.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Content-Type`` (if given) and
     ``Accept`` are written into this mapping in place.
    :return: The assembled request. Keyword arguments not consumed here
     (for example the body) are forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    body_content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/filters/custom')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: Content-Type is optional, Accept is always set.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if body_content_type is not None:
        headers['Content-Type'] = _SERIALIZER.header("content_type", body_content_type, 'str')
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_scans_create_or_update_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /datasources/{dataSourceName}/scans/{scanName}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :keyword str content_type: When not ``None``, sent as the
     ``Content-Type`` header.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Content-Type`` (if given) and
     ``Accept`` are written into this mapping in place.
    :return: The assembled request. Keyword arguments not consumed here
     (for example the body) are forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    body_content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: Content-Type is optional, Accept is always set.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if body_content_type is not None:
        headers['Content-Type'] = _SERIALIZER.header("content_type", body_content_type, 'str')
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_scans_get_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /datasources/{dataSourceName}/scans/{scanName}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_scans_delete_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``DELETE /datasources/{dataSourceName}/scans/{scanName}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="DELETE", url=request_url, params=query, headers=headers, **kwargs)
def build_scans_list_by_data_source_request(
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /datasources/{dataSourceName}/scans`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_result_run_scan_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    run_id,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /datasources/{dataSourceName}/scans/{scanName}/runs/{runId}`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :param run_id: Value serialized as a string into the ``{runId}`` path
     segment.
    :type run_id: str
    :keyword str scan_level: When not ``None``, sent as the ``scanLevel``
     query parameter.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``scanLevel`` (if given) and
     ``api-version`` are written into this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    level = kwargs.pop('scan_level', None)  # type: Optional[str]

    # Resolve the template first, then fill in the three serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
        runId=_SERIALIZER.url("run_id", run_id, 'str'),
    )

    # Query string: optional scan level first, then the pinned API version.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    if level is not None:
        query['scanLevel'] = _SERIALIZER.query("scan_level", level, 'str')
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_result_cancel_scan_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    run_id,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``POST /datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :param run_id: Value serialized as a string into the ``{runId}`` path
     segment.
    :type run_id: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the three serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
        runId=_SERIALIZER.url("run_id", run_id, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="POST", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_result_list_scan_history_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /datasources/{dataSourceName}/scans/{scanName}/runs`` request.

    :param data_source_name: Value serialized as a string into the
     ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value serialized as a string into the ``{scanName}``
     path segment.
    :type scan_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in both serialized path segments.
    url_template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/runs')
    request_url = _format_url_section(
        url_template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_rulesets_get_request(
    scan_ruleset_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /scanrulesets/{scanRulesetName}`` request.

    :param scan_ruleset_name: Value serialized as a string into the
     ``{scanRulesetName}`` path segment.
    :type scan_ruleset_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/scanrulesets/{scanRulesetName}')
    request_url = _format_url_section(
        url_template,
        scanRulesetName=_SERIALIZER.url("scan_ruleset_name", scan_ruleset_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_rulesets_create_or_update_request(
    scan_ruleset_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``PUT /scanrulesets/{scanRulesetName}`` request.

    :param scan_ruleset_name: Value serialized as a string into the
     ``{scanRulesetName}`` path segment.
    :type scan_ruleset_name: str
    :keyword str content_type: When not ``None``, sent as the
     ``Content-Type`` header.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Content-Type`` (if given) and
     ``Accept`` are written into this mapping in place.
    :return: The assembled request. Keyword arguments not consumed here
     (for example the body) are forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    body_content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/scanrulesets/{scanRulesetName}')
    request_url = _format_url_section(
        url_template,
        scanRulesetName=_SERIALIZER.url("scan_ruleset_name", scan_ruleset_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: Content-Type is optional, Accept is always set.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if body_content_type is not None:
        headers['Content-Type'] = _SERIALIZER.header("content_type", body_content_type, 'str')
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_rulesets_delete_request(
    scan_ruleset_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``DELETE /scanrulesets/{scanRulesetName}`` request.

    :param scan_ruleset_name: Value serialized as a string into the
     ``{scanRulesetName}`` path segment.
    :type scan_ruleset_name: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/scanrulesets/{scanRulesetName}')
    request_url = _format_url_section(
        url_template,
        scanRulesetName=_SERIALIZER.url("scan_ruleset_name", scan_ruleset_name, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="DELETE", url=request_url, params=query, headers=headers, **kwargs)
def build_scan_rulesets_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /scanrulesets`` request.

    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/scanrulesets')

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_system_scan_rulesets_list_all_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /systemScanRulesets`` request.

    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/systemScanRulesets')

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_system_scan_rulesets_get_request(
    data_source_type,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /systemScanRulesets/datasources/{dataSourceType}`` request.

    :param data_source_type: Value serialized as a string into the
     ``{dataSourceType}`` path segment.
    :type data_source_type: str
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``api-version`` is written into
     this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/systemScanRulesets/datasources/{dataSourceType}')
    request_url = _format_url_section(
        url_template,
        dataSourceType=_SERIALIZER.url("data_source_type", data_source_type, 'str'),
    )

    # Query string: the API version is always pinned.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_system_scan_rulesets_get_by_version_request(
    version,  # type: int
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /systemScanRulesets/versions/{version}`` request.

    :param version: Value serialized as an int into the ``{version}`` path
     segment.
    :type version: int
    :keyword str data_source_type: When not ``None``, sent as the
     ``dataSourceType`` query parameter.
    :keyword str template_url: Replaces the default URL template.
    :keyword dict params: Query parameters; ``dataSourceType`` (if given)
     and ``api-version`` are written into this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

    # Resolve the template first, then fill in the serialized path segment.
    url_template = kwargs.pop("template_url", '/systemScanRulesets/versions/{version}')
    request_url = _format_url_section(
        url_template,
        version=_SERIALIZER.url("version", version, 'int'),
    )

    # Query string: optional data source type first, then the pinned API version.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    if source_type is not None:
        query['dataSourceType'] = _SERIALIZER.query("data_source_type", source_type, 'str')
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_system_scan_rulesets_get_latest_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /systemScanRulesets/versions/latest`` request.

    :keyword str data_source_type: When not ``None``, sent as the
     ``dataSourceType`` query parameter.
    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``dataSourceType`` (if given)
     and ``api-version`` are written into this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/systemScanRulesets/versions/latest')

    # Query string: optional data source type first, then the pinned API version.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    if source_type is not None:
        query['dataSourceType'] = _SERIALIZER.query("data_source_type", source_type, 'str')
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_system_scan_rulesets_list_versions_by_data_source_request(
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the ``GET /systemScanRulesets/versions`` request.

    :keyword str data_source_type: When not ``None``, sent as the
     ``dataSourceType`` query parameter.
    :keyword str template_url: Replaces the default URL.
    :keyword dict params: Query parameters; ``dataSourceType`` (if given)
     and ``api-version`` are written into this mapping in place.
    :keyword dict headers: Headers; ``Accept`` is written into this mapping
     in place.
    :return: The assembled request. Keyword arguments not consumed here are
     forwarded to ``HttpRequest``.
    :rtype: HttpRequest
    """
    source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

    # No path placeholders here, so the template is used verbatim.
    request_url = kwargs.pop("template_url", '/systemScanRulesets/versions')

    # Query string: optional data source type first, then the pinned API version.
    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    if source_type is not None:
        query['dataSourceType'] = _SERIALIZER.query("data_source_type", source_type, 'str')
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Headers: a JSON response is always requested.
    headers = kwargs.pop("headers", {})  # type: Dict[str, Any]
    headers['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=request_url, params=query, headers=headers, **kwargs)
def build_triggers_get_trigger_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the GET request for a scan's ``triggers/default`` resource.

    :param data_source_name: Value for the ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value for the ``{scanName}`` path segment.
    :type scan_name: str
    :keyword template_url: Overrides the default URL template.
    :keyword params: Query-parameter dict to extend; it is updated in place.
    :keyword headers: Header dict to extend; it is updated in place.
    :return: The assembled request. Any other keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/triggers/default')
    target = _format_url_section(
        template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str')
    )

    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    head = kwargs.pop("headers", {})  # type: Dict[str, Any]
    head['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="GET", url=target, params=query, headers=head, **kwargs)
def build_triggers_create_trigger_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the PUT request for a scan's ``triggers/default`` resource.

    :param data_source_name: Value for the ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value for the ``{scanName}`` path segment.
    :type scan_name: str
    :keyword content_type: Optional ``Content-Type`` header value; the header is left out when
     this is ``None``.
    :paramtype content_type: str
    :keyword template_url: Overrides the default URL template.
    :keyword params: Query-parameter dict to extend; it is updated in place.
    :keyword headers: Header dict to extend; it is updated in place.
    :return: The assembled request. Any other keyword arguments (for example the body) are
     forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    media_type = kwargs.pop('content_type', None)  # type: Optional[str]

    template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/triggers/default')
    target = _format_url_section(
        template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str')
    )

    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    # Content-Type (when given) is written before Accept, as in the generated code.
    head = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if media_type is not None:
        head['Content-Type'] = _SERIALIZER.header("content_type", media_type, 'str')
    head['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="PUT", url=target, params=query, headers=head, **kwargs)
def build_triggers_delete_trigger_request(
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> HttpRequest
    """Build the DELETE request for a scan's ``triggers/default`` resource.

    :param data_source_name: Value for the ``{dataSourceName}`` path segment.
    :type data_source_name: str
    :param scan_name: Value for the ``{scanName}`` path segment.
    :type scan_name: str
    :keyword template_url: Overrides the default URL template.
    :keyword params: Query-parameter dict to extend; it is updated in place.
    :keyword headers: Header dict to extend; it is updated in place.
    :return: The assembled request. Any other keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    template = kwargs.pop("template_url", '/datasources/{dataSourceName}/scans/{scanName}/triggers/default')
    target = _format_url_section(
        template,
        dataSourceName=_SERIALIZER.url("data_source_name", data_source_name, 'str'),
        scanName=_SERIALIZER.url("scan_name", scan_name, 'str')
    )

    query = kwargs.pop("params", {})  # type: Dict[str, Any]
    query['api-version'] = _SERIALIZER.query("api_version", "2018-12-01-preview", 'str')

    head = kwargs.pop("headers", {})  # type: Dict[str, Any]
    head['Accept'] = _SERIALIZER.header("accept", "application/json", 'str')

    return HttpRequest(method="DELETE", url=target, params=query, headers=head, **kwargs)
# fmt: on
class KeyVaultConnectionsOperations(object):
    """KeyVaultConnectionsOperations operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    @distributed_trace
    def get(
        self,
        key_vault_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Gets key vault information.

        :param key_vault_name: Name used for the ``{keyVaultName}`` segment of the request path.
        :type key_vault_name: str
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object, or ``None`` when the response carries no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "baseUrl": "str",  # Optional.
                        "description": "str"  # Optional.
                    }
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_key_vault_connections_get_request(
            key_vault_name=key_vault_name,
            template_url=self.get.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None
        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get.metadata = {'url': '/azureKeyVaults/{keyVaultName}'}  # type: ignore

    @distributed_trace
    def create(
        self,
        key_vault_name,  # type: str
        body,  # type: Any
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Creates an instance of a key vault connection.

        :param key_vault_name: Name used for the ``{keyVaultName}`` segment of the request path.
        :type key_vault_name: str
        :param body: Payload handed to the request builder as ``json``.
        :type body: Any
        :keyword content_type: Content type passed to the request builder; defaults to
         ``"application/json"``.
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object, or ``None`` when the response carries no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                body = {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "baseUrl": "str",  # Optional.
                        "description": "str"  # Optional.
                    }
                }

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "baseUrl": "str",  # Optional.
                        "description": "str"  # Optional.
                    }
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

        request = build_key_vault_connections_create_request(
            key_vault_name=key_vault_name,
            content_type=content_type,
            json=body,
            template_url=self.create.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None
        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    create.metadata = {'url': '/azureKeyVaults/{keyVaultName}'}  # type: ignore

    @distributed_trace
    def delete(
        self,
        key_vault_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional[Any]
        """Deletes the key vault connection associated with the account.

        :param key_vault_name: Name used for the ``{keyVaultName}`` segment of the request path.
        :type key_vault_name: str
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object for a 200 response with content; ``None`` for a 204 response or an
         empty body.
        :rtype: Any or None
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "baseUrl": "str",  # Optional.
                        "description": "str"  # Optional.
                    }
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional[Any]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_key_vault_connections_delete_request(
            key_vault_name=key_vault_name,
            template_url=self.delete.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [200, 204]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # Only a 200 response is read as JSON; 204 always yields None.
        deserialized = None
        if response.status_code == 200 and response.content:
            deserialized = response.json()

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    delete.metadata = {'url': '/azureKeyVaults/{keyVaultName}'}  # type: ignore

    @distributed_trace
    def list_all(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """List key vault connections in account.

        :keyword cls: Optional callable applied to each page's ``value`` list before iteration.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            "properties": {
                                "baseUrl": "str",  # Optional.
                                "description": "str"  # Optional.
                            }
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # The first page uses this operation's own URL; later pages use the link
            # taken from the previous page's "nextLink" field.
            request = build_key_vault_connections_list_all_request(
                template_url=next_link or self.list_all.metadata['url'],
            )
            request = _convert_request(request)
            request.url = self._client.format_url(
                request.url,
                Endpoint=self._serialize.url(
                    "self._config.endpoint", self._config.endpoint, 'str', skip_quote=True
                )
            )
            if next_link:
                request.method = "GET"
            return request

        def extract_data(pipeline_response):
            page = _loads(pipeline_response.http_response.body())
            items = page["value"]
            if cls:
                items = cls(items)
            return page.get("nextLink", None), iter(items)

        def get_next(next_link=None):
            request = prepare_request(next_link)
            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response
            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)
            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_all.metadata = {'url': '/azureKeyVaults'}  # type: ignore
class ClassificationRulesOperations(object):
    """ClassificationRulesOperations operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    @distributed_trace
    def get(
        self,
        classification_rule_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Get a classification rule.

        :param classification_rule_name: Name used for the ``{classificationRuleName}`` segment of
         the request path.
        :type classification_rule_name: str
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object, or ``None`` when the response carries no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    kind: ClassificationRule
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_classification_rules_get_request(
            classification_rule_name=classification_rule_name,
            template_url=self.get.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None
        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get.metadata = {'url': '/classificationrules/{classificationRuleName}'}  # type: ignore

    @distributed_trace
    def create_or_update(
        self,
        classification_rule_name,  # type: str
        body=None,  # type: Any
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Creates or Updates a classification rule.

        :param classification_rule_name: Name used for the ``{classificationRuleName}`` segment of
         the request path.
        :type classification_rule_name: str
        :param body: Payload handed to the request builder as ``json`` (``None`` when omitted).
        :type body: Any
        :keyword content_type: Content type passed to the request builder; defaults to
         ``"application/json"``.
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object, or ``None`` when the response carries no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                kind = 'CustomClassificationRule' or 'SystemClassificationRule'

                # JSON input template you can fill out and use as your body input.
                body = {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    kind: ClassificationRule
                }

                # response body for status code(s): 200, 201
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    kind: ClassificationRule
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

        request = build_classification_rules_create_or_update_request(
            classification_rule_name=classification_rule_name,
            content_type=content_type,
            json=body,
            template_url=self.create_or_update.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [200, 201]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # 200 and 201 are read the same way, so one branch covers both.
        deserialized = response.json() if response.content else None
        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    create_or_update.metadata = {'url': '/classificationrules/{classificationRuleName}'}  # type: ignore

    @distributed_trace
    def delete(
        self,
        classification_rule_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional[Any]
        """Deletes a classification rule.

        :param classification_rule_name: Name used for the ``{classificationRuleName}`` segment of
         the request path.
        :type classification_rule_name: str
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object for a 200 response with content; ``None`` for a 204 response or an
         empty body.
        :rtype: Any or None
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    kind: ClassificationRule
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional[Any]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_classification_rules_delete_request(
            classification_rule_name=classification_rule_name,
            template_url=self.delete.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [200, 204]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # Only a 200 response is read as JSON; 204 always yields None.
        deserialized = None
        if response.status_code == 200 and response.content:
            deserialized = response.json()

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    delete.metadata = {'url': '/classificationrules/{classificationRuleName}'}  # type: ignore

    @distributed_trace
    def list_all(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """List classification rules in Account.

        :keyword cls: Optional callable applied to each page's ``value`` list before iteration.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            kind: ClassificationRule
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # The first page uses this operation's own URL; later pages use the link
            # taken from the previous page's "nextLink" field.
            request = build_classification_rules_list_all_request(
                template_url=next_link or self.list_all.metadata['url'],
            )
            request = _convert_request(request)
            request.url = self._client.format_url(
                request.url,
                Endpoint=self._serialize.url(
                    "self._config.endpoint", self._config.endpoint, 'str', skip_quote=True
                )
            )
            if next_link:
                request.method = "GET"
            return request

        def extract_data(pipeline_response):
            page = _loads(pipeline_response.http_response.body())
            items = page["value"]
            if cls:
                items = cls(items)
            return page.get("nextLink", None), iter(items)

        def get_next(next_link=None):
            request = prepare_request(next_link)
            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response
            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)
            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_all.metadata = {'url': '/classificationrules'}  # type: ignore

    @distributed_trace
    def list_versions_by_classification_rule_name(
        self,
        classification_rule_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """Lists the rule versions of a classification rule.

        :param classification_rule_name: Name used for the ``{classificationRuleName}`` segment of
         the request path.
        :type classification_rule_name: str
        :keyword cls: Optional callable applied to each page's ``value`` list before iteration.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            kind: ClassificationRule
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # The first page uses this operation's own URL; later pages use the link
            # taken from the previous page's "nextLink" field. The rule name is passed
            # to the builder in both cases.
            request = build_classification_rules_list_versions_by_classification_rule_name_request(
                classification_rule_name=classification_rule_name,
                template_url=next_link or self.list_versions_by_classification_rule_name.metadata['url'],
            )
            request = _convert_request(request)
            request.url = self._client.format_url(
                request.url,
                Endpoint=self._serialize.url(
                    "self._config.endpoint", self._config.endpoint, 'str', skip_quote=True
                )
            )
            if next_link:
                request.method = "GET"
            return request

        def extract_data(pipeline_response):
            page = _loads(pipeline_response.http_response.body())
            items = page["value"]
            if cls:
                items = cls(items)
            return page.get("nextLink", None), iter(items)

        def get_next(next_link=None):
            request = prepare_request(next_link)
            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response
            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)
            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_versions_by_classification_rule_name.metadata = {'url': '/classificationrules/{classificationRuleName}/versions'}  # type: ignore

    @distributed_trace
    def tag_classification_version(
        self,
        classification_rule_name,  # type: str
        classification_rule_version,  # type: int
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Sets Classification Action on a specific classification rule version.

        :param classification_rule_name: Name used for the ``{classificationRuleName}`` segment of
         the request path.
        :type classification_rule_name: str
        :param classification_rule_version: Version used for the ``{classificationRuleVersion}``
         segment of the request path.
        :type classification_rule_version: int
        :keyword action: Required. Possible values are: "Keep" or "Delete".
        :paramtype action: str
        :keyword cls: Optional callable; when given, the method returns
         ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
        :keyword error_map: Optional dict merged over the default 401/404/409 error map.
        :return: JSON object, or ``None`` when the response carries no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError
        :raises KeyError: if the ``action`` keyword is not supplied.

        Example:
            .. code-block:: python

                # response body for status code(s): 202
                response.json() == {
                    "endTime": "2020-02-20 00:00:00",  # Optional.
                    "error": {
                        "code": "str",  # Optional.
                        "details": [
                            {
                                "code": "str",  # Optional.
                                "details": [
                                    ...
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            }
                        ],
                        "message": "str",  # Optional.
                        "target": "str"  # Optional.
                    },
                    "scanResultId": str,  # Optional.
                    "startTime": "2020-02-20 00:00:00",  # Optional.
                    "status": "str"  # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled".
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        # No default: a missing ``action`` keyword surfaces as KeyError.
        action = kwargs.pop('action')  # type: str

        request = build_classification_rules_tag_classification_version_request(
            classification_rule_name=classification_rule_name,
            classification_rule_version=classification_rule_version,
            action=action,
            template_url=self.tag_classification_version.metadata['url'],
        )
        # Resolve the {Endpoint} placeholder against the configured account endpoint.
        request.url = self._client.format_url(
            request.url,
            Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        )

        pipeline_response = self._client.send_request(
            request, stream=False, _return_pipeline_response=True, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code not in [202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None
        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    tag_classification_version.metadata = {'url': '/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag'}  # type: ignore
[docs]class DataSourcesOperations(object):
"""DataSourcesOperations operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer):
    """Keep references to the client, its configuration and the (de)serializers."""
    self._client, self._config = client, config
    self._serialize, self._deserialize = serializer, deserializer
@distributed_trace
def create_or_update(
    self,
    data_source_name,  # type: str
    body=None,  # type: Any
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """Creates or Updates a data source.

    :param data_source_name: Name used for the ``{dataSourceName}`` segment of the request path.
    :type data_source_name: str
    :param body: Payload handed to the request builder as ``json`` (``None`` when omitted).
    :type body: Any
    :keyword content_type: Content type passed to the request builder; defaults to
     ``"application/json"``.
    :keyword cls: Optional callable; when given, the method returns
     ``cls(pipeline_response, deserialized, {})`` instead of the JSON body.
    :keyword error_map: Optional dict merged over the default 401/404/409 error map.
    :return: JSON object, or ``None`` when the response carries no content.
    :rtype: Any
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            kind = 'AdlsGen1DataSource' or 'AdlsGen2DataSource' or 'AmazonAccountDataSource' or 'AmazonPostgreSqlDataSource' or 'AmazonS3DataSource' or 'AmazonSqlDataSource' or 'AzureCosmosDbDataSource' or 'AzureDataExplorerDataSource' or 'AzureFileServiceDataSource' or 'AzureMySqlDataSource' or 'AzurePostgreSqlDataSource' or 'AzureResourceGroupDataSource' or 'AzureSqlDataWarehouseDataSource' or 'AzureSqlDatabaseDataSource' or 'AzureSqlDatabaseManagedInstanceDataSource' or 'AzureStorageDataSource' or 'AzureSubscriptionDataSource' or 'AzureSynapseDataSource' or 'AzureSynapseWorkspaceDataSource' or 'OracleDataSource' or 'PowerBIDataSource' or 'SapEccDataSource' or 'SapS4HanaDataSource' or 'SqlServerDatabaseDataSource' or 'TeradataDataSource'

            # JSON input template you can fill out and use as your body input.
            body = {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "scans": [
                    {
                        "id": "str",  # Optional.
                        "name": "str",  # Optional.
                        "scanResults": [
                            {
                                "assetsClassified": 0.0,  # Optional.
                                "assetsDiscovered": 0.0,  # Optional.
                                "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                                "diagnostics": {
                                    "exceptionCountMap": {
                                        "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                    },
                                    "notifications": [
                                        {
                                            "code": 0,  # Optional.
                                            "message": "str"  # Optional.
                                        }
                                    ]
                                },
                                "endTime": "2020-02-20 00:00:00",  # Optional.
                                "error": {
                                    "code": "str",  # Optional.
                                    "details": [
                                        {
                                            "code": "str",  # Optional.
                                            "details": [
                                                ...
                                            ],
                                            "message": "str",  # Optional.
                                            "target": "str"  # Optional.
                                        }
                                    ],
                                    "message": "str",  # Optional.
                                    "target": "str"  # Optional.
                                },
                                "errorMessage": "str",  # Optional.
                                "id": "str",  # Optional.
                                "parentId": "str",  # Optional.
                                "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                                "queuedTime": "2020-02-20 00:00:00",  # Optional.
                                "resourceId": "str",  # Optional.
                                "runType": "str",  # Optional.
                                "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                                "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                                "scanRulesetVersion": 0,  # Optional.
                                "startTime": "2020-02-20 00:00:00",  # Optional.
                                "status": "str"  # Optional.
                            }
                        ],
                        kind: Scan
                    }
                ],
                kind: DataSource
            }

            # response body for status code(s): 200, 201
            # response.json() has exactly the same shape as the ``body`` template above.
    """
    cls = kwargs.pop('cls', None)  # type: ClsType[Any]
    error_map = {
        401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
    }
    error_map.update(kwargs.pop('error_map', {}))
    content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

    request = build_data_sources_create_or_update_request(
        data_source_name=data_source_name,
        content_type=content_type,
        json=body,
        template_url=self.create_or_update.metadata['url'],
    )
    # Resolve the {Endpoint} placeholder against the configured account endpoint.
    request.url = self._client.format_url(
        request.url,
        Endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
    )

    pipeline_response = self._client.send_request(
        request, stream=False, _return_pipeline_response=True, **kwargs
    )
    response = pipeline_response.http_response
    if response.status_code not in [200, 201]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # 200 and 201 are read the same way, so one branch covers both.
    deserialized = response.json() if response.content else None
    if cls:
        return cls(pipeline_response, deserialized, {})
    return deserialized


create_or_update.metadata = {'url': '/datasources/{dataSourceName}'}  # type: ignore
@distributed_trace
def get(
    self,
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """Retrieve one data source.

    Issues the request built by ``build_data_sources_get_request`` against the
    configured endpoint and returns the decoded JSON body.

    :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
    :type data_source_name: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
     when given, its result is returned instead of the JSON object.
    :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
     over the defaults for 401, 404 and 409.
    :return: JSON object, or ``None`` when the response has no content.
    :rtype: Any
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response.json() == {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "scans": [
                    {
                        "id": "str",  # Optional.
                        "name": "str",  # Optional.
                        "scanResults": [
                            {
                                "assetsClassified": 0.0,  # Optional.
                                "assetsDiscovered": 0.0,  # Optional.
                                "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                                "diagnostics": {
                                    "exceptionCountMap": {
                                        "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                    },
                                    "notifications": [
                                        {
                                            "code": 0,  # Optional.
                                            "message": "str"  # Optional.
                                        }
                                    ]
                                },
                                "endTime": "2020-02-20 00:00:00",  # Optional.
                                "error": {
                                    "code": "str",  # Optional.
                                    "details": [
                                        {
                                            "code": "str",  # Optional.
                                            "details": [
                                                ...
                                            ],
                                            "message": "str",  # Optional.
                                            "target": "str"  # Optional.
                                        }
                                    ],
                                    "message": "str",  # Optional.
                                    "target": "str"  # Optional.
                                },
                                "errorMessage": "str",  # Optional.
                                "id": "str",  # Optional.
                                "parentId": "str",  # Optional.
                                "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                                "queuedTime": "2020-02-20 00:00:00",  # Optional.
                                "resourceId": "str",  # Optional.
                                "runType": "str",  # Optional.
                                "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                                "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                                "scanRulesetVersion": 0,  # Optional.
                                "startTime": "2020-02-20 00:00:00",  # Optional.
                                "status": "str"  # Optional.
                            }
                        ],
                        kind: Scan
                    }
                ],
                kind: DataSource
            }
    """
    response_hook = kwargs.pop('cls', None)  # type: ClsType[Any]
    status_to_error = {
        401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
    }
    # Caller-supplied mappings take precedence over the defaults.
    status_to_error.update(kwargs.pop('error_map', {}))

    request = build_data_sources_get_request(
        data_source_name=data_source_name,
        template_url=self.get.metadata['url'],
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
    request.url = self._client.format_url(request.url, Endpoint=endpoint)

    pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
    response = pipeline_response.http_response

    if response.status_code != 200:
        # map_error raises the mapped exception type when one is registered;
        # anything unmapped surfaces as a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    payload = response.json() if response.content else None

    if response_hook:
        return response_hook(pipeline_response, payload, {})
    return payload


get.metadata = {'url': '/datasources/{dataSourceName}'}  # type: ignore
@distributed_trace
def delete(
    self,
    data_source_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> Optional[Any]
    """Delete one data source.

    Status codes 200 and 204 are both accepted. Only a 200 response with
    content is decoded; every other accepted response yields ``None``.

    :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
    :type data_source_name: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
     when given, its result is returned instead of the JSON object.
    :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
     over the defaults for 401, 404 and 409.
    :return: JSON object
    :rtype: Any or None
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response.json() == {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "scans": [
                    {
                        "id": "str",  # Optional.
                        "name": "str",  # Optional.
                        "scanResults": [
                            {
                                "assetsClassified": 0.0,  # Optional.
                                "assetsDiscovered": 0.0,  # Optional.
                                "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                                "diagnostics": {
                                    "exceptionCountMap": {
                                        "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                    },
                                    "notifications": [
                                        {
                                            "code": 0,  # Optional.
                                            "message": "str"  # Optional.
                                        }
                                    ]
                                },
                                "endTime": "2020-02-20 00:00:00",  # Optional.
                                "error": {
                                    "code": "str",  # Optional.
                                    "details": [
                                        {
                                            "code": "str",  # Optional.
                                            "details": [
                                                ...
                                            ],
                                            "message": "str",  # Optional.
                                            "target": "str"  # Optional.
                                        }
                                    ],
                                    "message": "str",  # Optional.
                                    "target": "str"  # Optional.
                                },
                                "errorMessage": "str",  # Optional.
                                "id": "str",  # Optional.
                                "parentId": "str",  # Optional.
                                "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                                "queuedTime": "2020-02-20 00:00:00",  # Optional.
                                "resourceId": "str",  # Optional.
                                "runType": "str",  # Optional.
                                "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                                "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                                "scanRulesetVersion": 0,  # Optional.
                                "startTime": "2020-02-20 00:00:00",  # Optional.
                                "status": "str"  # Optional.
                            }
                        ],
                        kind: Scan
                    }
                ],
                kind: DataSource
            }
    """
    response_hook = kwargs.pop('cls', None)  # type: ClsType[Optional[Any]]
    status_to_error = {
        401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
    }
    # Caller-supplied mappings take precedence over the defaults.
    status_to_error.update(kwargs.pop('error_map', {}))

    request = build_data_sources_delete_request(
        data_source_name=data_source_name,
        template_url=self.delete.metadata['url'],
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
    request.url = self._client.format_url(request.url, Endpoint=endpoint)

    pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
    response = pipeline_response.http_response

    if response.status_code not in (200, 204):
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    # A 204 response, or a 200 with no content, leaves the result as None.
    payload = None
    if response.status_code == 200 and response.content:
        payload = response.json()

    if response_hook:
        return response_hook(pipeline_response, payload, {})
    return payload


delete.metadata = {'url': '/datasources/{dataSourceName}'}  # type: ignore
@distributed_trace
def list_all(
    self,
    **kwargs  # type: Any
):
    # type: (...) -> Iterable[Any]
    """List data sources in Data catalog.

    Returns a lazy pager: no request is sent until the result is iterated.
    Each page is fetched through the client pipeline, and the next page is
    located via the ``nextLink`` field of the previous response.

    :keyword cls: Optional callable applied to each page's ``value`` list before
     its items are yielded.
    :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
     over the defaults for 401, 404 and 409.
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.paging.ItemPaged[Any]
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response.json() == {
                "count": 0.0,  # Optional.
                "nextLink": "str",  # Optional.
                "value": [
                    {
                        "id": "str",  # Optional.
                        "name": "str",  # Optional.
                        "scans": [
                            {
                                "id": "str",  # Optional.
                                "name": "str",  # Optional.
                                "scanResults": [
                                    {
                                        "assetsClassified": 0.0,  # Optional.
                                        "assetsDiscovered": 0.0,  # Optional.
                                        "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                                        "diagnostics": {
                                            "exceptionCountMap": {
                                                "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                            },
                                            "notifications": [
                                                {
                                                    "code": 0,  # Optional.
                                                    "message": "str"  # Optional.
                                                }
                                            ]
                                        },
                                        "endTime": "2020-02-20 00:00:00",  # Optional.
                                        "error": {
                                            "code": "str",  # Optional.
                                            "details": [
                                                {
                                                    "code": "str",  # Optional.
                                                    "details": [
                                                        ...
                                                    ],
                                                    "message": "str",  # Optional.
                                                    "target": "str"  # Optional.
                                                }
                                            ],
                                            "message": "str",  # Optional.
                                            "target": "str"  # Optional.
                                        },
                                        "errorMessage": "str",  # Optional.
                                        "id": "str",  # Optional.
                                        "parentId": "str",  # Optional.
                                        "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                                        "queuedTime": "2020-02-20 00:00:00",  # Optional.
                                        "resourceId": "str",  # Optional.
                                        "runType": "str",  # Optional.
                                        "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                                        "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                                        "scanRulesetVersion": 0,  # Optional.
                                        "startTime": "2020-02-20 00:00:00",  # Optional.
                                        "status": "str"  # Optional.
                                    }
                                ],
                                kind: Scan
                            }
                        ],
                        kind: DataSource
                    }
                ]
            }
    """
    cls = kwargs.pop('cls', None)  # type: ClsType[Any]
    error_map = {
        401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
    }
    # Caller-supplied mappings take precedence over the defaults.
    error_map.update(kwargs.pop('error_map', {}))

    def prepare_request(next_link=None):
        # The first page uses the operation's own URL template; later pages
        # use the link handed back by the previous response.
        request = build_data_sources_list_all_request(
            template_url=next_link or self.list_all.metadata['url'],
        )
        request = _convert_request(request)
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        if next_link:
            # Continuation requests are explicitly forced to GET.
            request.method = "GET"
        return request

    def extract_data(pipeline_response):
        # Returns (continuation token, iterator over this page's items).
        deserialized = _loads(pipeline_response.http_response.body())
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)
        return deserialized.get("nextLink", None), iter(list_of_elem)

    def get_next(next_link=None):
        request = prepare_request(next_link)

        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return ItemPaged(
        get_next, extract_data
    )


list_all.metadata = {'url': '/datasources'}  # type: ignore
class FiltersOperations(object):
    """Operations on the custom filter of a scan.

    This class is not meant to be constructed by user code. Create a Client
    instance instead; it instantiates this class and exposes it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._config = config
        self._serialize = serializer
        self._deserialize = deserializer

    @distributed_trace
    def get(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Retrieve the filter of a scan.

        :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
        :type data_source_name: str
        :param scan_name: Value for the ``{scanName}`` segment of the URL.
        :type scan_name: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
         when given, its result is returned instead of the JSON object.
        :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
         over the defaults for 401, 404 and 409.
        :return: JSON object, or ``None`` when the response has no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "excludeUriPrefixes": [
                            "str"  # Optional.
                        ],
                        "includeUriPrefixes": [
                            "str"  # Optional.
                        ]
                    }
                }
        """
        response_hook = kwargs.pop('cls', None)  # type: ClsType[Any]
        status_to_error = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # Caller-supplied mappings take precedence over the defaults.
        status_to_error.update(kwargs.pop('error_map', {}))

        request = build_filters_get_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            template_url=self.get.metadata['url'],
        )
        endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        request.url = self._client.format_url(request.url, Endpoint=endpoint)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=status_to_error)
            raise HttpResponseError(response=response)

        payload = response.json() if response.content else None

        if response_hook:
            return response_hook(pipeline_response, payload, {})
        return payload

    get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'}  # type: ignore

    @distributed_trace
    def create_or_update(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        body=None,  # type: Any
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Create the filter of a scan, or update it if it already exists.

        :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
        :type data_source_name: str
        :param scan_name: Value for the ``{scanName}`` segment of the URL.
        :type scan_name: str
        :param body: JSON-serializable request payload; may be omitted.
        :type body: Any
        :keyword content_type: Content type of the request body. Defaults to
         ``"application/json"``.
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
         when given, its result is returned instead of the JSON object.
        :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
         over the defaults for 401, 404 and 409.
        :return: JSON object, or ``None`` when the response has no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                body = {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "excludeUriPrefixes": [
                            "str"  # Optional.
                        ],
                        "includeUriPrefixes": [
                            "str"  # Optional.
                        ]
                    }
                }

                # response body for status code(s): 200, 201
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "properties": {
                        "excludeUriPrefixes": [
                            "str"  # Optional.
                        ],
                        "includeUriPrefixes": [
                            "str"  # Optional.
                        ]
                    }
                }
        """
        response_hook = kwargs.pop('cls', None)  # type: ClsType[Any]
        status_to_error = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # Caller-supplied mappings take precedence over the defaults.
        status_to_error.update(kwargs.pop('error_map', {}))

        content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

        # A missing body is forwarded as None, exactly as a provided one is forwarded as-is.
        request = build_filters_create_or_update_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            content_type=content_type,
            json=body,
            template_url=self.create_or_update.metadata['url'],
        )
        endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
        request.url = self._client.format_url(request.url, Endpoint=endpoint)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in (200, 201):
            map_error(status_code=response.status_code, response=response, error_map=status_to_error)
            raise HttpResponseError(response=response)

        # 200 and 201 are decoded the same way.
        payload = response.json() if response.content else None

        if response_hook:
            return response_hook(pipeline_response, payload, {})
        return payload

    create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'}  # type: ignore
class ScansOperations(object):
    """Operations on the scans of a data source.

    This class is not meant to be constructed by user code. Create a Client
    instance instead; it instantiates this class and exposes it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._config = config
        self._serialize = serializer
        self._deserialize = deserializer

    @distributed_trace
    def create_or_update(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        body,  # type: Any
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Creates an instance of a scan.

        :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
        :type data_source_name: str
        :param scan_name: Value for the ``{scanName}`` segment of the URL.
        :type scan_name: str
        :param body: JSON-serializable request payload (required).
        :type body: Any
        :keyword content_type: Content type of the request body. Defaults to
         ``"application/json"``.
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
         when given, its result is returned instead of the JSON object.
        :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
         over the defaults for 401, 404 and 409.
        :return: JSON object, or ``None`` when the response has no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                kind = 'AdlsGen1CredentialScan' or 'AdlsGen1MsiScan' or 'AdlsGen2CredentialScan' or 'AdlsGen2MsiScan' or 'AmazonAccountCredentialScan' or 'AmazonPostgreSqlCredentialScan' or 'AmazonS3CredentialScan' or 'AmazonS3RoleARNScan' or 'AmazonSqlCredentialScan' or 'AzureCosmosDbCredentialScan' or 'AzureDataExplorerCredentialScan' or 'AzureDataExplorerMsiScan' or 'AzureFileServiceCredentialScan' or 'AzureMySqlCredentialScan' or 'AzurePostgreSqlCredentialScan' or 'AzureResourceGroupCredentialScan' or 'AzureResourceGroupMsiScan' or 'AzureSqlDataWarehouseCredentialScan' or 'AzureSqlDataWarehouseMsiScan' or 'AzureSqlDatabaseCredentialScan' or 'AzureSqlDatabaseManagedInstanceCredentialScan' or 'AzureSqlDatabaseManagedInstanceMsiScan' or 'AzureSqlDatabaseMsiScan' or 'AzureStorageCredentialScan' or 'AzureStorageMsiScan' or 'AzureSubscriptionCredentialScan' or 'AzureSubscriptionMsiScan' or 'AzureSynapseCredentialScan' or 'AzureSynapseMsiScan' or 'AzureSynapseWorkspaceCredentialScan' or 'AzureSynapseWorkspaceMsiScan' or 'OracleCredentialScan' or 'OracleUserPassScan' or 'PowerBIDelegatedScan' or 'PowerBIMsiScan' or 'SapEccCredentialScan' or 'SapEccUserPassScan' or 'SapS4HanaSapS4HanaCredentialScan' or 'SapS4HanaSapS4HanaUserPassScan' or 'SqlServerDatabaseCredentialScan' or 'TeradataCredentialScan' or 'TeradataUserPassScanAutoGenerated' or 'TeradataUserPassScan'

                # JSON input template you can fill out and use as your body input.
                body = {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanResults": [
                        {
                            "assetsClassified": 0.0,  # Optional.
                            "assetsDiscovered": 0.0,  # Optional.
                            "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                            "diagnostics": {
                                "exceptionCountMap": {
                                    "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                },
                                "notifications": [
                                    {
                                        "code": 0,  # Optional.
                                        "message": "str"  # Optional.
                                    }
                                ]
                            },
                            "endTime": "2020-02-20 00:00:00",  # Optional.
                            "error": {
                                "code": "str",  # Optional.
                                "details": [
                                    {
                                        "code": "str",  # Optional.
                                        "details": [
                                            ...
                                        ],
                                        "message": "str",  # Optional.
                                        "target": "str"  # Optional.
                                    }
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            },
                            "errorMessage": "str",  # Optional.
                            "id": "str",  # Optional.
                            "parentId": "str",  # Optional.
                            "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                            "queuedTime": "2020-02-20 00:00:00",  # Optional.
                            "resourceId": "str",  # Optional.
                            "runType": "str",  # Optional.
                            "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "scanRulesetVersion": 0,  # Optional.
                            "startTime": "2020-02-20 00:00:00",  # Optional.
                            "status": "str"  # Optional.
                        }
                    ],
                    kind: Scan
                }

                # response body for status code(s): 200, 201
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanResults": [
                        {
                            "assetsClassified": 0.0,  # Optional.
                            "assetsDiscovered": 0.0,  # Optional.
                            "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                            "diagnostics": {
                                "exceptionCountMap": {
                                    "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                },
                                "notifications": [
                                    {
                                        "code": 0,  # Optional.
                                        "message": "str"  # Optional.
                                    }
                                ]
                            },
                            "endTime": "2020-02-20 00:00:00",  # Optional.
                            "error": {
                                "code": "str",  # Optional.
                                "details": [
                                    {
                                        "code": "str",  # Optional.
                                        "details": [
                                            ...
                                        ],
                                        "message": "str",  # Optional.
                                        "target": "str"  # Optional.
                                    }
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            },
                            "errorMessage": "str",  # Optional.
                            "id": "str",  # Optional.
                            "parentId": "str",  # Optional.
                            "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                            "queuedTime": "2020-02-20 00:00:00",  # Optional.
                            "resourceId": "str",  # Optional.
                            "runType": "str",  # Optional.
                            "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "scanRulesetVersion": 0,  # Optional.
                            "startTime": "2020-02-20 00:00:00",  # Optional.
                            "status": "str"  # Optional.
                        }
                    ],
                    kind: Scan
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # Caller-supplied mappings take precedence over the defaults.
        error_map.update(kwargs.pop('error_map', {}))

        content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

        request = build_scans_create_or_update_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            content_type=content_type,
            json=body,
            template_url=self.create_or_update.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in (200, 201):
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # 200 and 201 are decoded the same way.
        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'}  # type: ignore

    @distributed_trace
    def get(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Gets a scan information.

        :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
        :type data_source_name: str
        :param scan_name: Value for the ``{scanName}`` segment of the URL.
        :type scan_name: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
         when given, its result is returned instead of the JSON object.
        :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
         over the defaults for 401, 404 and 409.
        :return: JSON object, or ``None`` when the response has no content.
        :rtype: Any
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanResults": [
                        {
                            "assetsClassified": 0.0,  # Optional.
                            "assetsDiscovered": 0.0,  # Optional.
                            "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                            "diagnostics": {
                                "exceptionCountMap": {
                                    "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                },
                                "notifications": [
                                    {
                                        "code": 0,  # Optional.
                                        "message": "str"  # Optional.
                                    }
                                ]
                            },
                            "endTime": "2020-02-20 00:00:00",  # Optional.
                            "error": {
                                "code": "str",  # Optional.
                                "details": [
                                    {
                                        "code": "str",  # Optional.
                                        "details": [
                                            ...
                                        ],
                                        "message": "str",  # Optional.
                                        "target": "str"  # Optional.
                                    }
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            },
                            "errorMessage": "str",  # Optional.
                            "id": "str",  # Optional.
                            "parentId": "str",  # Optional.
                            "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                            "queuedTime": "2020-02-20 00:00:00",  # Optional.
                            "resourceId": "str",  # Optional.
                            "runType": "str",  # Optional.
                            "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "scanRulesetVersion": 0,  # Optional.
                            "startTime": "2020-02-20 00:00:00",  # Optional.
                            "status": "str"  # Optional.
                        }
                    ],
                    kind: Scan
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # Caller-supplied mappings take precedence over the defaults.
        error_map.update(kwargs.pop('error_map', {}))

        request = build_scans_get_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            template_url=self.get.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'}  # type: ignore

    @distributed_trace
    def delete(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional[Any]
        """Deletes the scan associated with the data source.

        Status codes 200 and 204 are both accepted. Only a 200 response with
        content is decoded; every other accepted response yields ``None``.

        :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
        :type data_source_name: str
        :param scan_name: Value for the ``{scanName}`` segment of the URL.
        :type scan_name: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, json, {})``;
         when given, its result is returned instead of the JSON object.
        :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
         over the defaults for 401, 404 and 409.
        :return: JSON object
        :rtype: Any or None
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanResults": [
                        {
                            "assetsClassified": 0.0,  # Optional.
                            "assetsDiscovered": 0.0,  # Optional.
                            "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                            "diagnostics": {
                                "exceptionCountMap": {
                                    "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                },
                                "notifications": [
                                    {
                                        "code": 0,  # Optional.
                                        "message": "str"  # Optional.
                                    }
                                ]
                            },
                            "endTime": "2020-02-20 00:00:00",  # Optional.
                            "error": {
                                "code": "str",  # Optional.
                                "details": [
                                    {
                                        "code": "str",  # Optional.
                                        "details": [
                                            ...
                                        ],
                                        "message": "str",  # Optional.
                                        "target": "str"  # Optional.
                                    }
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            },
                            "errorMessage": "str",  # Optional.
                            "id": "str",  # Optional.
                            "parentId": "str",  # Optional.
                            "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                            "queuedTime": "2020-02-20 00:00:00",  # Optional.
                            "resourceId": "str",  # Optional.
                            "runType": "str",  # Optional.
                            "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "scanRulesetVersion": 0,  # Optional.
                            "startTime": "2020-02-20 00:00:00",  # Optional.
                            "status": "str"  # Optional.
                        }
                    ],
                    kind: Scan
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional[Any]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # Caller-supplied mappings take precedence over the defaults.
        error_map.update(kwargs.pop('error_map', {}))

        request = build_scans_delete_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            template_url=self.delete.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in (200, 204):
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # A 204 response, or a 200 with no content, leaves the result as None.
        deserialized = None
        if response.status_code == 200 and response.content:
            deserialized = response.json()

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    delete.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'}  # type: ignore

    @distributed_trace
    def list_by_data_source(
        self,
        data_source_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """List scans in data source.

        Returns a lazy pager: no request is sent until the result is iterated.
        Each page is fetched through the client pipeline, and the next page is
        located via the ``nextLink`` field of the previous response.

        :param data_source_name: Value for the ``{dataSourceName}`` segment of the URL.
        :type data_source_name: str
        :keyword cls: Optional callable applied to each page's ``value`` list before
         its items are yielded.
        :keyword error_map: Optional ``{status_code: exception_type}`` overrides merged
         over the defaults for 401, 404 and 409.
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises: ~azure.core.exceptions.HttpResponseError

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            "scanResults": [
                                {
                                    "assetsClassified": 0.0,  # Optional.
                                    "assetsDiscovered": 0.0,  # Optional.
                                    "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                                    "diagnostics": {
                                        "exceptionCountMap": {
                                            "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                        },
                                        "notifications": [
                                            {
                                                "code": 0,  # Optional.
                                                "message": "str"  # Optional.
                                            }
                                        ]
                                    },
                                    "endTime": "2020-02-20 00:00:00",  # Optional.
                                    "error": {
                                        "code": "str",  # Optional.
                                        "details": [
                                            {
                                                "code": "str",  # Optional.
                                                "details": [
                                                    ...
                                                ],
                                                "message": "str",  # Optional.
                                                "target": "str"  # Optional.
                                            }
                                        ],
                                        "message": "str",  # Optional.
                                        "target": "str"  # Optional.
                                    },
                                    "errorMessage": "str",  # Optional.
                                    "id": "str",  # Optional.
                                    "parentId": "str",  # Optional.
                                    "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                                    "queuedTime": "2020-02-20 00:00:00",  # Optional.
                                    "resourceId": "str",  # Optional.
                                    "runType": "str",  # Optional.
                                    "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                                    "scanRulesetVersion": 0,  # Optional.
                                    "startTime": "2020-02-20 00:00:00",  # Optional.
                                    "status": "str"  # Optional.
                                }
                            ],
                            kind: Scan
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # Caller-supplied mappings take precedence over the defaults.
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # The first page uses the operation's own URL template; later pages
            # use the link handed back by the previous response.
            request = build_scans_list_by_data_source_request(
                data_source_name=data_source_name,
                template_url=next_link or self.list_by_data_source.metadata['url'],
            )
            request = _convert_request(request)
            path_format_arguments = {
                "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            if next_link:
                # Continuation requests are explicitly forced to GET.
                request.method = "GET"
            return request

        def extract_data(pipeline_response):
            # Returns (continuation token, iterator over this page's items).
            deserialized = _loads(pipeline_response.http_response.body())
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.get("nextLink", None), iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code != 200:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_by_data_source.metadata = {'url': '/datasources/{dataSourceName}/scans'}  # type: ignore
class ScanResultOperations(object):
    """Operations for running a scan, cancelling a run and listing a scan's run history.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    @distributed_trace
    def run_scan(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        run_id,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Runs the scan.

        :param data_source_name: Name of the data source (``{dataSourceName}`` in the request path).
        :type data_source_name: str
        :param scan_name: Name of the scan (``{scanName}`` in the request path).
        :type scan_name: str
        :param run_id: Identifier of the run (``{runId}`` in the request path).
        :type run_id: str
        :keyword scan_level: Possible values are: "Full" or "Incremental".
        :paramtype scan_level: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is not 202. By default
         401, 404 and 409 are mapped to ClientAuthenticationError, ResourceNotFoundError and
         ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 202
                response.json() == {
                    "endTime": "2020-02-20 00:00:00",  # Optional.
                    "error": {
                        "code": "str",  # Optional.
                        "details": [
                            {
                                "code": "str",  # Optional.
                                "details": [
                                    ...
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            }
                        ],
                        "message": "str",  # Optional.
                        "target": "str"  # Optional.
                    },
                    "scanResultId": "str",  # Optional.
                    "startTime": "2020-02-20 00:00:00",  # Optional.
                    "status": "str"  # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled".
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        scan_level = kwargs.pop('scan_level', None)  # type: Optional[str]

        request = build_scan_result_run_scan_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            run_id=run_id,
            scan_level=scan_level,
            template_url=self.run_scan.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [202]:
            # Raises the mapped exception when the status is in error_map; otherwise falls through.
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    run_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}'}  # type: ignore

    @distributed_trace
    def cancel_scan(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        run_id,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Cancels a scan.

        :param data_source_name: Name of the data source (``{dataSourceName}`` in the request path).
        :type data_source_name: str
        :param scan_name: Name of the scan (``{scanName}`` in the request path).
        :type scan_name: str
        :param run_id: Identifier of the run to cancel (``{runId}`` in the request path).
        :type run_id: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is not 202. By default
         401, 404 and 409 are mapped to ClientAuthenticationError, ResourceNotFoundError and
         ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 202
                response.json() == {
                    "endTime": "2020-02-20 00:00:00",  # Optional.
                    "error": {
                        "code": "str",  # Optional.
                        "details": [
                            {
                                "code": "str",  # Optional.
                                "details": [
                                    ...
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            }
                        ],
                        "message": "str",  # Optional.
                        "target": "str"  # Optional.
                    },
                    "scanResultId": "str",  # Optional.
                    "startTime": "2020-02-20 00:00:00",  # Optional.
                    "status": "str"  # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled".
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_scan_result_cancel_scan_request(
            data_source_name=data_source_name,
            scan_name=scan_name,
            run_id=run_id,
            template_url=self.cancel_scan.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    cancel_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel'}  # type: ignore

    @distributed_trace
    def list_scan_history(
        self,
        data_source_name,  # type: str
        scan_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """Lists the scan history of a scan.

        Each item produced by the returned pager is one element of the ``value`` array of a
        response page; following pages are fetched through the page's ``nextLink``.

        :param data_source_name: Name of the data source (``{dataSourceName}`` in the request path).
        :type data_source_name: str
        :param scan_name: Name of the scan (``{scanName}`` in the request path).
        :type scan_name: str
        :keyword cls: Optional callable applied to the ``value`` list of every page; its result
         is what the pager iterates over.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises ~azure.core.exceptions.HttpResponseError: If a page request returns a status code
         other than 200. By default 401, 404 and 409 are mapped to ClientAuthenticationError,
         ResourceNotFoundError and ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "assetsClassified": 0.0,  # Optional.
                            "assetsDiscovered": 0.0,  # Optional.
                            "dataSourceType": "str",  # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
                            "diagnostics": {
                                "exceptionCountMap": {
                                    "str": 0  # Optional. Dictionary of :code:`<integer>`.
                                },
                                "notifications": [
                                    {
                                        "code": 0,  # Optional.
                                        "message": "str"  # Optional.
                                    }
                                ]
                            },
                            "endTime": "2020-02-20 00:00:00",  # Optional.
                            "error": {
                                "code": "str",  # Optional.
                                "details": [
                                    {
                                        "code": "str",  # Optional.
                                        "details": [
                                            ...
                                        ],
                                        "message": "str",  # Optional.
                                        "target": "str"  # Optional.
                                    }
                                ],
                                "message": "str",  # Optional.
                                "target": "str"  # Optional.
                            },
                            "errorMessage": "str",  # Optional.
                            "id": "str",  # Optional.
                            "parentId": "str",  # Optional.
                            "pipelineStartTime": "2020-02-20 00:00:00",  # Optional.
                            "queuedTime": "2020-02-20 00:00:00",  # Optional.
                            "resourceId": "str",  # Optional.
                            "runType": "str",  # Optional.
                            "scanLevelType": "str",  # Optional. Possible values include: "Full", "Incremental".
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "scanRulesetVersion": 0,  # Optional.
                            "startTime": "2020-02-20 00:00:00",  # Optional.
                            "status": "str"  # Optional.
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # First page: the operation's own URL template. Later pages: the
            # continuation link taken from the previous page.
            request = build_scan_result_list_scan_history_request(
                data_source_name=data_source_name,
                scan_name=scan_name,
                template_url=next_link or self.list_scan_history.metadata['url'],
            )
            request = _convert_request(request)
            path_format_arguments = {
                "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            # Page requests are always issued as GET.
            request.method = "GET"
            return request

        def extract_data(pipeline_response):
            # Returns (continuation link or None, iterator over this page's items).
            deserialized = _loads(pipeline_response.http_response.body())
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.get("nextLink", None), iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_scan_history.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs'}  # type: ignore
class ScanRulesetsOperations(object):
    """Operations for getting, creating or updating, deleting and listing scan rulesets.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    @distributed_trace
    def get(
        self,
        scan_ruleset_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Get a scan ruleset.

        :param scan_ruleset_name: Name of the scan ruleset (``{scanRulesetName}`` in the request path).
        :type scan_ruleset_name: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is not 200. By default
         401, 404 and 409 are mapped to ClientAuthenticationError, ResourceNotFoundError and
         ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: ScanRuleset
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_scan_rulesets_get_request(
            scan_ruleset_name=scan_ruleset_name,
            template_url=self.get.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            # Raises the mapped exception when the status is in error_map; otherwise falls through.
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get.metadata = {'url': '/scanrulesets/{scanRulesetName}'}  # type: ignore

    @distributed_trace
    def create_or_update(
        self,
        scan_ruleset_name,  # type: str
        body=None,  # type: Any
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Creates or Updates a scan ruleset.

        :param scan_ruleset_name: Name of the scan ruleset (``{scanRulesetName}`` in the request path).
        :type scan_ruleset_name: str
        :param body: JSON-serializable request body; see the input template below. ``None`` sends
         no JSON payload.
        :type body: Any
        :keyword content_type: Content type of the request body. Defaults to "application/json".
        :paramtype content_type: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is neither 200 nor 201.
         By default 401, 404 and 409 are mapped to ClientAuthenticationError,
         ResourceNotFoundError and ResourceExistsError.

        Example:
            .. code-block:: python

                kind = 'AdlsGen1ScanRuleset' or 'AdlsGen2ScanRuleset' or 'AmazonAccountScanRuleset' or 'AmazonPostgreSqlScanRuleset' or 'AmazonS3ScanRuleset' or 'AmazonSqlScanRuleset' or 'AzureCosmosDbScanRuleset' or 'AzureDataExplorerScanRuleset' or 'AzureFileServiceScanRuleset' or 'AzureMySqlScanRuleset' or 'AzurePostgreSqlScanRuleset' or 'AzureResourceGroupScanRuleset' or 'AzureSqlDataWarehouseScanRuleset' or 'AzureSqlDatabaseScanRuleset' or 'AzureSqlDatabaseManagedInstanceScanRuleset' or 'AzureStorageScanRuleset' or 'AzureSubscriptionScanRuleset' or 'AzureSynapseScanRuleset' or 'AzureSynapseWorkspaceScanRuleset' or 'OracleScanRuleset' or 'PowerBIScanRuleset' or 'SapEccScanRuleset' or 'SapS4HanaScanRuleset' or 'SqlServerDatabaseScanRuleset' or 'TeradataScanRuleset'

                # JSON input template you can fill out and use as your body input.
                body = {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: ScanRuleset
                }

                # response body for status code(s): 200, 201
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: ScanRuleset
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

        request = build_scan_rulesets_create_or_update_request(
            scan_ruleset_name=scan_ruleset_name,
            content_type=content_type,
            json=body,  # None passes through unchanged, exactly as before
            template_url=self.create_or_update.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 201]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # 200 and 201 carry the same payload shape, so one decode path covers both.
        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    create_or_update.metadata = {'url': '/scanrulesets/{scanRulesetName}'}  # type: ignore

    @distributed_trace
    def delete(
        self,
        scan_ruleset_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Optional[Any]
        """Deletes a scan ruleset.

        :param scan_ruleset_name: Name of the scan ruleset (``{scanRulesetName}`` in the request path).
        :type scan_ruleset_name: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object for a 200 response with content; None for 204 or an empty body.
        :rtype: Any or None
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is neither 200 nor 204.
         By default 401, 404 and 409 are mapped to ClientAuthenticationError,
         ResourceNotFoundError and ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: ScanRuleset
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional[Any]]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_scan_rulesets_delete_request(
            scan_ruleset_name=scan_ruleset_name,
            template_url=self.delete.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 204]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # Only a 200 response is decoded; 204 always yields None.
        deserialized = None
        if response.status_code == 200 and response.content:
            deserialized = response.json()

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    delete.metadata = {'url': '/scanrulesets/{scanRulesetName}'}  # type: ignore

    @distributed_trace
    def list_all(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """List scan rulesets in Data catalog.

        Each item produced by the returned pager is one element of the ``value`` array of a
        response page; following pages are fetched through the page's ``nextLink``.

        :keyword cls: Optional callable applied to the ``value`` list of every page; its result
         is what the pager iterates over.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises ~azure.core.exceptions.HttpResponseError: If a page request returns a status code
         other than 200. By default 401, 404 and 409 are mapped to ClientAuthenticationError,
         ResourceNotFoundError and ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                            "version": 0,  # Optional.
                            kind: ScanRuleset
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # First page: the operation's own URL template. Later pages: the
            # continuation link taken from the previous page.
            request = build_scan_rulesets_list_all_request(
                template_url=next_link or self.list_all.metadata['url'],
            )
            request = _convert_request(request)
            path_format_arguments = {
                "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            # Page requests are always issued as GET.
            request.method = "GET"
            return request

        def extract_data(pipeline_response):
            # Returns (continuation link or None, iterator over this page's items).
            deserialized = _loads(pipeline_response.http_response.body())
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.get("nextLink", None), iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_all.metadata = {'url': '/scanrulesets'}  # type: ignore
class SystemScanRulesetsOperations(object):
    """Read-only operations for listing and fetching system scan rulesets and their versions.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    @distributed_trace
    def list_all(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """List all system scan rulesets for an account.

        Each item produced by the returned pager is one element of the ``value`` array of a
        response page; following pages are fetched through the page's ``nextLink``.

        :keyword cls: Optional callable applied to the ``value`` list of every page; its result
         is what the pager iterates over.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises ~azure.core.exceptions.HttpResponseError: If a page request returns a status code
         other than 200. By default 401, 404 and 409 are mapped to ClientAuthenticationError,
         ResourceNotFoundError and ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                            "version": 0,  # Optional.
                            kind: SystemScanRuleset
                        }
                    ]
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # First page: the operation's own URL template. Later pages: the
            # continuation link taken from the previous page.
            request = build_system_scan_rulesets_list_all_request(
                template_url=next_link or self.list_all.metadata['url'],
            )
            request = _convert_request(request)
            path_format_arguments = {
                "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            # Page requests are always issued as GET.
            request.method = "GET"
            return request

        def extract_data(pipeline_response):
            # Returns (continuation link or None, iterator over this page's items).
            deserialized = _loads(pipeline_response.http_response.body())
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.get("nextLink", None), iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                # Raises the mapped exception when the status is in error_map; otherwise falls through.
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_all.metadata = {'url': '/systemScanRulesets'}  # type: ignore

    @distributed_trace
    def get(
        self,
        data_source_type,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Get a system scan ruleset for a data source.

        :param data_source_type: Possible values are: "None", "AzureSubscription",
         "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
         "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
         "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
         "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
         "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
        :type data_source_type: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is not 200. By default
         401, 404 and 409 are mapped to ClientAuthenticationError, ResourceNotFoundError and
         ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: SystemScanRuleset
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        request = build_system_scan_rulesets_get_request(
            data_source_type=data_source_type,
            template_url=self.get.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get.metadata = {'url': '/systemScanRulesets/datasources/{dataSourceType}'}  # type: ignore

    @distributed_trace
    def get_by_version(
        self,
        version,  # type: int
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Get a scan ruleset by version.

        :param version: Ruleset version number (``{version}`` in the request path).
        :type version: int
        :keyword data_source_type: Possible values are: "None", "AzureSubscription",
         "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
         "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
         "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
         "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
         "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
        :paramtype data_source_type: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is not 200. By default
         401, 404 and 409 are mapped to ClientAuthenticationError, ResourceNotFoundError and
         ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: SystemScanRuleset
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        data_source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

        request = build_system_scan_rulesets_get_by_version_request(
            version=version,
            data_source_type=data_source_type,
            template_url=self.get_by_version.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get_by_version.metadata = {'url': '/systemScanRulesets/versions/{version}'}  # type: ignore

    @distributed_trace
    def get_latest(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Any
        """Get the latest version of a system scan ruleset.

        :keyword data_source_type: Possible values are: "None", "AzureSubscription",
         "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
         "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
         "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
         "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
         "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
        :paramtype data_source_type: str
        :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``;
         when given, its return value is returned instead of the JSON object.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: JSON object, or None when the response has no content.
        :rtype: Any
        :raises ~azure.core.exceptions.HttpResponseError: If the status code is not 200. By default
         401, 404 and 409 are mapped to ClientAuthenticationError, ResourceNotFoundError and
         ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "id": "str",  # Optional.
                    "name": "str",  # Optional.
                    "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                    "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                    "version": 0,  # Optional.
                    kind: SystemScanRuleset
                }
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        data_source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]

        request = build_system_scan_rulesets_get_latest_request(
            data_source_type=data_source_type,
            template_url=self.get_latest.metadata['url'],
        )
        path_format_arguments = {
            "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        pipeline_response = self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        deserialized = response.json() if response.content else None

        if cls:
            return cls(pipeline_response, deserialized, {})
        return deserialized

    get_latest.metadata = {'url': '/systemScanRulesets/versions/latest'}  # type: ignore

    @distributed_trace
    def list_versions_by_data_source(
        self,
        **kwargs  # type: Any
    ):
        # type: (...) -> Iterable[Any]
        """List system scan ruleset versions in Data catalog.

        Each item produced by the returned pager is one element of the ``value`` array of a
        response page; following pages are fetched through the page's ``nextLink``.

        :keyword data_source_type: Possible values are: "None", "AzureSubscription",
         "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
         "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
         "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
         "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
         "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
        :paramtype data_source_type: str
        :keyword cls: Optional callable applied to the ``value`` list of every page; its result
         is what the pager iterates over.
        :keyword error_map: Optional mapping of status code to exception type that extends or
         overrides the default mapping (401, 404, 409).
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[Any]
        :raises ~azure.core.exceptions.HttpResponseError: If a page request returns a status code
         other than 200. By default 401, 404 and 409 are mapped to ClientAuthenticationError,
         ResourceNotFoundError and ResourceExistsError.

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response.json() == {
                    "count": 0.0,  # Optional.
                    "nextLink": "str",  # Optional.
                    "value": [
                        {
                            "id": "str",  # Optional.
                            "name": "str",  # Optional.
                            "scanRulesetType": "str",  # Optional. Possible values include: "Custom", "System".
                            "status": "str",  # Optional. Possible values include: "Enabled", "Disabled".
                            "version": 0,  # Optional.
                            kind: SystemScanRuleset
                        }
                    ]
                }
        """
        data_source_type = kwargs.pop('data_source_type', None)  # type: Optional[str]
        cls = kwargs.pop('cls', None)  # type: ClsType[Any]
        # Default status -> exception mapping; the caller's ``error_map`` wins on conflicts.
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))

        def prepare_request(next_link=None):
            # First page: the operation's own URL template. Later pages: the
            # continuation link taken from the previous page.
            request = build_system_scan_rulesets_list_versions_by_data_source_request(
                data_source_type=data_source_type,
                template_url=next_link or self.list_versions_by_data_source.metadata['url'],
            )
            request = _convert_request(request)
            path_format_arguments = {
                "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            # Page requests are always issued as GET.
            request.method = "GET"
            return request

        def extract_data(pipeline_response):
            # Returns (continuation link or None, iterator over this page's items).
            deserialized = _loads(pipeline_response.http_response.body())
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.get("nextLink", None), iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(
            get_next, extract_data
        )

    list_versions_by_data_source.metadata = {'url': '/systemScanRulesets/versions'}  # type: ignore
[docs]class TriggersOperations(object):
"""TriggersOperations operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer):
    """Keep references to the collaborators supplied by the owning client.

    :param client: Client for service requests; stored as ``_client``.
    :param config: Configuration of service client; stored as ``_config``.
    :param serializer: An object model serializer; stored as ``_serialize``.
    :param deserializer: An object model deserializer; stored as ``_deserialize``.
    """
    self._client, self._config = client, config
    self._serialize, self._deserialize = serializer, deserializer
[docs] @distributed_trace
def get_trigger(
    self,
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """Gets trigger information.

    :param data_source_name: Passed to the request builder for the
     ``{dataSourceName}`` segment of the URL template.
    :type data_source_name: str
    :param scan_name: Passed to the request builder for the ``{scanName}``
     segment of the URL template.
    :type scan_name: str
    :keyword cls: Optional callable; when given, its result for
     ``(pipeline_response, deserialized, {})`` is returned instead of the JSON body.
    :keyword error_map: Optional mapping of status code to exception type, merged
     over the default 401/404/409 mapping.
    :return: JSON object, or None when the response has no content
    :rtype: Any
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response.json() == {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "properties": {
                    "createdAt": "2020-02-20 00:00:00",  # Optional.
                    "incrementalScanStartTime": "2020-02-20 00:00:00",  # Optional.
                    "lastModifiedAt": "2020-02-20 00:00:00",  # Optional.
                    "lastScheduled": "2020-02-20 00:00:00",  # Optional.
                    "recurrence": {
                        "endTime": "2020-02-20 00:00:00",  # Optional.
                        "frequency": "str",  # Optional. Possible values include: "Week", "Month".
                        "interval": 0,  # Optional.
                        "schedule": {
                            "hours": [
                                0  # Optional.
                            ],
                            "minutes": [
                                0  # Optional.
                            ],
                            "monthDays": [
                                0  # Optional.
                            ],
                            "monthlyOccurrences": [
                                {
                                    "day": "str",  # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
                                    "occurrence": 0  # Optional.
                                }
                            ],
                            "weekDays": [
                                "str"  # Optional.
                            ]
                        },
                        "startTime": "2020-02-20 00:00:00",  # Optional.
                        "timeZone": "str"  # Optional.
                    },
                    "recurrenceInterval": "str",  # Optional.
                    "scanLevel": "str"  # Optional. Possible values include: "Full", "Incremental".
                }
            }
    """
    cls = kwargs.pop('cls', None)  # type: ClsType[Any]
    caller_error_map = kwargs.pop('error_map', {})
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
    }
    error_map.update(caller_error_map)

    http_request = build_triggers_get_trigger_request(
        data_source_name=data_source_name,
        scan_name=scan_name,
        template_url=self.get_trigger.metadata['url'],
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
    http_request.url = self._client.format_url(http_request.url, Endpoint=endpoint)

    pipeline_response = self._client.send_request(
        http_request, stream=False, _return_pipeline_response=True, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        # Raises the mapped exception when the status code is registered;
        # anything else surfaces as a generic HttpResponseError.
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    payload = http_response.json() if http_response.content else None

    if not cls:
        return payload
    return cls(pipeline_response, payload, {})
get_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'}  # type: ignore
[docs] @distributed_trace
def create_trigger(
    self,
    data_source_name,  # type: str
    scan_name,  # type: str
    body,  # type: Any
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """Creates an instance of a trigger.

    :param data_source_name: Passed to the request builder for the
     ``{dataSourceName}`` segment of the URL template.
    :type data_source_name: str
    :param scan_name: Passed to the request builder for the ``{scanName}``
     segment of the URL template.
    :type scan_name: str
    :param body: Handed to the request builder as its ``json`` argument.
    :type body: Any
    :keyword content_type: Forwarded to the request builder; defaults to
     "application/json".
    :keyword cls: Optional callable; when given, its result for
     ``(pipeline_response, deserialized, {})`` is returned instead of the JSON body.
    :keyword error_map: Optional mapping of status code to exception type, merged
     over the default 401/404/409 mapping.
    :return: JSON object, or None when the response has no content
    :rtype: Any
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "properties": {
                    "createdAt": "2020-02-20 00:00:00",  # Optional.
                    "incrementalScanStartTime": "2020-02-20 00:00:00",  # Optional.
                    "lastModifiedAt": "2020-02-20 00:00:00",  # Optional.
                    "lastScheduled": "2020-02-20 00:00:00",  # Optional.
                    "recurrence": {
                        "endTime": "2020-02-20 00:00:00",  # Optional.
                        "frequency": "str",  # Optional. Possible values include: "Week", "Month".
                        "interval": 0,  # Optional.
                        "schedule": {
                            "hours": [
                                0  # Optional.
                            ],
                            "minutes": [
                                0  # Optional.
                            ],
                            "monthDays": [
                                0  # Optional.
                            ],
                            "monthlyOccurrences": [
                                {
                                    "day": "str",  # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
                                    "occurrence": 0  # Optional.
                                }
                            ],
                            "weekDays": [
                                "str"  # Optional.
                            ]
                        },
                        "startTime": "2020-02-20 00:00:00",  # Optional.
                        "timeZone": "str"  # Optional.
                    },
                    "recurrenceInterval": "str",  # Optional.
                    "scanLevel": "str"  # Optional. Possible values include: "Full", "Incremental".
                }
            }

            # response body for status code(s): 200, 201
            response.json() == {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "properties": {
                    "createdAt": "2020-02-20 00:00:00",  # Optional.
                    "incrementalScanStartTime": "2020-02-20 00:00:00",  # Optional.
                    "lastModifiedAt": "2020-02-20 00:00:00",  # Optional.
                    "lastScheduled": "2020-02-20 00:00:00",  # Optional.
                    "recurrence": {
                        "endTime": "2020-02-20 00:00:00",  # Optional.
                        "frequency": "str",  # Optional. Possible values include: "Week", "Month".
                        "interval": 0,  # Optional.
                        "schedule": {
                            "hours": [
                                0  # Optional.
                            ],
                            "minutes": [
                                0  # Optional.
                            ],
                            "monthDays": [
                                0  # Optional.
                            ],
                            "monthlyOccurrences": [
                                {
                                    "day": "str",  # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
                                    "occurrence": 0  # Optional.
                                }
                            ],
                            "weekDays": [
                                "str"  # Optional.
                            ]
                        },
                        "startTime": "2020-02-20 00:00:00",  # Optional.
                        "timeZone": "str"  # Optional.
                    },
                    "recurrenceInterval": "str",  # Optional.
                    "scanLevel": "str"  # Optional. Possible values include: "Full", "Incremental".
                }
            }
    """
    cls = kwargs.pop('cls', None)  # type: ClsType[Any]
    caller_error_map = kwargs.pop('error_map', {})
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
    }
    error_map.update(caller_error_map)

    content_type = kwargs.pop('content_type', "application/json")  # type: Optional[str]

    http_request = build_triggers_create_trigger_request(
        data_source_name=data_source_name,
        scan_name=scan_name,
        content_type=content_type,
        json=body,
        template_url=self.create_trigger.metadata['url'],
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
    http_request.url = self._client.format_url(http_request.url, Endpoint=endpoint)

    pipeline_response = self._client.send_request(
        http_request, stream=False, _return_pipeline_response=True, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code not in (200, 201):
        # Raises the mapped exception when the status code is registered;
        # anything else surfaces as a generic HttpResponseError.
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # Both accepted status codes (200 and 201) carry the same body shape, so
    # they share one deserialization path.
    payload = http_response.json() if http_response.content else None

    if not cls:
        return payload
    return cls(pipeline_response, payload, {})
create_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'}  # type: ignore
[docs] @distributed_trace
def delete_trigger(
    self,
    data_source_name,  # type: str
    scan_name,  # type: str
    **kwargs  # type: Any
):
    # type: (...) -> Optional[Any]
    """Deletes the trigger associated with the scan.

    :param data_source_name: Passed to the request builder for the
     ``{dataSourceName}`` segment of the URL template.
    :type data_source_name: str
    :param scan_name: Passed to the request builder for the ``{scanName}``
     segment of the URL template.
    :type scan_name: str
    :keyword cls: Optional callable; when given, its result for
     ``(pipeline_response, deserialized, {})`` is returned instead of the JSON body.
    :keyword error_map: Optional mapping of status code to exception type, merged
     over the default 401/404/409 mapping.
    :return: JSON object for a 200 response that has content; None for a 204
     response or an empty 200 response
    :rtype: Any or None
    :raises: ~azure.core.exceptions.HttpResponseError

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response.json() == {
                "id": "str",  # Optional.
                "name": "str",  # Optional.
                "properties": {
                    "createdAt": "2020-02-20 00:00:00",  # Optional.
                    "incrementalScanStartTime": "2020-02-20 00:00:00",  # Optional.
                    "lastModifiedAt": "2020-02-20 00:00:00",  # Optional.
                    "lastScheduled": "2020-02-20 00:00:00",  # Optional.
                    "recurrence": {
                        "endTime": "2020-02-20 00:00:00",  # Optional.
                        "frequency": "str",  # Optional. Possible values include: "Week", "Month".
                        "interval": 0,  # Optional.
                        "schedule": {
                            "hours": [
                                0  # Optional.
                            ],
                            "minutes": [
                                0  # Optional.
                            ],
                            "monthDays": [
                                0  # Optional.
                            ],
                            "monthlyOccurrences": [
                                {
                                    "day": "str",  # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
                                    "occurrence": 0  # Optional.
                                }
                            ],
                            "weekDays": [
                                "str"  # Optional.
                            ]
                        },
                        "startTime": "2020-02-20 00:00:00",  # Optional.
                        "timeZone": "str"  # Optional.
                    },
                    "recurrenceInterval": "str",  # Optional.
                    "scanLevel": "str"  # Optional. Possible values include: "Full", "Incremental".
                }
            }
    """
    cls = kwargs.pop('cls', None)  # type: ClsType[Optional[Any]]
    caller_error_map = kwargs.pop('error_map', {})
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
    }
    error_map.update(caller_error_map)

    http_request = build_triggers_delete_trigger_request(
        data_source_name=data_source_name,
        scan_name=scan_name,
        template_url=self.delete_trigger.metadata['url'],
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True)
    http_request.url = self._client.format_url(http_request.url, Endpoint=endpoint)

    pipeline_response = self._client.send_request(
        http_request, stream=False, _return_pipeline_response=True, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code not in (200, 204):
        # Raises the mapped exception when the status code is registered;
        # anything else surfaces as a generic HttpResponseError.
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # Only a 200 response is parsed; a 204 leaves the result as None.
    payload = None
    if http_response.status_code == 200 and http_response.content:
        payload = http_response.json()

    if not cls:
        return payload
    return cls(pipeline_response, payload, {})
delete_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'}  # type: ignore