Source code for azure.purview.scanning.aio.operations._operations

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import functools
from json import loads as _loads
from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar, Union
import warnings

from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async

from ..._vendor import _convert_request
from ...operations._operations import build_classification_rules_create_or_update_request, build_classification_rules_delete_request, build_classification_rules_get_request, build_classification_rules_list_all_request, build_classification_rules_list_versions_by_classification_rule_name_request, build_classification_rules_tag_classification_version_request, build_data_sources_create_or_update_request, build_data_sources_delete_request, build_data_sources_get_request, build_data_sources_list_all_request, build_filters_create_or_update_request, build_filters_get_request, build_key_vault_connections_create_request, build_key_vault_connections_delete_request, build_key_vault_connections_get_request, build_key_vault_connections_list_all_request, build_scan_result_cancel_scan_request, build_scan_result_list_scan_history_request, build_scan_result_run_scan_request, build_scan_rulesets_create_or_update_request, build_scan_rulesets_delete_request, build_scan_rulesets_get_request, build_scan_rulesets_list_all_request, build_scans_create_or_update_request, build_scans_delete_request, build_scans_get_request, build_scans_list_by_data_source_request, build_system_scan_rulesets_get_by_version_request, build_system_scan_rulesets_get_latest_request, build_system_scan_rulesets_get_request, build_system_scan_rulesets_list_all_request, build_system_scan_rulesets_list_versions_by_data_source_request, build_triggers_create_trigger_request, build_triggers_delete_trigger_request, build_triggers_get_trigger_request

T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

[docs]class KeyVaultConnectionsOperations: """KeyVaultConnectionsOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def get( self, key_vault_name: str, **kwargs: Any ) -> Any: """Gets key vault information. :param key_vault_name: :type key_vault_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_key_vault_connections_get_request( key_vault_name=key_vault_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace_async async def create( self, key_vault_name: str, body: Any, **kwargs: Any ) -> Any: """Creates an instance of a key vault connection. :param key_vault_name: :type key_vault_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] json = body request = build_key_vault_connections_create_request( key_vault_name=key_vault_name, content_type=content_type, json=json, template_url=self.create.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace_async async def delete( self, key_vault_name: str, **kwargs: Any ) -> Optional[Any]: """Deletes the key vault connection associated with the account. :param key_vault_name: :type key_vault_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_key_vault_connections_delete_request( key_vault_name=key_vault_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/azureKeyVaults/{keyVaultName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs: Any ) -> AsyncIterable[Any]: """List key vault connections in account. :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "properties": { "baseUrl": "str", # Optional. "description": "str" # Optional. } } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_key_vault_connections_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_key_vault_connections_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/azureKeyVaults'} # type: ignore
[docs]class ClassificationRulesOperations: """ClassificationRulesOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def get( self, classification_rule_name: str, **kwargs: Any ) -> Any: """Get a classification rule. :param classification_rule_name: :type classification_rule_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_classification_rules_get_request( classification_rule_name=classification_rule_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace_async async def create_or_update( self, classification_rule_name: str, body: Any = None, **kwargs: Any ) -> Any: """Creates or Updates a classification rule. :param classification_rule_name: :type classification_rule_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'CustomClassificationRule' or 'SystemClassificationRule' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_classification_rules_create_or_update_request( classification_rule_name=classification_rule_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace_async async def delete( self, classification_rule_name: str, **kwargs: Any ) -> Optional[Any]: """Deletes a classification rule. :param classification_rule_name: :type classification_rule_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_classification_rules_delete_request( classification_rule_name=classification_rule_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/classificationrules/{classificationRuleName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs: Any ) -> AsyncIterable[Any]: """List classification rules in Account. :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_classification_rules_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_classification_rules_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/classificationrules'} # type: ignore
[docs] @distributed_trace def list_versions_by_classification_rule_name( self, classification_rule_name: str, **kwargs: Any ) -> AsyncIterable[Any]: """Lists the rule versions of a classification rule. :param classification_rule_name: :type classification_rule_name: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. kind: ClassificationRule } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_classification_rules_list_versions_by_classification_rule_name_request( classification_rule_name=classification_rule_name, template_url=self.list_versions_by_classification_rule_name.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_classification_rules_list_versions_by_classification_rule_name_request( classification_rule_name=classification_rule_name, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_versions_by_classification_rule_name.metadata = {'url': '/classificationrules/{classificationRuleName}/versions'} # type: ignore
[docs] @distributed_trace_async async def tag_classification_version( self, classification_rule_name: str, classification_rule_version: int, *, action: str, **kwargs: Any ) -> Any: """Sets Classification Action on a specific classification rule version. :param classification_rule_name: :type classification_rule_name: str :param classification_rule_version: :type classification_rule_version: int :keyword action: Possible values are: "Keep" or "Delete". :paramtype action: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 202 response.json() == { "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "scanResultId": str, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled". } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_classification_rules_tag_classification_version_request( classification_rule_name=classification_rule_name, classification_rule_version=classification_rule_version, action=action, template_url=self.tag_classification_version.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
tag_classification_version.metadata = {'url': '/classificationrules/{classificationRuleName}/versions/{classificationRuleVersion}/:tag'} # type: ignore
[docs]class DataSourcesOperations: """DataSourcesOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def create_or_update( self, data_source_name: str, body: Any = None, **kwargs: Any ) -> Any: """Creates or Updates a data source. :param data_source_name: :type data_source_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'AdlsGen1DataSource' or 'AdlsGen2DataSource' or 'AmazonAccountDataSource' or 'AmazonPostgreSqlDataSource' or 'AmazonS3DataSource' or 'AmazonSqlDataSource' or 'AzureCosmosDbDataSource' or 'AzureDataExplorerDataSource' or 'AzureFileServiceDataSource' or 'AzureMySqlDataSource' or 'AzurePostgreSqlDataSource' or 'AzureResourceGroupDataSource' or 'AzureSqlDataWarehouseDataSource' or 'AzureSqlDatabaseDataSource' or 'AzureSqlDatabaseManagedInstanceDataSource' or 'AzureStorageDataSource' or 'AzureSubscriptionDataSource' or 'AzureSynapseDataSource' or 'AzureSynapseWorkspaceDataSource' or 'OracleDataSource' or 'PowerBIDataSource' or 'SapEccDataSource' or 'SapS4HanaDataSource' or 'SqlServerDatabaseDataSource' or 'TeradataDataSource' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_data_sources_create_or_update_request( data_source_name=data_source_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace_async async def get( self, data_source_name: str, **kwargs: Any ) -> Any: """Get a data source. :param data_source_name: :type data_source_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_data_sources_get_request( data_source_name=data_source_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace_async async def delete( self, data_source_name: str, **kwargs: Any ) -> Optional[Any]: """Deletes a data source. :param data_source_name: :type data_source_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_data_sources_delete_request( data_source_name=data_source_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/datasources/{dataSourceName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs: Any ) -> AsyncIterable[Any]: """List data sources in Data catalog. :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scans": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ], kind: DataSource } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_data_sources_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_data_sources_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/datasources'} # type: ignore
[docs]class FiltersOperations: """FiltersOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def get( self, data_source_name: str, scan_name: str, **kwargs: Any ) -> Any: """Get a filter. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "excludeUriPrefixes": [ "str" # Optional. ], "includeUriPrefixes": [ "str" # Optional. ] } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_filters_get_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'} # type: ignore
[docs] @distributed_trace_async async def create_or_update( self, data_source_name: str, scan_name: str, body: Any = None, **kwargs: Any ) -> Any: """Creates or updates a filter. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "properties": { "excludeUriPrefixes": [ "str" # Optional. ], "includeUriPrefixes": [ "str" # Optional. ] } } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "excludeUriPrefixes": [ "str" # Optional. ], "includeUriPrefixes": [ "str" # Optional. ] } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_filters_create_or_update_request( data_source_name=data_source_name, scan_name=scan_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/filters/custom'} # type: ignore
[docs]class ScansOperations: """ScansOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def create_or_update( self, data_source_name: str, scan_name: str, body: Any, **kwargs: Any ) -> Any: """Creates an instance of a scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'AdlsGen1CredentialScan' or 'AdlsGen1MsiScan' or 'AdlsGen2CredentialScan' or 'AdlsGen2MsiScan' or 'AmazonAccountCredentialScan' or 'AmazonPostgreSqlCredentialScan' or 'AmazonS3CredentialScan' or 'AmazonS3RoleARNScan' or 'AmazonSqlCredentialScan' or 'AzureCosmosDbCredentialScan' or 'AzureDataExplorerCredentialScan' or 'AzureDataExplorerMsiScan' or 'AzureFileServiceCredentialScan' or 'AzureMySqlCredentialScan' or 'AzurePostgreSqlCredentialScan' or 'AzureResourceGroupCredentialScan' or 'AzureResourceGroupMsiScan' or 'AzureSqlDataWarehouseCredentialScan' or 'AzureSqlDataWarehouseMsiScan' or 'AzureSqlDatabaseCredentialScan' or 'AzureSqlDatabaseManagedInstanceCredentialScan' or 'AzureSqlDatabaseManagedInstanceMsiScan' or 'AzureSqlDatabaseMsiScan' or 'AzureStorageCredentialScan' or 'AzureStorageMsiScan' or 'AzureSubscriptionCredentialScan' or 'AzureSubscriptionMsiScan' or 'AzureSynapseCredentialScan' or 'AzureSynapseMsiScan' or 'AzureSynapseWorkspaceCredentialScan' or 'AzureSynapseWorkspaceMsiScan' or 'OracleCredentialScan' or 'OracleUserPassScan' or 'PowerBIDelegatedScan' or 'PowerBIMsiScan' or 'SapEccCredentialScan' or 'SapEccUserPassScan' or 'SapS4HanaSapS4HanaCredentialScan' or 'SapS4HanaSapS4HanaUserPassScan' or 'SqlServerDatabaseCredentialScan' or 'TeradataCredentialScan' or 'TeradataUserPassScanAutoGenerated' or 'TeradataUserPassScan' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] json = body request = build_scans_create_or_update_request( data_source_name=data_source_name, scan_name=scan_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace_async async def get( self, data_source_name: str, scan_name: str, **kwargs: Any ) -> Any: """Gets a scan information. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scans_get_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace_async async def delete( self, data_source_name: str, scan_name: str, **kwargs: Any ) -> Optional[Any]: """Deletes the scan associated with the data source. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scans_delete_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}'} # type: ignore
[docs] @distributed_trace def list_by_data_source( self, data_source_name: str, **kwargs: Any ) -> AsyncIterable[Any]: """List scans in data source. :param data_source_name: :type data_source_name: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanResults": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ], kind: Scan } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_scans_list_by_data_source_request( data_source_name=data_source_name, template_url=self.list_by_data_source.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_scans_list_by_data_source_request( data_source_name=data_source_name, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_by_data_source.metadata = {'url': '/datasources/{dataSourceName}/scans'} # type: ignore
[docs]class ScanResultOperations: """ScanResultOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def run_scan( self, data_source_name: str, scan_name: str, run_id: str, *, scan_level: Optional[str] = None, **kwargs: Any ) -> Any: """Runs the scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param run_id: :type run_id: str :keyword scan_level: Possible values are: "Full" or "Incremental". :paramtype scan_level: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 202 response.json() == { "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "scanResultId": str, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled". } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_result_run_scan_request( data_source_name=data_source_name, scan_name=scan_name, run_id=run_id, scan_level=scan_level, template_url=self.run_scan.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
run_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}'} # type: ignore
[docs] @distributed_trace_async async def cancel_scan( self, data_source_name: str, scan_name: str, run_id: str, **kwargs: Any ) -> Any: """Cancels a scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param run_id: :type run_id: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 202 response.json() == { "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "scanResultId": str, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. Possible values include: "Accepted", "InProgress", "TransientFailure", "Succeeded", "Failed", "Canceled". } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_result_cancel_scan_request( data_source_name=data_source_name, scan_name=scan_name, run_id=run_id, template_url=self.cancel_scan.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
cancel_scan.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs/{runId}/:cancel'} # type: ignore
[docs] @distributed_trace def list_scan_history( self, data_source_name: str, scan_name: str, **kwargs: Any ) -> AsyncIterable[Any]: """Lists the scan history of a scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "assetsClassified": 0.0, # Optional. "assetsDiscovered": 0.0, # Optional. "dataSourceType": "str", # Optional. Possible values include: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", "PowerBI". "diagnostics": { "exceptionCountMap": { "str": 0 # Optional. Dictionary of :code:`<integer>`. }, "notifications": [ { "code": 0, # Optional. "message": "str" # Optional. } ] }, "endTime": "2020-02-20 00:00:00", # Optional. "error": { "code": "str", # Optional. "details": [ { "code": "str", # Optional. "details": [ ... ], "message": "str", # Optional. "target": "str" # Optional. } ], "message": "str", # Optional. "target": "str" # Optional. }, "errorMessage": "str", # Optional. "id": "str", # Optional. "parentId": "str", # Optional. "pipelineStartTime": "2020-02-20 00:00:00", # Optional. "queuedTime": "2020-02-20 00:00:00", # Optional. "resourceId": "str", # Optional. "runType": "str", # Optional. "scanLevelType": "str", # Optional. Possible values include: "Full", "Incremental". "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "scanRulesetVersion": 0, # Optional. "startTime": "2020-02-20 00:00:00", # Optional. "status": "str" # Optional. } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_scan_result_list_scan_history_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.list_scan_history.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_scan_result_list_scan_history_request( data_source_name=data_source_name, scan_name=scan_name, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_scan_history.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/runs'} # type: ignore
[docs]class ScanRulesetsOperations: """ScanRulesetsOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def get( self, scan_ruleset_name: str, **kwargs: Any ) -> Any: """Get a scan ruleset. :param scan_ruleset_name: :type scan_ruleset_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_rulesets_get_request( scan_ruleset_name=scan_ruleset_name, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace_async async def create_or_update( self, scan_ruleset_name: str, body: Any = None, **kwargs: Any ) -> Any: """Creates or Updates a scan ruleset. :param scan_ruleset_name: :type scan_ruleset_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python kind = 'AdlsGen1ScanRuleset' or 'AdlsGen2ScanRuleset' or 'AmazonAccountScanRuleset' or 'AmazonPostgreSqlScanRuleset' or 'AmazonS3ScanRuleset' or 'AmazonSqlScanRuleset' or 'AzureCosmosDbScanRuleset' or 'AzureDataExplorerScanRuleset' or 'AzureFileServiceScanRuleset' or 'AzureMySqlScanRuleset' or 'AzurePostgreSqlScanRuleset' or 'AzureResourceGroupScanRuleset' or 'AzureSqlDataWarehouseScanRuleset' or 'AzureSqlDatabaseScanRuleset' or 'AzureSqlDatabaseManagedInstanceScanRuleset' or 'AzureStorageScanRuleset' or 'AzureSubscriptionScanRuleset' or 'AzureSynapseScanRuleset' or 'AzureSynapseWorkspaceScanRuleset' or 'OracleScanRuleset' or 'PowerBIScanRuleset' or 'SapEccScanRuleset' or 'SapS4HanaScanRuleset' or 'SqlServerDatabaseScanRuleset' or 'TeradataScanRuleset' # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] if body is not None: json = body else: json = None request = build_scan_rulesets_create_or_update_request( scan_ruleset_name=scan_ruleset_name, content_type=content_type, json=json, template_url=self.create_or_update.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace_async async def delete( self, scan_ruleset_name: str, **kwargs: Any ) -> Optional[Any]: """Deletes a scan ruleset. :param scan_ruleset_name: :type scan_ruleset_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_scan_rulesets_delete_request( scan_ruleset_name=scan_ruleset_name, template_url=self.delete.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete.metadata = {'url': '/scanrulesets/{scanRulesetName}'} # type: ignore
[docs] @distributed_trace def list_all( self, **kwargs: Any ) -> AsyncIterable[Any]: """List scan rulesets in Data catalog. :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: ScanRuleset } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_scan_rulesets_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_scan_rulesets_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/scanrulesets'} # type: ignore
[docs]class SystemScanRulesetsOperations: """SystemScanRulesetsOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def list_all( self, **kwargs: Any ) -> AsyncIterable[Any]: """List all system scan rulesets for an account. :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_system_scan_rulesets_list_all_request( template_url=self.list_all.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_system_scan_rulesets_list_all_request( template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_all.metadata = {'url': '/systemScanRulesets'} # type: ignore
[docs] @distributed_trace_async async def get( self, data_source_type: str, **kwargs: Any ) -> Any: """Get a system scan ruleset for a data source. :param data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :type data_source_type: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_system_scan_rulesets_get_request( data_source_type=data_source_type, template_url=self.get.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': '/systemScanRulesets/datasources/{dataSourceType}'} # type: ignore
[docs] @distributed_trace_async async def get_by_version( self, version: int, *, data_source_type: Optional[str] = None, **kwargs: Any ) -> Any: """Get a scan ruleset by version. :param version: :type version: int :keyword data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :paramtype data_source_type: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_system_scan_rulesets_get_by_version_request( version=version, data_source_type=data_source_type, template_url=self.get_by_version.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_by_version.metadata = {'url': '/systemScanRulesets/versions/{version}'} # type: ignore
[docs] @distributed_trace_async async def get_latest( self, *, data_source_type: Optional[str] = None, **kwargs: Any ) -> Any: """Get the latest version of a system scan ruleset. :keyword data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :paramtype data_source_type: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_system_scan_rulesets_get_latest_request( data_source_type=data_source_type, template_url=self.get_latest.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_latest.metadata = {'url': '/systemScanRulesets/versions/latest'} # type: ignore
[docs] @distributed_trace def list_versions_by_data_source( self, *, data_source_type: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[Any]: """List system scan ruleset versions in Data catalog. :keyword data_source_type: Possible values are: "None", "AzureSubscription", "AzureResourceGroup", "AzureSynapseWorkspace", "AzureSynapse", "AdlsGen1", "AdlsGen2", "AmazonAccount", "AmazonS3", "AmazonSql", "AzureCosmosDb", "AzureDataExplorer", "AzureFileService", "AzureSqlDatabase", "AmazonPostgreSql", "AzurePostgreSql", "SqlServerDatabase", "AzureSqlDatabaseManagedInstance", "AzureSqlDataWarehouse", "AzureMySql", "AzureStorage", "Teradata", "Oracle", "SapS4Hana", "SapEcc", and "PowerBI". :paramtype data_source_type: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[Any] :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "count": 0.0, # Optional. "nextLink": "str", # Optional. "value": [ { "id": "str", # Optional. "name": "str", # Optional. "scanRulesetType": "str", # Optional. Possible values include: "Custom", "System". "status": "str", # Optional. Possible values include: "Enabled", "Disabled". "version": 0, # Optional. kind: SystemScanRuleset } ] } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_system_scan_rulesets_list_versions_by_data_source_request( data_source_type=data_source_type, template_url=self.list_versions_by_data_source.metadata['url'], ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) else: request = build_system_scan_rulesets_list_versions_by_data_source_request( data_source_type=data_source_type, template_url=next_link, ) request = _convert_request(request) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.method = "GET" return request async def extract_data(pipeline_response): deserialized = _loads(pipeline_response.http_response.body()) list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged( get_next, extract_data )
list_versions_by_data_source.metadata = {'url': '/systemScanRulesets/versions'} # type: ignore
[docs]class TriggersOperations: """TriggersOperations async operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ def __init__(self, client, config, serializer, deserializer) -> None: self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace_async async def get_trigger( self, data_source_name: str, scan_name: str, **kwargs: Any ) -> Any: """Gets trigger information. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_triggers_get_trigger_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.get_trigger.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore
[docs] @distributed_trace_async async def create_trigger( self, data_source_name: str, scan_name: str, body: Any, **kwargs: Any ) -> Any: """Creates an instance of a trigger. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :param body: :type body: Any :return: JSON object :rtype: Any :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # JSON input template you can fill out and use as your body input. body = { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } # response body for status code(s): 200, 201 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } """ cls = kwargs.pop('cls', None) # type: ClsType[Any] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] json = body request = build_triggers_create_trigger_request( data_source_name=data_source_name, scan_name=scan_name, content_type=content_type, json=json, template_url=self.create_trigger.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 201: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore
[docs] @distributed_trace_async async def delete_trigger( self, data_source_name: str, scan_name: str, **kwargs: Any ) -> Optional[Any]: """Deletes the trigger associated with the scan. :param data_source_name: :type data_source_name: str :param scan_name: :type scan_name: str :return: JSON object :rtype: Any or None :raises: ~azure.core.exceptions.HttpResponseError Example: .. code-block:: python # response body for status code(s): 200 response.json() == { "id": "str", # Optional. "name": "str", # Optional. "properties": { "createdAt": "2020-02-20 00:00:00", # Optional. "incrementalScanStartTime": "2020-02-20 00:00:00", # Optional. "lastModifiedAt": "2020-02-20 00:00:00", # Optional. "lastScheduled": "2020-02-20 00:00:00", # Optional. "recurrence": { "endTime": "2020-02-20 00:00:00", # Optional. "frequency": "str", # Optional. Possible values include: "Week", "Month". "interval": 0, # Optional. "schedule": { "hours": [ 0 # Optional. ], "minutes": [ 0 # Optional. ], "monthDays": [ 0 # Optional. ], "monthlyOccurrences": [ { "day": "str", # Optional. Possible values include: "Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday". "occurrence": 0 # Optional. } ], "weekDays": [ "str" # Optional. ] }, "startTime": "2020-02-20 00:00:00", # Optional. "timeZone": "str" # Optional. }, "recurrenceInterval": "str", # Optional. "scanLevel": "str" # Optional. Possible values include: "Full", "Incremental". } } """ cls = kwargs.pop('cls', None) # type: ClsType[Optional[Any]] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) request = build_triggers_delete_trigger_request( data_source_name=data_source_name, scan_name=scan_name, template_url=self.delete_trigger.metadata['url'], ) path_format_arguments = { "Endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) pipeline_response = await self._client.send_request(request, stream=False, _return_pipeline_response=True, **kwargs) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized
delete_trigger.metadata = {'url': '/datasources/{dataSourceName}/scans/{scanName}/triggers/default'} # type: ignore