# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import functools
from json import loads as _loads
from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar, Union
import warnings
from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from ..._vendor import _convert_request
from ...operations._operations import build_classification_rules_create_or_update_request, build_classification_rules_delete_request, build_classification_rules_get_request, build_classification_rules_list_all_request, build_classification_rules_list_versions_by_classification_rule_name_request, build_classification_rules_tag_classification_version_request, build_data_sources_create_or_update_request, build_data_sources_delete_request, build_data_sources_get_request, build_data_sources_list_all_request, build_filters_create_or_update_request, build_filters_get_request, build_key_vault_connections_create_request, build_key_vault_connections_delete_request, build_key_vault_connections_get_request, build_key_vault_connections_list_all_request, build_scan_result_cancel_scan_request, build_scan_result_list_scan_history_request, build_scan_result_run_scan_request, build_scan_rulesets_create_or_update_request, build_scan_rulesets_delete_request, build_scan_rulesets_get_request, build_scan_rulesets_list_all_request, build_scans_create_or_update_request, build_scans_delete_request, build_scans_get_request, build_scans_list_by_data_source_request, build_system_scan_rulesets_get_by_version_request, build_system_scan_rulesets_get_latest_request, build_system_scan_rulesets_get_request, build_system_scan_rulesets_list_all_request, build_system_scan_rulesets_list_versions_by_data_source_request, build_triggers_create_trigger_request, build_triggers_delete_trigger_request, build_triggers_get_trigger_request
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
[docs]class KeyVaultConnectionsOperations:
"""KeyVaultConnectionsOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def get(
self,
key_vault_name: str,
**kwargs: Any
) -> Any:
"""Gets key vault information.
:param key_vault_name:
:type key_vault_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"baseUrl": "str", # Optional.
"description": "str" # Optional.
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_key_vault_connections_get_request(
key_vault_name=key_vault_name,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace_async
async def create(
self,
key_vault_name: str,
body: Any,
**kwargs: Any
) -> Any:
"""Creates an instance of a key vault connection.
:param key_vault_name:
:type key_vault_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"baseUrl": "str", # Optional.
"description": "str" # Optional.
}
}
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"baseUrl": "str", # Optional.
"description": "str" # Optional.
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
json = body
request = build_key_vault_connections_create_request(
key_vault_name=key_vault_name,
content_type=content_type,
json=json,
template_url=self.create.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace_async
async def delete(
self,
key_vault_name: str,
**kwargs: Any
) -> Optional[Any]:
"""Deletes the key vault connection associated with the account.
:param key_vault_name:
:type key_vault_name: str
:return: JSON object
:rtype: Any or None
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"baseUrl": "str", # Optional.
"description": "str" # Optional.
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_key_vault_connections_delete_request(
key_vault_name=key_vault_name,
template_url=self.delete.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
deserialized = None
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
delete.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace
def list_all(
self,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List key vault connections in account.
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"baseUrl": "str", # Optional.
"description": "str" # Optional.
}
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_key_vault_connections_list_all_request(
template_url=self.list_all.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_key_vault_connections_list_all_request(
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_all.metadata = {'url': '/azureKeyVaults'} # type: ignore
[docs]class ClassificationRulesOperations:
"""ClassificationRulesOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def get(
self,
classification_rule_name: str,
**kwargs: Any
) -> Any:
"""Get a classification rule.
:param classification_rule_name:
:type classification_rule_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
kind: ClassificationRule
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_classification_rules_get_request(
classification_rule_name=classification_rule_name,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace_async
async def create_or_update(
self,
classification_rule_name: str,
body: Any = None,
**kwargs: Any
) -> Any:
"""Creates or Updates a classification rule.
:param classification_rule_name:
:type classification_rule_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
kind = 'CustomClassificationRule' or 'SystemClassificationRule'
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
kind: ClassificationRule
}
# response body for status code(s): 200, 201
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
kind: ClassificationRule
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
if body is not None:
json = body
else:
json = None
request = build_classification_rules_create_or_update_request(
classification_rule_name=classification_rule_name,
content_type=content_type,
json=json,
template_url=self.create_or_update.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if response.status_code == 201:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create_or_update.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace_async
async def delete(
self,
classification_rule_name: str,
**kwargs: Any
) -> Optional[Any]:
"""Deletes a classification rule.
:param classification_rule_name:
:type classification_rule_name: str
:return: JSON object
:rtype: Any or None
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
kind: ClassificationRule
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_classification_rules_delete_request(
classification_rule_name=classification_rule_name,
template_url=self.delete.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
deserialized = None
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
delete.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace
def list_all(
self,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List classification rules in Account.
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
kind: ClassificationRule
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_classification_rules_list_all_request(
template_url=self.list_all.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_classification_rules_list_all_request(
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_all.metadata = {'url': '/classificationrules'} # type: ignore
[docs] @distributed_trace
def list_versions_by_classification_rule_name(
self,
classification_rule_name: str,
**kwargs: Any
) -> AsyncIterable[Any]:
"""Lists the rule versions of a classification rule.
:param classification_rule_name:
:type classification_rule_name: str
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
kind: ClassificationRule
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_classification_rules_list_versions_by_classification_rule_name_request(
classification_rule_name=classification_rule_name,
template_url=self.list_versions_by_classification_rule_name.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_classification_rules_list_versions_by_classification_rule_name_request(
classification_rule_name=classification_rule_name,
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_versions_by_classification_rule_name.metadata = {'url': '/classificationrules/{classificationRuleName}/versions'} # type: ignore
[docs] @distributed_trace_async
async def tag_classification_version(
self,
classification_rule_name: str,
classification_rule_version: int,
*,
action: str,
**kwargs: Any
) -> Any:
"""Sets Classification Action on a specific classification rule version.
:param classification_rule_name:
:type classification_rule_name: str
:param classification_rule_version:
:type classification_rule_version: int
:keyword action: Possible values are: "Keep" or "Delete".
:paramtype action: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 202
response.json() == {
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"scanResultId": str, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled".
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_classification_rules_tag_classification_version_request(
classification_rule_name=classification_rule_name,
classification_rule_version=classification_rule_version,
action=action,
template_url=self.tag_classification_version.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
tag_classification_version.metadata = {'url': '/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag'} # type: ignore
[docs]class DataSourcesOperations:
"""DataSourcesOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def create_or_update(
self,
data_source_name: str,
body: Any = None,
**kwargs: Any
) -> Any:
"""Creates or Updates a data source.
:param data_source_name:
:type data_source_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
kind = 'AdlsGen1DataSource' or 'AdlsGen2DataSource' or 'AmazonAccountDataSource' or 'AmazonPostgreSqlDataSource' or 'AmazonS3DataSource' or 'AmazonSqlDataSource' or 'AzureCosmosDbDataSource' or 'AzureDataExplorerDataSource' or 'AzureFileServiceDataSource' or 'AzureMySqlDataSource' or 'AzurePostgreSqlDataSource' or 'AzureResourceGroupDataSource' or 'AzureSqlDataWarehouseDataSource' or 'AzureSqlDatabaseDataSource' or 'AzureSqlDatabaseManagedInstanceDataSource' or 'AzureStorageDataSource' or 'AzureSubscriptionDataSource' or 'AzureSynapseDataSource' or 'AzureSynapseWorkspaceDataSource' or 'OracleDataSource' or 'PowerBIDataSource' or 'SapEccDataSource' or 'SapS4HanaDataSource' or 'SqlServerDatabaseDataSource' or 'TeradataDataSource'
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
"scans": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
],
kind: DataSource
}
# response body for status code(s): 200, 201
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scans": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
],
kind: DataSource
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
if body is not None:
json = body
else:
json = None
request = build_data_sources_create_or_update_request(
data_source_name=data_source_name,
content_type=content_type,
json=json,
template_url=self.create_or_update.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if response.status_code == 201:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace_async
async def get(
self,
data_source_name: str,
**kwargs: Any
) -> Any:
"""Get a data source.
:param data_source_name:
:type data_source_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scans": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
],
kind: DataSource
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_data_sources_get_request(
data_source_name=data_source_name,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace_async
async def delete(
self,
data_source_name: str,
**kwargs: Any
) -> Optional[Any]:
"""Deletes a data source.
:param data_source_name:
:type data_source_name: str
:return: JSON object
:rtype: Any or None
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scans": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
],
kind: DataSource
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_data_sources_delete_request(
data_source_name=data_source_name,
template_url=self.delete.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
deserialized = None
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
delete.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace
def list_all(
self,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List data sources in Data catalog.
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scans": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
],
kind: DataSource
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_data_sources_list_all_request(
template_url=self.list_all.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_data_sources_list_all_request(
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_all.metadata = {'url': '/datasources'} # type: ignore
[docs]class FiltersOperations:
"""FiltersOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def get(
self,
data_source_name: str,
scan_name: str,
**kwargs: Any
) -> Any:
"""Get a filter.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"excludeUriPrefixes": [
"str" # Optional.
],
"includeUriPrefixes": [
"str" # Optional.
]
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_filters_get_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'} # type: ignore
[docs] @distributed_trace_async
async def create_or_update(
self,
data_source_name: str,
scan_name: str,
body: Any = None,
**kwargs: Any
) -> Any:
"""Creates or updates a filter.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"excludeUriPrefixes": [
"str" # Optional.
],
"includeUriPrefixes": [
"str" # Optional.
]
}
}
# response body for status code(s): 200, 201
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"excludeUriPrefixes": [
"str" # Optional.
],
"includeUriPrefixes": [
"str" # Optional.
]
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
if body is not None:
json = body
else:
json = None
request = build_filters_create_or_update_request(
data_source_name=data_source_name,
scan_name=scan_name,
content_type=content_type,
json=json,
template_url=self.create_or_update.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if response.status_code == 201:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'} # type: ignore
[docs]class ScansOperations:
"""ScansOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def create_or_update(
self,
data_source_name: str,
scan_name: str,
body: Any,
**kwargs: Any
) -> Any:
"""Creates an instance of a scan.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
kind = 'AdlsGen1CredentialScan' or 'AdlsGen1MsiScan' or 'AdlsGen2CredentialScan' or 'AdlsGen2MsiScan' or 'AmazonAccountCredentialScan' or 'AmazonPostgreSqlCredentialScan' or 'AmazonS3CredentialScan' or 'AmazonS3RoleARNScan' or 'AmazonSqlCredentialScan' or 'AzureCosmosDbCredentialScan' or 'AzureDataExplorerCredentialScan' or 'AzureDataExplorerMsiScan' or 'AzureFileServiceCredentialScan' or 'AzureMySqlCredentialScan' or 'AzurePostgreSqlCredentialScan' or 'AzureResourceGroupCredentialScan' or 'AzureResourceGroupMsiScan' or 'AzureSqlDataWarehouseCredentialScan' or 'AzureSqlDataWarehouseMsiScan' or 'AzureSqlDatabaseCredentialScan' or 'AzureSqlDatabaseManagedInstanceCredentialScan' or 'AzureSqlDatabaseManagedInstanceMsiScan' or 'AzureSqlDatabaseMsiScan' or 'AzureStorageCredentialScan' or 'AzureStorageMsiScan' or 'AzureSubscriptionCredentialScan' or 'AzureSubscriptionMsiScan' or 'AzureSynapseCredentialScan' or 'AzureSynapseMsiScan' or 'AzureSynapseWorkspaceCredentialScan' or 'AzureSynapseWorkspaceMsiScan' or 'OracleCredentialScan' or 'OracleUserPassScan' or 'PowerBIDelegatedScan' or 'PowerBIMsiScan' or 'SapEccCredentialScan' or 'SapEccUserPassScan' or 'SapS4HanaSapS4HanaCredentialScan' or 'SapS4HanaSapS4HanaUserPassScan' or 'SqlServerDatabaseCredentialScan' or 'TeradataCredentialScan' or 'TeradataUserPassScanAutoGenerated' or 'TeradataUserPassScan'
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
# response body for status code(s): 200, 201
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
json = body
request = build_scans_create_or_update_request(
data_source_name=data_source_name,
scan_name=scan_name,
content_type=content_type,
json=json,
template_url=self.create_or_update.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if response.status_code == 201:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace_async
async def get(
self,
data_source_name: str,
scan_name: str,
**kwargs: Any
) -> Any:
"""Gets a scan information.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_scans_get_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace_async
async def delete(
self,
data_source_name: str,
scan_name: str,
**kwargs: Any
) -> Optional[Any]:
"""Deletes the scan associated with the data source.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:return: JSON object
:rtype: Any or None
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_scans_delete_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=self.delete.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
deserialized = None
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
delete.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace
def list_by_data_source(
self,
data_source_name: str,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List scans in data source.
:param data_source_name:
:type data_source_name: str
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanResults": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
],
kind: Scan
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_scans_list_by_data_source_request(
data_source_name=data_source_name,
template_url=self.list_by_data_source.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_scans_list_by_data_source_request(
data_source_name=data_source_name,
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_by_data_source.metadata = {'url': '/datasources/{dataSourceName}/scans'} # type: ignore
[docs]class ScanResultOperations:
"""ScanResultOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def run_scan(
self,
data_source_name: str,
scan_name: str,
run_id: str,
*,
scan_level: Optional[str] = None,
**kwargs: Any
) -> Any:
"""Runs the scan.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:param run_id:
:type run_id: str
:keyword scan_level: Possible values are: "Full" or "Incremental".
:paramtype scan_level: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 202
response.json() == {
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"scanResultId": str, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled".
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_scan_result_run_scan_request(
data_source_name=data_source_name,
scan_name=scan_name,
run_id=run_id,
scan_level=scan_level,
template_url=self.run_scan.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
run_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}'} # type: ignore
[docs] @distributed_trace_async
async def cancel_scan(
self,
data_source_name: str,
scan_name: str,
run_id: str,
**kwargs: Any
) -> Any:
"""Cancels a scan.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:param run_id:
:type run_id: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 202
response.json() == {
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"scanResultId": str, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled".
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_scan_result_cancel_scan_request(
data_source_name=data_source_name,
scan_name=scan_name,
run_id=run_id,
template_url=self.cancel_scan.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
cancel_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel'} # type: ignore
[docs] @distributed_trace
def list_scan_history(
self,
data_source_name: str,
scan_name: str,
**kwargs: Any
) -> AsyncIterable[Any]:
"""Lists the scan history of a scan.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"assetsClassified": 0.0, # Optional.
"assetsDiscovered": 0.0, # Optional.
"dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI".
"diagnostics": {
"exceptionCountMap": {
"str": 0 # Optional. Dictionary of :code:`<integer>`.
},
"notifications": [
{
"code": 0, # Optional.
"message": "str" # Optional.
}
]
},
"endTime": "2020-02-20 00:00:00", # Optional.
"error": {
"code": "str", # Optional.
"details": [
{
"code": "str", # Optional.
"details": [
...
],
"message": "str", # Optional.
"target": "str" # Optional.
}
],
"message": "str", # Optional.
"target": "str" # Optional.
},
"errorMessage": "str", # Optional.
"id": "str", # Optional.
"parentId": "str", # Optional.
"pipelineStartTime": "2020-02-20 00:00:00", # Optional.
"queuedTime": "2020-02-20 00:00:00", # Optional.
"resourceId": "str", # Optional.
"runType": "str", # Optional.
"scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental".
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"scanRulesetVersion": 0, # Optional.
"startTime": "2020-02-20 00:00:00", # Optional.
"status": "str" # Optional.
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_scan_result_list_scan_history_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=self.list_scan_history.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_scan_result_list_scan_history_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_scan_history.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs'} # type: ignore
[docs]class ScanRulesetsOperations:
"""ScanRulesetsOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def get(
self,
scan_ruleset_name: str,
**kwargs: Any
) -> Any:
"""Get a scan ruleset.
:param scan_ruleset_name:
:type scan_ruleset_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: ScanRuleset
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_scan_rulesets_get_request(
scan_ruleset_name=scan_ruleset_name,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace_async
async def create_or_update(
self,
scan_ruleset_name: str,
body: Any = None,
**kwargs: Any
) -> Any:
"""Creates or Updates a scan ruleset.
:param scan_ruleset_name:
:type scan_ruleset_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
kind = 'AdlsGen1ScanRuleset' or 'AdlsGen2ScanRuleset' or 'AmazonAccountScanRuleset' or 'AmazonPostgreSqlScanRuleset' or 'AmazonS3ScanRuleset' or 'AmazonSqlScanRuleset' or 'AzureCosmosDbScanRuleset' or 'AzureDataExplorerScanRuleset' or 'AzureFileServiceScanRuleset' or 'AzureMySqlScanRuleset' or 'AzurePostgreSqlScanRuleset' or 'AzureResourceGroupScanRuleset' or 'AzureSqlDataWarehouseScanRuleset' or 'AzureSqlDatabaseScanRuleset' or 'AzureSqlDatabaseManagedInstanceScanRuleset' or 'AzureStorageScanRuleset' or 'AzureSubscriptionScanRuleset' or 'AzureSynapseScanRuleset' or 'AzureSynapseWorkspaceScanRuleset' or 'OracleScanRuleset' or 'PowerBIScanRuleset' or 'SapEccScanRuleset' or 'SapS4HanaScanRuleset' or 'SqlServerDatabaseScanRuleset' or 'TeradataScanRuleset'
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: ScanRuleset
}
# response body for status code(s): 200, 201
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: ScanRuleset
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
if body is not None:
json = body
else:
json = None
request = build_scan_rulesets_create_or_update_request(
scan_ruleset_name=scan_ruleset_name,
content_type=content_type,
json=json,
template_url=self.create_or_update.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if response.status_code == 201:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create_or_update.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace_async
async def delete(
self,
scan_ruleset_name: str,
**kwargs: Any
) -> Optional[Any]:
"""Deletes a scan ruleset.
:param scan_ruleset_name:
:type scan_ruleset_name: str
:return: JSON object
:rtype: Any or None
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: ScanRuleset
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_scan_rulesets_delete_request(
scan_ruleset_name=scan_ruleset_name,
template_url=self.delete.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
deserialized = None
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
delete.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace
def list_all(
self,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List scan rulesets in Data catalog.
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: ScanRuleset
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_scan_rulesets_list_all_request(
template_url=self.list_all.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_scan_rulesets_list_all_request(
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_all.metadata = {'url': '/scanrulesets'} # type: ignore
[docs]class SystemScanRulesetsOperations:
"""SystemScanRulesetsOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace
def list_all(
self,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List all system scan rulesets for an account.
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: SystemScanRuleset
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_system_scan_rulesets_list_all_request(
template_url=self.list_all.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_system_scan_rulesets_list_all_request(
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_all.metadata = {'url': '/systemScanRulesets'} # type: ignore
[docs] @distributed_trace_async
async def get(
self,
data_source_type: str,
**kwargs: Any
) -> Any:
"""Get a system scan ruleset for a data source.
:param data_source_type: Possible values are: "None", "AzureSubscription",
"AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
"AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
"AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
"SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
"AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
:type data_source_type: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: SystemScanRuleset
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_system_scan_rulesets_get_request(
data_source_type=data_source_type,
template_url=self.get.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/systemScanRulesets/datasources/{dataSourceType}'} # type: ignore
[docs] @distributed_trace_async
async def get_by_version(
self,
version: int,
*,
data_source_type: Optional[str] = None,
**kwargs: Any
) -> Any:
"""Get a scan ruleset by version.
:param version:
:type version: int
:keyword data_source_type: Possible values are: "None", "AzureSubscription",
"AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
"AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
"AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
"SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
"AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
:paramtype data_source_type: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: SystemScanRuleset
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_system_scan_rulesets_get_by_version_request(
version=version,
data_source_type=data_source_type,
template_url=self.get_by_version.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get_by_version.metadata = {'url': '/systemScanRulesets/versions/{version}'} # type: ignore
[docs] @distributed_trace_async
async def get_latest(
self,
*,
data_source_type: Optional[str] = None,
**kwargs: Any
) -> Any:
"""Get the latest version of a system scan ruleset.
:keyword data_source_type: Possible values are: "None", "AzureSubscription",
"AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
"AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
"AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
"SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
"AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
:paramtype data_source_type: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: SystemScanRuleset
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_system_scan_rulesets_get_latest_request(
data_source_type=data_source_type,
template_url=self.get_latest.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get_latest.metadata = {'url': '/systemScanRulesets/versions/latest'} # type: ignore
[docs] @distributed_trace
def list_versions_by_data_source(
self,
*,
data_source_type: Optional[str] = None,
**kwargs: Any
) -> AsyncIterable[Any]:
"""List system scan ruleset versions in Data catalog.
:keyword data_source_type: Possible values are: "None", "AzureSubscription",
"AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2",
"AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer",
"AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql",
"SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql",
"AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI".
:paramtype data_source_type: str
:return: An iterator like instance of JSON object
:rtype: ~azure.core.async_paging.AsyncItemPaged[Any]
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"count": 0.0, # Optional.
"nextLink": "str", # Optional.
"value": [
{
"id": "str", # Optional.
"name": "str", # Optional.
"scanRulesetType": "str", # Optional. Possible values include: "Custom", "System".
"status": "str", # Optional. Possible values include: "Enabled", "Disabled".
"version": 0, # Optional.
kind: SystemScanRuleset
}
]
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
def prepare_request(next_link=None):
if not next_link:
request = build_system_scan_rulesets_list_versions_by_data_source_request(
data_source_type=data_source_type,
template_url=self.list_versions_by_data_source.metadata['url'],
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
else:
request = build_system_scan_rulesets_list_versions_by_data_source_request(
data_source_type=data_source_type,
template_url=next_link,
)
request = _convert_request(request)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.method = "GET"
return request
async def extract_data(pipeline_response):
deserialized = _loads(pipeline_response.http_response.body())
list_of_elem = deserialized["value"]
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.get("nextLink", None), AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_versions_by_data_source.metadata = {'url': '/systemScanRulesets/versions'} # type: ignore
[docs]class TriggersOperations:
"""TriggersOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
[docs] @distributed_trace_async
async def get_trigger(
self,
data_source_name: str,
scan_name: str,
**kwargs: Any
) -> Any:
"""Gets trigger information.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"createdAt": "2020-02-20 00:00:00", # Optional.
"incrementalScanStartTime": "2020-02-20 00:00:00", # Optional.
"lastModifiedAt": "2020-02-20 00:00:00", # Optional.
"lastScheduled": "2020-02-20 00:00:00", # Optional.
"recurrence": {
"endTime": "2020-02-20 00:00:00", # Optional.
"frequency": "str", # Optional. Possible values include: "Week", "Month".
"interval": 0, # Optional.
"schedule": {
"hours": [
0 # Optional.
],
"minutes": [
0 # Optional.
],
"monthDays": [
0 # Optional.
],
"monthlyOccurrences": [
{
"day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
"occurrence": 0 # Optional.
}
],
"weekDays": [
"str" # Optional.
]
},
"startTime": "2020-02-20 00:00:00", # Optional.
"timeZone": "str" # Optional.
},
"recurrenceInterval": "str", # Optional.
"scanLevel": "str" # Optional. Possible values include: "Full", "Incremental".
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_triggers_get_trigger_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=self.get_trigger.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore
[docs] @distributed_trace_async
async def create_trigger(
self,
data_source_name: str,
scan_name: str,
body: Any,
**kwargs: Any
) -> Any:
"""Creates an instance of a trigger.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:param body:
:type body: Any
:return: JSON object
:rtype: Any
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# JSON input template you can fill out and use as your body input.
body = {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"createdAt": "2020-02-20 00:00:00", # Optional.
"incrementalScanStartTime": "2020-02-20 00:00:00", # Optional.
"lastModifiedAt": "2020-02-20 00:00:00", # Optional.
"lastScheduled": "2020-02-20 00:00:00", # Optional.
"recurrence": {
"endTime": "2020-02-20 00:00:00", # Optional.
"frequency": "str", # Optional. Possible values include: "Week", "Month".
"interval": 0, # Optional.
"schedule": {
"hours": [
0 # Optional.
],
"minutes": [
0 # Optional.
],
"monthDays": [
0 # Optional.
],
"monthlyOccurrences": [
{
"day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
"occurrence": 0 # Optional.
}
],
"weekDays": [
"str" # Optional.
]
},
"startTime": "2020-02-20 00:00:00", # Optional.
"timeZone": "str" # Optional.
},
"recurrenceInterval": "str", # Optional.
"scanLevel": "str" # Optional. Possible values include: "Full", "Incremental".
}
}
# response body for status code(s): 200, 201
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"createdAt": "2020-02-20 00:00:00", # Optional.
"incrementalScanStartTime": "2020-02-20 00:00:00", # Optional.
"lastModifiedAt": "2020-02-20 00:00:00", # Optional.
"lastScheduled": "2020-02-20 00:00:00", # Optional.
"recurrence": {
"endTime": "2020-02-20 00:00:00", # Optional.
"frequency": "str", # Optional. Possible values include: "Week", "Month".
"interval": 0, # Optional.
"schedule": {
"hours": [
0 # Optional.
],
"minutes": [
0 # Optional.
],
"monthDays": [
0 # Optional.
],
"monthlyOccurrences": [
{
"day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
"occurrence": 0 # Optional.
}
],
"weekDays": [
"str" # Optional.
]
},
"startTime": "2020-02-20 00:00:00", # Optional.
"timeZone": "str" # Optional.
},
"recurrenceInterval": "str", # Optional.
"scanLevel": "str" # Optional. Possible values include: "Full", "Incremental".
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Any]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop('content_type', "application/json") # type: Optional[str]
json = body
request = build_triggers_create_trigger_request(
data_source_name=data_source_name,
scan_name=scan_name,
content_type=content_type,
json=json,
template_url=self.create_trigger.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if response.status_code == 201:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
create_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore
[docs] @distributed_trace_async
async def delete_trigger(
self,
data_source_name: str,
scan_name: str,
**kwargs: Any
) -> Optional[Any]:
"""Deletes the trigger associated with the scan.
:param data_source_name:
:type data_source_name: str
:param scan_name:
:type scan_name: str
:return: JSON object
:rtype: Any or None
:raises: ~azure.core.exceptions.HttpResponseError
Example:
.. code-block:: python
# response body for status code(s): 200
response.json() == {
"id": "str", # Optional.
"name": "str", # Optional.
"properties": {
"createdAt": "2020-02-20 00:00:00", # Optional.
"incrementalScanStartTime": "2020-02-20 00:00:00", # Optional.
"lastModifiedAt": "2020-02-20 00:00:00", # Optional.
"lastScheduled": "2020-02-20 00:00:00", # Optional.
"recurrence": {
"endTime": "2020-02-20 00:00:00", # Optional.
"frequency": "str", # Optional. Possible values include: "Week", "Month".
"interval": 0, # Optional.
"schedule": {
"hours": [
0 # Optional.
],
"minutes": [
0 # Optional.
],
"monthDays": [
0 # Optional.
],
"monthlyOccurrences": [
{
"day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday".
"occurrence": 0 # Optional.
}
],
"weekDays": [
"str" # Optional.
]
},
"startTime": "2020-02-20 00:00:00", # Optional.
"timeZone": "str" # Optional.
},
"recurrenceInterval": "str", # Optional.
"scanLevel": "str" # Optional. Possible values include: "Full", "Incremental".
}
}
"""
cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
request = build_triggers_delete_trigger_request(
data_source_name=data_source_name,
scan_name=scan_name,
template_url=self.delete_trigger.metadata['url'],
)
path_format_arguments = {
"Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
request.url = self._client.format_url(request.url, **path_format_arguments)
pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
deserialized = None
if response.status_code == 200:
if response.content:
deserialized = response.json()
else:
deserialized = None
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
delete_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore