# Source code for azure.ai.metricsadvisor.models._models

# coding=utf-8
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------

# pylint:disable=protected-access
# pylint:disable=too-many-lines

import datetime
from typing import (
    Any,
    Union,
    List,
    Dict,
    TYPE_CHECKING
)
from enum import Enum
import msrest
from .._generated.models import (
    MetricAlertingConfiguration as _MetricAlertingConfiguration,
    SeverityCondition as _SeverityCondition,
    TopNGroupScope as _TopNGroupScope,
    AlertSnoozeCondition as _AlertSnoozeCondition,
    ValueCondition as _ValueCondition,
    EmailHookInfo as _EmailHookInfo,
    WebhookHookInfo as _WebhookHookInfo,
    WholeMetricConfiguration as _WholeMetricConfiguration,
    SuppressCondition as _SuppressCondition,
    HardThresholdCondition as _HardThresholdCondition,
    ChangeThresholdCondition as _ChangeThresholdCondition,
    SmartDetectionCondition as _SmartDetectionCondition,
    DimensionGroupConfiguration as _DimensionGroupConfiguration,
    SeriesConfiguration as _SeriesConfiguration,
    EmailHookInfoPatch as _EmailHookInfoPatch,
    WebhookHookInfoPatch as _WebhookHookInfoPatch,
    Dimension as _Dimension,
    Metric as _Metric,
    AnomalyFeedback as _AnomalyFeedback,
    FeedbackDimensionFilter,
    AnomalyFeedbackValue,
    ChangePointFeedback as _ChangePointFeedback,
    ChangePointFeedbackValue,
    CommentFeedback as _CommentFeedback,
    CommentFeedbackValue,
    PeriodFeedback as _PeriodFeedback,
    PeriodFeedbackValue,
    EmailHookParameter as _EmailHookParameter,
    WebhookHookParameter as _WebhookHookParameter,
    AzureBlobParameter as _AzureBlobParameter,
    SqlSourceParameter as _SqlSourceParameter,
    AzureApplicationInsightsParameter as _AzureApplicationInsightsParameter,
    AzureCosmosDBParameter as _AzureCosmosDBParameter,
    AzureTableParameter as _AzureTableParameter,
    HttpRequestParameter as _HttpRequestParameter,
    InfluxDBParameter as _InfluxDBParameter,
    MongoDBParameter as _MongoDBParameter,
    AzureDataLakeStorageGen2Parameter as _AzureDataLakeStorageGen2Parameter,
    ElasticsearchParameter as _ElasticsearchParameter,
    DimensionGroupIdentity as _DimensionGroupIdentity,
    SeriesIdentity as _SeriesIdentity,
    AnomalyAlertingConfigurationPatch as _AnomalyAlertingConfigurationPatch,
    AnomalyDetectionConfigurationPatch as _AnomalyDetectionConfigurationPatch
)

if TYPE_CHECKING:
    from . import (
        Severity,
        SnoozeScope,
        AnomalyDetectorDirection,
        DataFeedGranularityType
    )
    from .._generated.models import (
        AnomalyResult,
        IncidentResult,
        RootCause,
    )


class MetricAnomalyAlertScopeType(str, Enum):
    """Anomaly scope."""

    WHOLE_SERIES = "WholeSeries"
    SERIES_GROUP = "SeriesGroup"
    TOP_N = "TopN"

    @classmethod
    def _to_generated(cls, alert):
        # Accept either an enum member or its raw string value, then map the
        # public names onto the wire-level names the service expects.
        alert = getattr(alert, "value", alert)
        return {"WholeSeries": "All", "SeriesGroup": "Dimension"}.get(alert, alert)

    @classmethod
    def _from_generated(cls, alert):
        # Inverse of _to_generated: wire-level names back to the public names.
        alert = getattr(alert, "value", alert)
        return {"All": "WholeSeries", "Dimension": "SeriesGroup"}.get(alert, alert)
class DataFeedRollupType(str, Enum):
    """Data feed rollup type."""

    NO_ROLLUP = "NoRollup"
    AUTO_ROLLUP = "AutoRollup"
    ALREADY_ROLLUP = "AlreadyRollup"

    @classmethod
    def _to_generated(cls, rollup):
        # Accept either an enum member or its raw string value; only
        # "AutoRollup" differs from the service's wire-level name.
        rollup = getattr(rollup, "value", rollup)
        return "NeedRollup" if rollup == "AutoRollup" else rollup

    @classmethod
    def _from_generated(cls, rollup):
        # Inverse mapping of _to_generated.
        rollup = getattr(rollup, "value", rollup)
        return "AutoRollup" if rollup == "NeedRollup" else rollup
class MetricAnomalyAlertConfigurationsOperator(str, Enum):
    """Operator used to combine alerts across multiple metric alert
    configurations ("cross metrics operator").
    """

    AND = "AND"
    OR = "OR"
    XOR = "XOR"
class DetectionConditionsOperator(str, Enum):
    """Operator used to combine multiple anomaly detection conditions.

    The original class had no docstring, unlike every sibling enum in this
    module; added for consistency. Members are unchanged.
    """

    AND = "AND"
    OR = "OR"
class DataFeedGranularity(object):
    """Data feed granularity.

    :param granularity_type: Granularity of the time series. Possible values include:
        "Yearly", "Monthly", "Weekly", "Daily", "Hourly", "Minutely", "Secondly", "Custom".
    :type granularity_type: str or ~azure.ai.metricsadvisor.models.DataFeedGranularityType
    :keyword int custom_granularity_value: Must be populated if granularity_type is "Custom".
    """

    def __init__(self, granularity_type, **kwargs):
        # type: (Union[str, DataFeedGranularityType], Any) -> None
        self.granularity_type = granularity_type
        self.custom_granularity_value = kwargs.get("custom_granularity_value")

    @classmethod
    def _from_generated(cls, granularity_name, granularity_amount):
        # The service exposes granularity as a name plus an optional amount;
        # the amount becomes the custom value on this model.
        return cls(granularity_name, custom_granularity_value=granularity_amount)
class DataFeedIngestionSettings(object):
    """Data feed ingestion settings.

    :param ~datetime.datetime ingestion_begin_time: Ingestion start time.
    :keyword int data_source_request_concurrency: The max concurrency of data ingestion
        queries against user data source. Zero (0) means no limitation.
    :keyword int ingestion_retry_delay: The min retry interval for failed data ingestion
        tasks, in seconds.
    :keyword int ingestion_start_offset: The time that the beginning of data ingestion
        task will delay for every data slice according to this offset, in seconds.
    :keyword int stop_retry_after: Stop retry data ingestion after the data slice first
        schedule time in seconds.
    """

    def __init__(self, ingestion_begin_time, **kwargs):
        # type: (datetime.datetime, Any) -> None
        self.ingestion_begin_time = ingestion_begin_time
        # Defaults: offset of 0, and -1 (service default / no limit) for the rest.
        for attr, default in (
            ("ingestion_start_offset", 0),
            ("data_source_request_concurrency", -1),
            ("ingestion_retry_delay", -1),
            ("stop_retry_after", -1),
        ):
            setattr(self, attr, kwargs.get(attr, default))
class DataFeedOptions(object):
    """Data feed options.

    :keyword list[str] admins: Data feed administrators.
    :keyword str data_feed_description: Data feed description.
    :keyword missing_data_point_fill_settings: The fill missing point type and value.
    :paramtype missing_data_point_fill_settings:
        ~azure.ai.metricsadvisor.models.DataFeedMissingDataPointFillSettings
    :keyword rollup_settings: The rollup settings.
    :paramtype rollup_settings: ~azure.ai.metricsadvisor.models.DataFeedRollupSettings
    :keyword list[str] viewers: Data feed viewer.
    :keyword access_mode: Data feed access mode. Possible values include: "Private",
        "Public". Default value: "Private".
    :paramtype access_mode: str or ~azure.ai.metricsadvisor.models.DataFeedAccessMode
    :keyword str action_link_template: action link for alert.
    """

    def __init__(self, **kwargs):
        # All options default to None except access_mode, which defaults to "Private".
        for attr in (
            "admins",
            "data_feed_description",
            "missing_data_point_fill_settings",
            "rollup_settings",
            "viewers",
            "action_link_template",
        ):
            setattr(self, attr, kwargs.get(attr))
        self.access_mode = kwargs.get("access_mode", "Private")
class DataFeedMissingDataPointFillSettings(object):
    """Data feed missing data point fill settings.

    :keyword fill_type: The type of fill missing point for anomaly detection. Possible
        values include: "SmartFilling", "PreviousValue", "CustomValue", "NoFilling".
        Default value: "SmartFilling".
    :paramtype fill_type: str or ~azure.ai.metricsadvisor.models.DataSourceMissingDataPointFillType
    :keyword float custom_fill_value: The value of fill missing point for anomaly detection
        if "CustomValue" fill type is specified.
    """

    def __init__(self, **kwargs):
        self.fill_type = kwargs.get("fill_type", "SmartFilling")
        # Only meaningful when fill_type is "CustomValue".
        self.custom_fill_value = kwargs.get("custom_fill_value")
class DataFeedRollupSettings(object):
    """Data feed rollup settings.

    :keyword str rollup_identification_value: The identification value for the row of
        calculated all-up value.
    :keyword rollup_type: Mark if the data feed needs rollup. Possible values include:
        "NoRollup", "AutoRollup", "AlreadyRollup". Default value: "AutoRollup".
    :paramtype rollup_type: str or ~azure.ai.metricsadvisor.models.DataFeedRollupType
    :keyword list[str] auto_rollup_group_by_column_names: Roll up columns.
    :keyword rollup_method: Roll up method. Possible values include: "None", "Sum", "Max",
        "Min", "Avg", "Count".
    :paramtype rollup_method: str or ~azure.ai.metricsadvisor.models.DataFeedAutoRollupMethod
    """

    def __init__(self, **kwargs):
        self.rollup_identification_value = kwargs.get("rollup_identification_value")
        # "AutoRollup" is the default rollup behavior.
        self.rollup_type = kwargs.get("rollup_type", "AutoRollup")
        self.auto_rollup_group_by_column_names = kwargs.get("auto_rollup_group_by_column_names")
        self.rollup_method = kwargs.get("rollup_method")
class DataFeedSchema(object):
    """Data feed schema.

    :param metrics: List of metrics.
    :type metrics: list[~azure.ai.metricsadvisor.models.Metric]
    :keyword dimensions: List of dimension.
    :paramtype dimensions: list[~azure.ai.metricsadvisor.models.Dimension]
    :keyword str timestamp_column: User-defined timestamp column. If timestamp_column is
        None, start time of every time slice will be used as default value.
    """

    def __init__(self, metrics, **kwargs):
        # type: (List[Metric], Any) -> None
        self.metrics = metrics
        self.dimensions = kwargs.get("dimensions")
        self.timestamp_column = kwargs.get("timestamp_column")
class DataFeed(object):  # pylint:disable=too-many-instance-attributes
    """Represents a data feed.

    :ivar ~datetime.datetime created_time: Data feed created time.
    :ivar granularity: Granularity of the time series.
    :vartype granularity: ~azure.ai.metricsadvisor.models.DataFeedGranularity
    :ivar str id: Data feed unique id.
    :ivar ingestion_settings: Data feed ingestion settings.
    :vartype ingestion_settings: ~azure.ai.metricsadvisor.models.DataFeedIngestionSettings
    :ivar bool is_admin: Whether the query user is one of data feed administrators or not.
    :ivar list[str] metric_ids: List of metric ids
    :ivar str name: Data feed name.
    :ivar options: Data feed options
    :vartype options: ~azure.ai.metricsadvisor.models.DataFeedOptions
    :ivar schema: Data feed schema
    :vartype schema: ~azure.ai.metricsadvisor.models.DataFeedSchema
    :ivar source: Data feed source.
    :vartype source: Union[AzureApplicationInsightsDataFeed, AzureBlobDataFeed,
        AzureCosmosDBDataFeed, AzureDataExplorerDataFeed, AzureDataLakeStorageGen2DataFeed,
        AzureTableDataFeed, HttpRequestDataFeed, InfluxDBDataFeed, MySqlDataFeed,
        PostgreSqlDataFeed, SQLServerDataFeed, MongoDBDataFeed, ElasticsearchDataFeed]
    :ivar status: Data feed status. Possible values include: "Active", "Paused".
        Default value: "Active".
    :vartype status: str or ~azure.ai.metricsadvisor.models.DataFeedStatus
    """

    def __init__(self, **kwargs):
        self.created_time = kwargs.get('created_time', None)
        self.granularity = kwargs.get('granularity', None)
        self.id = kwargs.get('id', None)
        self.ingestion_settings = kwargs.get('ingestion_settings', None)
        self.is_admin = kwargs.get('is_admin', None)
        self.metric_ids = kwargs.get('metric_ids', None)
        self.name = kwargs.get('name', None)
        self.options = kwargs.get('options', None)
        self.schema = kwargs.get('schema', None)
        self.source = kwargs.get('source', None)
        self.status = kwargs.get('status', None)

    @classmethod
    def _from_generated(cls, data_feed):
        """Build a DataFeed from the flat generated service model, regrouping
        the flat fields into the nested settings/options/schema objects.
        """
        return cls(
            created_time=data_feed.created_time,
            granularity=DataFeedGranularity._from_generated(
                data_feed.granularity_name, data_feed.granularity_amount
            ),
            id=data_feed.data_feed_id,
            ingestion_settings=DataFeedIngestionSettings(
                ingestion_begin_time=data_feed.data_start_from,
                data_source_request_concurrency=data_feed.max_concurrency,
                ingestion_retry_delay=data_feed.min_retry_interval_in_seconds,
                ingestion_start_offset=data_feed.start_offset_in_seconds,
                stop_retry_after=data_feed.stop_retry_after_in_seconds
            ),
            is_admin=data_feed.is_admin,
            metric_ids=[metric.metric_id for metric in data_feed.metrics],
            name=data_feed.data_feed_name,
            options=DataFeedOptions(
                admins=data_feed.admins,
                data_feed_description=data_feed.data_feed_description,
                missing_data_point_fill_settings=DataFeedMissingDataPointFillSettings(
                    fill_type=data_feed.fill_missing_point_type,
                    custom_fill_value=data_feed.fill_missing_point_value
                ),
                rollup_settings=DataFeedRollupSettings(
                    rollup_identification_value=data_feed.all_up_identification,
                    rollup_type=DataFeedRollupType._from_generated(data_feed.need_rollup),
                    auto_rollup_group_by_column_names=data_feed.roll_up_columns,
                    rollup_method=data_feed.roll_up_method
                ),
                viewers=data_feed.viewers,
                access_mode=data_feed.view_mode,
                action_link_template=data_feed.action_link_template
            ),
            schema=DataFeedSchema(
                dimensions=[Dimension._from_generated(dim) for dim in data_feed.dimension],
                metrics=[Metric._from_generated(metric) for metric in data_feed.metrics],
                timestamp_column=data_feed.timestamp_column
            ),
            # The source type string selects the right public source model.
            source=DATA_FEED_TRANSFORM[data_feed.data_source_type]._from_generated(
                data_feed.data_source_parameter
            ),
            status=data_feed.status,
        )

    def _to_generated_patch(self, data_source_feed_type, kwargs):
        """Build a generated *Patch model, preferring explicit service-style
        (camelCase) values popped from ``kwargs`` over this DataFeed's state.

        :param data_source_feed_type: Generated patch model class to instantiate.
        :param dict kwargs: camelCase overrides; consumed via ``pop``.
        """
        source_param = kwargs.pop("dataSourceParameter", None)
        if source_param:
            source_param = source_param._to_generated_patch()
        rollup_type = kwargs.pop("needRollup", None)
        if rollup_type:
            # BUG FIX: the converted value was previously discarded, so an
            # explicit "AutoRollup" was never translated to "NeedRollup".
            rollup_type = DataFeedRollupType._to_generated(rollup_type)
        # Hoist the optional nested settings so the fallbacks below stay readable.
        options = self.options
        rollup_settings = options.rollup_settings if options else None
        fill_settings = options.missing_data_point_fill_settings if options else None
        # BUG FIX (precedence): the original wrote "kwargs.pop(...) or X if cond
        # else None", which parses as "(kwargs.pop(...) or X) if cond else None"
        # and silently dropped explicit kwargs whenever the settings object was
        # None. The fallback ternaries are now parenthesized so an explicit
        # kwarg always wins.
        return data_source_feed_type(
            data_feed_name=kwargs.pop("dataFeedName", None) or self.name,
            data_source_parameter=source_param if source_param else self.source._to_generated_patch(),
            timestamp_column=kwargs.pop("timestampColumn", None) or self.schema.timestamp_column,
            data_start_from=kwargs.pop("dataStartFrom", None) or self.ingestion_settings.ingestion_begin_time,
            max_concurrency=kwargs.pop("maxConcurrency", None)
                or self.ingestion_settings.data_source_request_concurrency,
            min_retry_interval_in_seconds=kwargs.pop("minRetryIntervalInSeconds", None)
                or self.ingestion_settings.ingestion_retry_delay,
            start_offset_in_seconds=kwargs.pop("startOffsetInSeconds", None)
                or self.ingestion_settings.ingestion_start_offset,
            stop_retry_after_in_seconds=kwargs.pop("stopRetryAfterInSeconds", None)
                or self.ingestion_settings.stop_retry_after,
            data_feed_description=kwargs.pop("dataFeedDescription", None)
                or (options.data_feed_description if options else None),
            need_rollup=rollup_type
                or (DataFeedRollupType._to_generated(rollup_settings.rollup_type)
                    if rollup_settings else None),
            roll_up_method=kwargs.pop("rollUpMethod", None)
                or (rollup_settings.rollup_method if rollup_settings else None),
            roll_up_columns=kwargs.pop("rollUpColumns", None)
                or (rollup_settings.auto_rollup_group_by_column_names if rollup_settings else None),
            all_up_identification=kwargs.pop("allUpIdentification", None)
                or (rollup_settings.rollup_identification_value if rollup_settings else None),
            fill_missing_point_type=kwargs.pop("fillMissingPointType", None)
                or (fill_settings.fill_type if fill_settings else None),
            fill_missing_point_value=kwargs.pop("fillMissingPointValue", None)
                or (fill_settings.custom_fill_value if fill_settings else None),
            viewers=kwargs.pop("viewers", None) or (options.viewers if options else None),
            view_mode=kwargs.pop("viewMode", None) or (options.access_mode if options else None),
            admins=kwargs.pop("admins", None) or (options.admins if options else None),
            status=kwargs.pop("status", None) or self.status,
            action_link_template=kwargs.pop("actionLinkTemplate", None)
                or (options.action_link_template if options else None)
        )
class MetricAnomalyAlertScope(object):
    """MetricAnomalyAlertScope.

    :param scope_type: Required. Anomaly scope. Possible values include: "WholeSeries",
        "SeriesGroup", "TopN".
    :type scope_type: str or ~azure.ai.metricsadvisor.models.MetricAnomalyAlertScopeType
    :keyword series_group_in_scope: Dimension specified for series group.
    :paramtype series_group_in_scope: dict[str, str]
    :keyword top_n_group_in_scope:
    :paramtype top_n_group_in_scope: ~azure.ai.metricsadvisor.models.TopNGroupScope
    """

    def __init__(self, scope_type, **kwargs):
        # type: (Union[str, MetricAnomalyAlertScopeType], Any) -> None
        self.scope_type = scope_type
        self.series_group_in_scope = kwargs.get("series_group_in_scope")
        self.top_n_group_in_scope = kwargs.get("top_n_group_in_scope")

    @classmethod
    def _from_generated(cls, config):
        # Only one of the generated scope fields is populated, depending on
        # the scope type; hoist them for readability.
        dimension_scope = config.dimension_anomaly_scope
        top_n_scope = config.top_n_anomaly_scope
        return cls(
            MetricAnomalyAlertScopeType._from_generated(config.anomaly_scope_type),
            series_group_in_scope=dimension_scope.dimension if dimension_scope else None,
            top_n_group_in_scope=TopNGroupScope(
                top=top_n_scope.top,
                period=top_n_scope.period,
                min_top_count=top_n_scope.min_top_count,
            ) if top_n_scope else None,
        )
class TopNGroupScope(object):
    """TopNGroupScope.

    :param top: Required. top N, value range : [1, +∞).
    :type top: int
    :param period: Required. point count used to look back, value range : [1, +∞).
    :type period: int
    :param min_top_count: Required. min count should be in top N, value range : [1, +∞)
        should be less than or equal to period.
    :type min_top_count: int
    """

    def __init__(self, top, period, min_top_count, **kwargs):  # pylint: disable=unused-argument
        # type: (int, int, int, Any) -> None
        self.top, self.period, self.min_top_count = top, period, min_top_count
class SeverityCondition(object):
    """SeverityCondition.

    :param min_alert_severity: Required. min alert severity. Possible values include:
        "Low", "Medium", "High".
    :type min_alert_severity: str or ~azure.ai.metricsadvisor.models.Severity
    :param max_alert_severity: Required. max alert severity. Possible values include:
        "Low", "Medium", "High".
    :type max_alert_severity: str or ~azure.ai.metricsadvisor.models.Severity
    """

    def __init__(self, min_alert_severity, max_alert_severity, **kwargs):  # pylint: disable=unused-argument
        # type: (Union[str, Severity], Union[str, Severity], Any) -> None
        self.min_alert_severity, self.max_alert_severity = min_alert_severity, max_alert_severity
class MetricAnomalyAlertSnoozeCondition(object):
    """MetricAnomalyAlertSnoozeCondition.

    :param auto_snooze: Required. snooze point count, value range : [0, +∞).
    :type auto_snooze: int
    :param snooze_scope: Required. snooze scope. Possible values include: "Metric", "Series".
    :type snooze_scope: str or ~azure.ai.metricsadvisor.models.SnoozeScope
    :param only_for_successive: Required. only snooze for successive anomalies.
    :type only_for_successive: bool
    """

    def __init__(self, auto_snooze, snooze_scope, only_for_successive, **kwargs):  # pylint: disable=unused-argument
        # type: (int, Union[str, SnoozeScope], bool, Any) -> None
        self.auto_snooze = auto_snooze
        self.snooze_scope = snooze_scope
        self.only_for_successive = only_for_successive
class MetricAnomalyAlertConditions(object):
    """MetricAnomalyAlertConditions.

    :keyword metric_boundary_condition:
    :paramtype metric_boundary_condition: ~azure.ai.metricsadvisor.models.MetricBoundaryCondition
    :keyword severity_condition:
    :paramtype severity_condition: ~azure.ai.metricsadvisor.models.SeverityCondition
    """

    def __init__(self, **kwargs):
        # Both conditions are optional; omitted ones stay None.
        self.metric_boundary_condition = kwargs.get("metric_boundary_condition")
        self.severity_condition = kwargs.get("severity_condition")
class MetricBoundaryCondition(object):
    """MetricBoundaryCondition.

    :param direction: Required. value filter direction. Possible values include:
        "Both", "Down", "Up".
    :type direction: str or ~azure.ai.metricsadvisor.models.AnomalyDetectorDirection
    :keyword float lower: lower bound should be specified when direction is Both or Down.
    :keyword float upper: upper bound should be specified when direction is Both or Up.
    :keyword str companion_metric_id: the other metric unique id used for value filter.
    :keyword bool trigger_for_missing: trigger alert when the corresponding point is missing
        in the other metric should be specified only when using other metric to filter.
    """

    def __init__(self, direction, **kwargs):
        # type: (Union[str, AnomalyDetectorDirection], Any) -> None
        self.direction = direction
        # All bounds/companion settings are optional and default to None.
        for attr in ("lower", "upper", "companion_metric_id", "trigger_for_missing"):
            setattr(self, attr, kwargs.get(attr))
class MetricAlertConfiguration(object):
    """MetricAlertConfiguration.

    Bidirectional converter between this exposed model and the generated
    ``MetricAlertingConfiguration`` service model.

    :param detection_configuration_id: Required. Anomaly detection configuration unique id.
    :type detection_configuration_id: str
    :param alert_scope: Required. Anomaly scope.
    :type alert_scope: ~azure.ai.metricsadvisor.models.MetricAnomalyAlertScope
    :keyword negation_operation: Negation operation.
    :paramtype negation_operation: bool
    :keyword alert_conditions:
    :paramtype alert_conditions: ~azure.ai.metricsadvisor.models.MetricAnomalyAlertConditions
    :keyword alert_snooze_condition:
    :paramtype alert_snooze_condition:
        ~azure.ai.metricsadvisor.models.MetricAnomalyAlertSnoozeCondition
    """

    def __init__(self, detection_configuration_id, alert_scope, **kwargs):
        # type: (str, MetricAnomalyAlertScope, Any) -> None
        self.detection_configuration_id = detection_configuration_id
        self.alert_scope = alert_scope
        self.negation_operation = kwargs.get("negation_operation", None)
        self.alert_conditions = kwargs.get("alert_conditions", None)
        self.alert_snooze_condition = kwargs.get("alert_snooze_condition", None)

    @classmethod
    def _from_generated(cls, config):
        # Build the exposed model from the generated configuration. Each
        # optional filter on the generated model maps to an optional nested
        # object here; absent filters become None.
        return cls(
            detection_configuration_id=config.anomaly_detection_configuration_id,
            # The whole generated config is passed: scope fields live flat on it.
            alert_scope=MetricAnomalyAlertScope._from_generated(config),
            negation_operation=config.negation_operation,
            alert_snooze_condition=MetricAnomalyAlertSnoozeCondition(
                auto_snooze=config.snooze_filter.auto_snooze,
                snooze_scope=config.snooze_filter.snooze_scope,
                only_for_successive=config.snooze_filter.only_for_successive
            ) if config.snooze_filter else None,
            alert_conditions=MetricAnomalyAlertConditions(
                metric_boundary_condition=MetricBoundaryCondition(
                    direction=config.value_filter.direction,
                    lower=config.value_filter.lower,
                    upper=config.value_filter.upper,
                    # Generated "metric_id" is surfaced as "companion_metric_id".
                    companion_metric_id=config.value_filter.metric_id,
                    trigger_for_missing=config.value_filter.trigger_for_missing,
                ) if config.value_filter else None,
                severity_condition=SeverityCondition(
                    max_alert_severity=config.severity_filter.max_alert_severity,
                    min_alert_severity=config.severity_filter.min_alert_severity
                ) if config.severity_filter else None,
            )
        )

    def _to_generated(self):
        # Inverse of _from_generated: flatten the nested objects back onto the
        # generated MetricAlertingConfiguration, emitting None for any nested
        # object that is not set.
        return _MetricAlertingConfiguration(
            anomaly_detection_configuration_id=self.detection_configuration_id,
            anomaly_scope_type=MetricAnomalyAlertScopeType._to_generated(self.alert_scope.scope_type),
            dimension_anomaly_scope=_DimensionGroupIdentity(dimension=self.alert_scope.series_group_in_scope)
            if self.alert_scope.series_group_in_scope else None,
            top_n_anomaly_scope=_TopNGroupScope(
                top=self.alert_scope.top_n_group_in_scope.top,
                period=self.alert_scope.top_n_group_in_scope.period,
                min_top_count=self.alert_scope.top_n_group_in_scope.min_top_count,
            ) if self.alert_scope.top_n_group_in_scope else None,
            negation_operation=self.negation_operation,
            severity_filter=_SeverityCondition(
                max_alert_severity=self.alert_conditions.severity_condition.max_alert_severity,
                min_alert_severity=self.alert_conditions.severity_condition.min_alert_severity
            ) if self.alert_conditions and self.alert_conditions.severity_condition else None,
            snooze_filter=_AlertSnoozeCondition(
                auto_snooze=self.alert_snooze_condition.auto_snooze,
                snooze_scope=self.alert_snooze_condition.snooze_scope,
                only_for_successive=self.alert_snooze_condition.only_for_successive
            ) if self.alert_snooze_condition else None,
            value_filter=_ValueCondition(
                direction=self.alert_conditions.metric_boundary_condition.direction,
                lower=self.alert_conditions.metric_boundary_condition.lower,
                upper=self.alert_conditions.metric_boundary_condition.upper,
                metric_id=self.alert_conditions.metric_boundary_condition.companion_metric_id,
                trigger_for_missing=self.alert_conditions.metric_boundary_condition.trigger_for_missing,
            ) if self.alert_conditions and self.alert_conditions.metric_boundary_condition else None
        )
class AnomalyAlertConfiguration(object):
    """AnomalyAlertConfiguration.

    :ivar id: anomaly alert configuration unique id.
    :vartype id: str
    :ivar name: Required. anomaly alert configuration name.
    :vartype name: str
    :ivar description: anomaly alert configuration description.
    :vartype description: str
    :ivar cross_metrics_operator: cross metrics operator should be specified when setting
        up multiple metric alert configurations. Possible values include: "AND", "OR", "XOR".
    :vartype cross_metrics_operator: str or
        ~azure.ai.metricsadvisor.models.MetricAnomalyAlertConfigurationsOperator
    :ivar hook_ids: Required. hook unique ids.
    :vartype hook_ids: list[str]
    :ivar metric_alert_configurations: Required. Anomaly alert configurations.
    :vartype metric_alert_configurations:
        list[~azure.ai.metricsadvisor.models.MetricAlertConfiguration]
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id', None)
        self.name = kwargs.get('name', None)
        self.description = kwargs.get('description', None)
        self.cross_metrics_operator = kwargs.get('cross_metrics_operator', None)
        self.hook_ids = kwargs.get('hook_ids', None)
        self.metric_alert_configurations = kwargs.get('metric_alert_configurations', None)

    @classmethod
    def _from_generated(cls, config):
        # Translate the generated configuration into the exposed model,
        # converting each nested metric alerting configuration as well.
        return cls(
            id=config.anomaly_alerting_configuration_id,
            name=config.name,
            description=config.description,
            cross_metrics_operator=config.cross_metrics_operator,
            hook_ids=config.hook_ids,
            metric_alert_configurations=[
                MetricAlertConfiguration._from_generated(c)
                for c in config.metric_alerting_configurations
            ]
        )

    def _to_generated_patch(
            self, name,
            metric_alert_configurations,
            hook_ids,
            cross_metrics_operator,
            description
    ):
        # Build the generated patch model; each explicit (truthy) argument
        # takes precedence over the current attribute value.
        metric_alert_configurations = metric_alert_configurations or self.metric_alert_configurations
        return _AnomalyAlertingConfigurationPatch(
            name=name or self.name,
            metric_alerting_configurations=[
                config._to_generated() for config in metric_alert_configurations
            ] if metric_alert_configurations else None,
            hook_ids=hook_ids or self.hook_ids,
            cross_metrics_operator=cross_metrics_operator or self.cross_metrics_operator,
            description=description or self.description
        )
class AnomalyDetectionConfiguration(object):
    """AnomalyDetectionConfiguration.

    :ivar str id: anomaly detection configuration unique id.
    :ivar str name: Required. anomaly detection configuration name.
    :ivar str description: anomaly detection configuration description.
    :ivar str metric_id: Required. metric unique id.
    :ivar whole_series_detection_condition: Required. Conditions to detect anomalies in all
        time series of a metric.
    :vartype whole_series_detection_condition:
        ~azure.ai.metricsadvisor.models.MetricDetectionCondition
    :ivar series_group_detection_conditions: detection configuration for series group.
    :vartype series_group_detection_conditions:
        list[~azure.ai.metricsadvisor.models.MetricSeriesGroupDetectionCondition]
    :ivar series_detection_conditions: detection configuration for specific series.
    :vartype series_detection_conditions:
        list[~azure.ai.metricsadvisor.models.MetricSingleSeriesDetectionCondition]
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id', None)
        self.name = kwargs.get('name', None)
        self.description = kwargs.get('description', None)
        self.metric_id = kwargs.get('metric_id', None)
        self.whole_series_detection_condition = kwargs.get('whole_series_detection_condition', None)
        self.series_group_detection_conditions = kwargs.get('series_group_detection_conditions', None)
        self.series_detection_conditions = kwargs.get('series_detection_conditions', None)

    @classmethod
    def _from_generated(cls, config):
        # Translate the generated configuration into the exposed model; the
        # two optional override lists become None when the service sent none.
        return cls(
            id=config.anomaly_detection_configuration_id,
            name=config.name,
            description=config.description,
            metric_id=config.metric_id,
            whole_series_detection_condition=MetricDetectionCondition._from_generated(
                config.whole_metric_configuration
            ),
            series_group_detection_conditions=[
                MetricSeriesGroupDetectionCondition._from_generated(conf)
                for conf in config.dimension_group_override_configurations]
            if config.dimension_group_override_configurations else None,
            series_detection_conditions=[
                MetricSingleSeriesDetectionCondition._from_generated(conf)
                for conf in config.series_override_configurations]
            if config.series_override_configurations else None,
        )

    def _to_generated_patch(
            self, name,
            description,
            whole_series_detection_condition,
            series_group_detection_conditions,
            series_detection_conditions
    ):
        # Build the generated patch model; each explicit (truthy) argument
        # takes precedence over the current attribute value.
        whole_series_detection_condition = whole_series_detection_condition or self.whole_series_detection_condition
        series_group = series_group_detection_conditions or self.series_group_detection_conditions
        series_detection = series_detection_conditions or self.series_detection_conditions
        return _AnomalyDetectionConfigurationPatch(
            name=name or self.name,
            description=description or self.description,
            whole_metric_configuration=whole_series_detection_condition._to_generated()
            if whole_series_detection_condition else None,
            dimension_group_override_configurations=[group._to_generated() for group in series_group]
            if series_group else None,
            series_override_configurations=[series._to_generated() for series in series_detection]
            if series_detection else None
        )
class AzureApplicationInsightsDataFeed(object):
    """Data feed source backed by an Azure Application Insights query.

    :param azure_cloud: Required. Azure cloud environment.
    :type azure_cloud: str
    :param application_id: Required. Azure Application Insights ID.
    :type application_id: str
    :param api_key: Required. API Key.
    :type api_key: str
    :param query: Required. Query.
    :type query: str
    """

    def __init__(self, azure_cloud, application_id, api_key, query, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, str, str, Any) -> None
        self.data_source_type = 'AzureApplicationInsights'  # type: str
        self.azure_cloud, self.application_id = azure_cloud, application_id
        self.api_key, self.query = api_key, query

    @classmethod
    def _from_generated(cls, source):
        # Rehydrate from the generated AzureApplicationInsightsParameter model.
        return cls(source.azure_cloud, source.application_id, source.api_key, source.query)

    def _to_generated_patch(self):
        return _AzureApplicationInsightsParameter(
            azure_cloud=self.azure_cloud,
            application_id=self.application_id,
            api_key=self.api_key,
            query=self.query,
        )
class AzureBlobDataFeed(object):
    """Data feed source backed by Azure Blob storage.

    :param connection_string: Required. Azure Blob connection string.
    :type connection_string: str
    :param container: Required. Container.
    :type container: str
    :param blob_template: Required. Blob Template.
    :type blob_template: str
    """

    def __init__(self, connection_string, container, blob_template, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, str, Any) -> None
        self.data_source_type = 'AzureBlob'  # type: str
        self.connection_string = connection_string
        self.container, self.blob_template = container, blob_template

    @classmethod
    def _from_generated(cls, source):
        # Rehydrate from the generated AzureBlobParameter model.
        return cls(source.connection_string, source.container, source.blob_template)

    def _to_generated_patch(self):
        return _AzureBlobParameter(
            connection_string=self.connection_string,
            container=self.container,
            blob_template=self.blob_template,
        )
class AzureCosmosDBDataFeed(object):
    """Data feed source backed by an Azure Cosmos DB collection.

    :param connection_string: Required. Azure CosmosDB connection string.
    :type connection_string: str
    :param sql_query: Required. Query script.
    :type sql_query: str
    :param database: Required. Database name.
    :type database: str
    :param collection_id: Required. Collection id.
    :type collection_id: str
    """

    def __init__(self, connection_string, sql_query, database, collection_id, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, str, str, Any) -> None
        self.data_source_type = 'AzureCosmosDB'  # type: str
        self.connection_string, self.sql_query = connection_string, sql_query
        self.database, self.collection_id = database, collection_id

    @classmethod
    def _from_generated(cls, source):
        # Rehydrate from the generated AzureCosmosDBParameter model.
        return cls(source.connection_string, source.sql_query, source.database, source.collection_id)

    def _to_generated_patch(self):
        return _AzureCosmosDBParameter(
            connection_string=self.connection_string,
            sql_query=self.sql_query,
            database=self.database,
            collection_id=self.collection_id,
        )
class AzureDataExplorerDataFeed(object):
    """Data feed source settings for Azure Data Explorer (Kusto).

    :param connection_string: Required. Database connection string.
    :type connection_string: str
    :param query: Required. Query script.
    :type query: str
    """

    def __init__(self, connection_string, query, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'AzureDataExplorer'  # type: str
        self.connection_string = connection_string
        self.query = query

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated parameter model into this convenience model.
        return cls(source.connection_string, source.query)

    def _to_generated_patch(self):
        # The service shares the SqlSourceParameter shape for this source.
        return _SqlSourceParameter(
            connection_string=self.connection_string,
            query=self.query,
        )
class AzureTableDataFeed(object):
    """Data feed source settings for Azure Table storage.

    :param connection_string: Required. Azure Table connection string.
    :type connection_string: str
    :param query: Required. Query script.
    :type query: str
    :param table: Required. Table name.
    :type table: str
    """

    def __init__(self, connection_string, query, table, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'AzureTable'  # type: str
        self.connection_string = connection_string
        self.query = query
        self.table = table

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated AzureTableParameter into this convenience model.
        return cls(source.connection_string, source.query, source.table)

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _AzureTableParameter(
            connection_string=self.connection_string,
            query=self.query,
            table=self.table,
        )
class HttpRequestDataFeed(object):
    """Data feed source settings for a generic HTTP endpoint.

    :param url: Required. HTTP URL.
    :type url: str
    :param http_method: Required. HTTP method.
    :type http_method: str
    :keyword str http_header: Required. HTTP header.
    :keyword str payload: Required. HTTP request body.
    """

    def __init__(self, url, http_method, **kwargs):
        # type: (str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'HttpRequest'  # type: str
        self.url = url
        self.http_method = http_method
        self.http_header = kwargs.get("http_header")
        self.payload = kwargs.get("payload")

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated HttpRequestParameter into this convenience model.
        return cls(
            source.url,
            source.http_method,
            http_header=source.http_header,
            payload=source.payload,
        )

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _HttpRequestParameter(
            url=self.url,
            http_header=self.http_header,
            http_method=self.http_method,
            payload=self.payload,
        )
class InfluxDBDataFeed(object):
    """Data feed source settings for an InfluxDB database.

    :param connection_string: Required. InfluxDB connection string.
    :type connection_string: str
    :param database: Required. Database name.
    :type database: str
    :param user_name: Required. Database access user.
    :type user_name: str
    :param password: Required. Database access password.
    :type password: str
    :param query: Required. Query script.
    :type query: str
    """

    def __init__(
        self,
        connection_string,
        database,
        user_name,
        password,
        query,
        **kwargs
    ):  # pylint: disable=unused-argument
        # type: (str, str, str, str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'InfluxDB'  # type: str
        self.connection_string = connection_string
        self.database = database
        self.user_name = user_name
        self.password = password
        self.query = query

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated InfluxDBParameter into this convenience model.
        return cls(
            source.connection_string,
            source.database,
            source.user_name,
            source.password,
            source.query,
        )

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _InfluxDBParameter(
            connection_string=self.connection_string,
            database=self.database,
            user_name=self.user_name,
            password=self.password,
            query=self.query,
        )
class MySqlDataFeed(object):
    """Data feed source settings for a MySQL database.

    :param connection_string: Required. Database connection string.
    :type connection_string: str
    :param query: Required. Query script.
    :type query: str
    """

    def __init__(self, connection_string, query, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'MySql'  # type: str
        self.connection_string = connection_string
        self.query = query

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated parameter model into this convenience model.
        return cls(source.connection_string, source.query)

    def _to_generated_patch(self):
        # The service shares the SqlSourceParameter shape for this source.
        return _SqlSourceParameter(
            connection_string=self.connection_string,
            query=self.query,
        )
class PostgreSqlDataFeed(object):
    """Data feed source settings for a PostgreSQL database.

    :param connection_string: Required. Database connection string.
    :type connection_string: str
    :param query: Required. Query script.
    :type query: str
    """

    def __init__(self, connection_string, query, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'PostgreSql'  # type: str
        self.connection_string = connection_string
        self.query = query

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated parameter model into this convenience model.
        return cls(source.connection_string, source.query)

    def _to_generated_patch(self):
        # The service shares the SqlSourceParameter shape for this source.
        return _SqlSourceParameter(
            connection_string=self.connection_string,
            query=self.query,
        )
class SQLServerDataFeed(object):
    """Data feed source settings for a SQL Server database.

    :param connection_string: Required. Database connection string.
    :type connection_string: str
    :param query: Required. Query script.
    :type query: str
    """

    def __init__(self, connection_string, query, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'SqlServer'  # type: str
        self.connection_string = connection_string
        self.query = query

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated parameter model into this convenience model.
        return cls(source.connection_string, source.query)

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _SqlSourceParameter(
            connection_string=self.connection_string,
            query=self.query,
        )
class AzureDataLakeStorageGen2DataFeed(object):
    """Data feed source settings for Azure Data Lake Storage Gen2.

    :param account_name: Required. Account name.
    :type account_name: str
    :param account_key: Required. Account key.
    :type account_key: str
    :param file_system_name: Required. File system name (Container).
    :type file_system_name: str
    :param directory_template: Required. Directory template.
    :type directory_template: str
    :param file_template: Required. File template.
    :type file_template: str
    """

    def __init__(
        self,
        account_name,
        account_key,
        file_system_name,
        directory_template,
        file_template,
        **kwargs
    ):  # pylint: disable=unused-argument
        # type: (str, str, str, str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'AzureDataLakeStorageGen2'  # type: str
        self.account_name = account_name
        self.account_key = account_key
        self.file_system_name = file_system_name
        self.directory_template = directory_template
        self.file_template = file_template

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated parameter model into this convenience model.
        return cls(
            source.account_name,
            source.account_key,
            source.file_system_name,
            source.directory_template,
            source.file_template,
        )

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _AzureDataLakeStorageGen2Parameter(
            account_name=self.account_name,
            account_key=self.account_key,
            file_system_name=self.file_system_name,
            directory_template=self.directory_template,
            file_template=self.file_template,
        )
class ElasticsearchDataFeed(object):
    """Data feed source settings for an Elasticsearch cluster.

    :param host: Required. Host.
    :type host: str
    :param port: Required. Port.
    :type port: str
    :param auth_header: Required. Authorization header.
    :type auth_header: str
    :param query: Required. Query.
    :type query: str
    """

    def __init__(self, host, port, auth_header, query, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'Elasticsearch'  # type: str
        self.host = host
        self.port = port
        self.auth_header = auth_header
        self.query = query

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated ElasticsearchParameter into this convenience model.
        return cls(source.host, source.port, source.auth_header, source.query)

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _ElasticsearchParameter(
            host=self.host,
            port=self.port,
            auth_header=self.auth_header,
            query=self.query,
        )
class MongoDBDataFeed(object):
    """Data feed source settings for a MongoDB database.

    :param connection_string: Required. MongoDB connection string.
    :type connection_string: str
    :param database: Required. Database name.
    :type database: str
    :param command: Required. Query script.
    :type command: str
    """

    def __init__(self, connection_string, database, command, **kwargs):  # pylint: disable=unused-argument
        # type: (str, str, str, Any) -> None
        # Discriminator string the service uses to identify this source kind.
        self.data_source_type = 'MongoDB'  # type: str
        self.connection_string = connection_string
        self.database = database
        self.command = command

    @classmethod
    def _from_generated(cls, source):
        # Lift the generated MongoDBParameter into this convenience model.
        return cls(source.connection_string, source.database, source.command)

    def _to_generated_patch(self):
        # Convert back to the generated parameter model for update calls.
        return _MongoDBParameter(
            connection_string=self.connection_string,
            database=self.database,
            command=self.command,
        )
class Hook(object):
    """Base class for alerting hooks (see :class:`EmailHook` / :class:`WebHook`).

    :ivar str id: Hook unique id.
    :ivar str name: Hook unique name.
    :ivar str description: Hook description.
    :ivar str external_link: Hook external link.
    :ivar list[str] admins: Hook administrators.
    :ivar str hook_type: Constant filled by server. Possible values include:
        "Webhook", "Email".
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id')
        self.name = kwargs.get('name')
        self.description = kwargs.get('description')
        self.external_link = kwargs.get('external_link')
        self.admins = kwargs.get('admins')
        # Subclasses overwrite this with the concrete hook type.
        self.hook_type = None
class EmailHook(Hook):
    """A hook that delivers alert notifications by email.

    :param list[str] emails_to_alert: Required. Email TO: list.
    :keyword str description: Hook description.
    :keyword str external_link: Hook external link.
    :ivar list[str] admins: Hook administrators.
    :ivar str name: Hook unique name.
    :ivar str hook_type: Constant filled by server - "Email".
    :ivar str id: Hook unique id.
    """

    def __init__(self, emails_to_alert, **kwargs):
        # type: (List[str], Any) -> None
        super(EmailHook, self).__init__(**kwargs)
        self.hook_type = 'Email'  # type: str
        self.emails_to_alert = emails_to_alert

    @classmethod
    def _from_generated(cls, hook):
        # Flatten the generated EmailHookInfo and its nested hook_parameter.
        return cls(
            hook.hook_parameter.to_list,
            name=hook.hook_name,
            description=hook.description,
            external_link=hook.external_link,
            admins=hook.admins,
            id=hook.hook_id,
        )

    def _to_generated(self, name):
        return _EmailHookInfo(
            hook_name=name,
            description=self.description,
            external_link=self.external_link,
            admins=self.admins,
            hook_parameter=_EmailHookParameter(to_list=self.emails_to_alert),
        )

    def _to_generated_patch(self, name, description, external_link, emails_to_alert):
        # "or" keeps the current value whenever the caller passed a falsy update.
        return _EmailHookInfoPatch(
            hook_name=name or self.name,
            description=description or self.description,
            external_link=external_link or self.external_link,
            admins=self.admins,
            hook_parameter=_EmailHookParameter(
                to_list=emails_to_alert or self.emails_to_alert
            ),
        )
class WebHook(Hook):
    """A hook that POSTs alert notifications to a web endpoint.

    :param str endpoint: Required. API address, will be called when alert is triggered,
        only support POST method via SSL.
    :keyword str username: basic authentication.
    :keyword str password: basic authentication.
    :keyword str certificate_key: client certificate.
    :keyword str certificate_password: client certificate password.
    :keyword str description: Hook description.
    :keyword str external_link: Hook external link.
    :ivar list[str] admins: Hook administrators.
    :ivar str name: Hook unique name.
    :ivar str hook_type: Constant filled by server - "Webhook".
    :ivar str id: Hook unique id.
    """

    def __init__(self, endpoint, **kwargs):
        # type: (str, Any) -> None
        super(WebHook, self).__init__(**kwargs)
        self.hook_type = 'Webhook'  # type: str
        self.endpoint = endpoint
        self.username = kwargs.get('username')
        self.password = kwargs.get('password')
        self.certificate_key = kwargs.get('certificate_key')
        self.certificate_password = kwargs.get('certificate_password')

    @classmethod
    def _from_generated(cls, hook):
        # Flatten the generated WebhookHookInfo and its nested hook_parameter.
        param = hook.hook_parameter
        return cls(
            param.endpoint,
            username=param.username,
            password=param.password,
            certificate_key=param.certificate_key,
            certificate_password=param.certificate_password,
            name=hook.hook_name,
            description=hook.description,
            external_link=hook.external_link,
            admins=hook.admins,
            id=hook.hook_id,
        )

    def _to_generated(self, name):
        return _WebhookHookInfo(
            hook_name=name,
            description=self.description,
            external_link=self.external_link,
            admins=self.admins,
            hook_parameter=_WebhookHookParameter(
                endpoint=self.endpoint,
                username=self.username,
                password=self.password,
                certificate_key=self.certificate_key,
                certificate_password=self.certificate_password,
            ),
        )

    def _to_generated_patch(
        self,
        name,
        description,
        external_link,
        endpoint,
        password,
        username,
        certificate_key,
        certificate_password
    ):
        # "or" keeps the current value whenever the caller passed a falsy update.
        return _WebhookHookInfoPatch(
            hook_name=name or self.name,
            description=description or self.description,
            external_link=external_link or self.external_link,
            admins=self.admins,
            hook_parameter=_WebhookHookParameter(
                endpoint=endpoint or self.endpoint,
                username=username or self.username,
                password=password or self.password,
                certificate_key=certificate_key or self.certificate_key,
                certificate_password=certificate_password or self.certificate_password,
            ),
        )
class MetricDetectionCondition(object):
    """Detection conditions applied to a whole metric.

    :keyword cross_conditions_operator: condition operator should be specified
        when combining multiple detection conditions. Possible values include:
        "AND", "OR".
    :paramtype cross_conditions_operator: str or
        ~azure.ai.metricsadvisor.models.DetectionConditionsOperator
    :keyword smart_detection_condition:
    :paramtype smart_detection_condition: ~azure.ai.metricsadvisor.models.SmartDetectionCondition
    :keyword hard_threshold_condition:
    :paramtype hard_threshold_condition: ~azure.ai.metricsadvisor.models.HardThresholdCondition
    :keyword change_threshold_condition:
    :paramtype change_threshold_condition: ~azure.ai.metricsadvisor.models.ChangeThresholdCondition
    """

    def __init__(self, **kwargs):
        self.cross_conditions_operator = kwargs.get('cross_conditions_operator')
        self.smart_detection_condition = kwargs.get('smart_detection_condition')
        self.hard_threshold_condition = kwargs.get('hard_threshold_condition')
        self.change_threshold_condition = kwargs.get('change_threshold_condition')

    @classmethod
    def _from_generated(cls, condition):
        # Each nested condition converts itself (returning None when absent).
        return cls(
            cross_conditions_operator=condition.condition_operator,
            smart_detection_condition=SmartDetectionCondition._from_generated(condition.smart_detection_condition),
            hard_threshold_condition=HardThresholdCondition._from_generated(condition.hard_threshold_condition),
            change_threshold_condition=ChangeThresholdCondition._from_generated(condition.change_threshold_condition),
        )

    def _to_generated(self):
        smart = self.smart_detection_condition
        hard = self.hard_threshold_condition
        change = self.change_threshold_condition
        return _WholeMetricConfiguration(
            condition_operator=self.cross_conditions_operator,
            smart_detection_condition=smart._to_generated() if smart else None,
            hard_threshold_condition=hard._to_generated() if hard else None,
            change_threshold_condition=change._to_generated() if change else None,
        )
class ChangeThresholdCondition(object):
    """Detection condition based on change percentage from a prior point.

    :param change_percentage: Required. change percentage, value range : [0, +∞).
    :type change_percentage: float
    :param shift_point: Required. shift point, value range : [1, +∞).
    :type shift_point: int
    :param within_range: Required. if the withinRange = true, detected data is
        abnormal when the value falls in the range, in this case
        anomalyDetectorDirection must be Both if the withinRange = false,
        detected data is abnormal when the value falls out of the range.
    :type within_range: bool
    :param anomaly_detector_direction: Required. detection direction. Possible
        values include: "Both", "Down", "Up".
    :type anomaly_detector_direction: str or
        ~azure.ai.metricsadvisor.models.AnomalyDetectorDirection
    :param suppress_condition: Required.
    :type suppress_condition: ~azure.ai.metricsadvisor.models.SuppressCondition
    """

    def __init__(
        self,
        change_percentage,  # type: float
        shift_point,  # type: int
        within_range,  # type: bool
        anomaly_detector_direction,  # type: Union[str, AnomalyDetectorDirection]
        suppress_condition,  # type: SuppressCondition
        **kwargs  # type: Any
    ):  # pylint: disable=unused-argument
        # type: (...) -> None
        self.change_percentage = change_percentage
        self.shift_point = shift_point
        self.within_range = within_range
        self.anomaly_detector_direction = anomaly_detector_direction
        self.suppress_condition = suppress_condition

    @classmethod
    def _from_generated(cls, condition):
        # The generated model may omit this condition entirely.
        if not condition:
            return None
        return cls(
            condition.change_percentage,
            condition.shift_point,
            condition.within_range,
            condition.anomaly_detector_direction,
            SuppressCondition._from_generated(condition.suppress_condition),
        )

    def _to_generated(self):
        suppress = self.suppress_condition
        return _ChangeThresholdCondition(
            change_percentage=self.change_percentage,
            shift_point=self.shift_point,
            within_range=self.within_range,
            anomaly_detector_direction=self.anomaly_detector_direction,
            suppress_condition=_SuppressCondition(
                min_number=suppress.min_number,
                min_ratio=suppress.min_ratio,
            ),
        )
class SuppressCondition(object):
    """Thresholds that suppress anomalies until enough points qualify.

    :param min_number: Required. min point number, value range : [1, +∞).
    :type min_number: int
    :param min_ratio: Required. min point ratio, value range : (0, 100].
    :type min_ratio: float
    """

    def __init__(self, min_number, min_ratio, **kwargs):  # pylint: disable=unused-argument
        # type: (int, float, Any) -> None
        self.min_number = min_number
        self.min_ratio = min_ratio

    @classmethod
    def _from_generated(cls, condition):
        # The generated model may omit this condition entirely.
        if not condition:
            return None
        return cls(condition.min_number, condition.min_ratio)
class SmartDetectionCondition(object):
    """Detection condition driven by the service's smart sensitivity model.

    :param sensitivity: Required. sensitivity, value range : (0, 100].
    :type sensitivity: float
    :param anomaly_detector_direction: Required. detection direction. Possible
        values include: "Both", "Down", "Up".
    :type anomaly_detector_direction: str or
        ~azure.ai.metricsadvisor.models.AnomalyDetectorDirection
    :param suppress_condition: Required.
    :type suppress_condition: ~azure.ai.metricsadvisor.models.SuppressCondition
    """

    def __init__(
        self,
        sensitivity,
        anomaly_detector_direction,
        suppress_condition,
        **kwargs
    ):  # pylint: disable=unused-argument
        # type: (float, Union[str, AnomalyDetectorDirection], SuppressCondition, Any) -> None
        self.sensitivity = sensitivity
        self.anomaly_detector_direction = anomaly_detector_direction
        self.suppress_condition = suppress_condition

    @classmethod
    def _from_generated(cls, condition):
        # The generated model may omit this condition entirely.
        if not condition:
            return None
        return cls(
            condition.sensitivity,
            condition.anomaly_detector_direction,
            SuppressCondition._from_generated(condition.suppress_condition),
        )

    def _to_generated(self):
        suppress = self.suppress_condition
        return _SmartDetectionCondition(
            sensitivity=self.sensitivity,
            anomaly_detector_direction=self.anomaly_detector_direction,
            suppress_condition=_SuppressCondition(
                min_number=suppress.min_number,
                min_ratio=suppress.min_ratio,
            ),
        )
class HardThresholdCondition(object):
    """Detection condition based on fixed upper/lower bounds.

    :param anomaly_detector_direction: Required. detection direction. Possible
        values include: "Both", "Down", "Up".
    :type anomaly_detector_direction: str or
        ~azure.ai.metricsadvisor.models.AnomalyDetectorDirection
    :param suppress_condition: Required.
    :type suppress_condition: ~azure.ai.metricsadvisor.models.SuppressCondition
    :keyword lower_bound: lower bound should be specified when
        anomalyDetectorDirection is Both or Down.
    :paramtype lower_bound: float
    :keyword upper_bound: upper bound should be specified when
        anomalyDetectorDirection is Both or Up.
    :paramtype upper_bound: float
    """

    def __init__(self, anomaly_detector_direction, suppress_condition, **kwargs):
        # type: (Union[str, AnomalyDetectorDirection], SuppressCondition, Any) -> None
        self.anomaly_detector_direction = anomaly_detector_direction
        self.suppress_condition = suppress_condition
        self.lower_bound = kwargs.get('lower_bound')
        self.upper_bound = kwargs.get('upper_bound')

    @classmethod
    def _from_generated(cls, condition):
        # The generated model may omit this condition entirely.
        if not condition:
            return None
        return cls(
            condition.anomaly_detector_direction,
            SuppressCondition._from_generated(condition.suppress_condition),
            lower_bound=condition.lower_bound,
            upper_bound=condition.upper_bound,
        )

    def _to_generated(self):
        suppress = self.suppress_condition
        return _HardThresholdCondition(
            lower_bound=self.lower_bound,
            upper_bound=self.upper_bound,
            anomaly_detector_direction=self.anomaly_detector_direction,
            suppress_condition=_SuppressCondition(
                min_number=suppress.min_number,
                min_ratio=suppress.min_ratio,
            ),
        )
class MetricSeriesGroupDetectionCondition(MetricDetectionCondition):
    """Detection conditions scoped to a dimension series group.

    :param series_group_key: Required. dimension specified for series group.
    :type series_group_key: dict[str, str]
    :keyword cross_conditions_operator: condition operator should be specified
        when combining multiple detection conditions. Possible values include:
        "AND", "OR".
    :paramtype cross_conditions_operator: str or
        ~azure.ai.metricsadvisor.models.DetectionConditionsOperator
    :keyword smart_detection_condition:
    :paramtype smart_detection_condition: ~azure.ai.metricsadvisor.models.SmartDetectionCondition
    :keyword hard_threshold_condition:
    :paramtype hard_threshold_condition: ~azure.ai.metricsadvisor.models.HardThresholdCondition
    :keyword change_threshold_condition:
    :paramtype change_threshold_condition: ~azure.ai.metricsadvisor.models.ChangeThresholdCondition
    """

    def __init__(self, series_group_key, **kwargs):
        # type: (Dict[str, str], Any) -> None
        super(MetricSeriesGroupDetectionCondition, self).__init__(**kwargs)
        self.series_group_key = series_group_key

    @classmethod
    def _from_generated(cls, condition):
        # Each nested condition converts itself (returning None when absent).
        return cls(
            condition.group.dimension,
            cross_conditions_operator=condition.condition_operator,
            smart_detection_condition=SmartDetectionCondition._from_generated(condition.smart_detection_condition),
            hard_threshold_condition=HardThresholdCondition._from_generated(condition.hard_threshold_condition),
            change_threshold_condition=ChangeThresholdCondition._from_generated(condition.change_threshold_condition),
        )

    def _to_generated(self):
        smart = self.smart_detection_condition
        hard = self.hard_threshold_condition
        change = self.change_threshold_condition
        return _DimensionGroupConfiguration(
            group=_DimensionGroupIdentity(dimension=self.series_group_key),
            condition_operator=self.cross_conditions_operator,
            smart_detection_condition=smart._to_generated() if smart else None,
            hard_threshold_condition=hard._to_generated() if hard else None,
            change_threshold_condition=change._to_generated() if change else None,
        )
class MetricSingleSeriesDetectionCondition(MetricDetectionCondition):
    """Detection conditions scoped to a single dimension series.

    :param series_key: Required. dimension specified for series.
    :type series_key: dict[str, str]
    :keyword cross_conditions_operator: condition operator should be specified
        when combining multiple detection conditions. Possible values include:
        "AND", "OR".
    :paramtype cross_conditions_operator: str or
        ~azure.ai.metricsadvisor.models.DetectionConditionsOperator
    :keyword smart_detection_condition:
    :paramtype smart_detection_condition: ~azure.ai.metricsadvisor.models.SmartDetectionCondition
    :keyword hard_threshold_condition:
    :paramtype hard_threshold_condition: ~azure.ai.metricsadvisor.models.HardThresholdCondition
    :keyword change_threshold_condition:
    :paramtype change_threshold_condition: ~azure.ai.metricsadvisor.models.ChangeThresholdCondition
    """

    def __init__(self, series_key, **kwargs):
        # type: (Dict[str, str], Any) -> None
        super(MetricSingleSeriesDetectionCondition, self).__init__(**kwargs)
        self.series_key = series_key

    @classmethod
    def _from_generated(cls, condition):
        # Each nested condition converts itself (returning None when absent).
        return cls(
            condition.series.dimension,
            cross_conditions_operator=condition.condition_operator,
            smart_detection_condition=SmartDetectionCondition._from_generated(condition.smart_detection_condition),
            hard_threshold_condition=HardThresholdCondition._from_generated(condition.hard_threshold_condition),
            change_threshold_condition=ChangeThresholdCondition._from_generated(condition.change_threshold_condition),
        )

    def _to_generated(self):
        smart = self.smart_detection_condition
        hard = self.hard_threshold_condition
        change = self.change_threshold_condition
        return _SeriesConfiguration(
            series=_SeriesIdentity(dimension=self.series_key),
            condition_operator=self.cross_conditions_operator,
            smart_detection_condition=smart._to_generated() if smart else None,
            hard_threshold_condition=hard._to_generated() if hard else None,
            change_threshold_condition=change._to_generated() if change else None,
        )
class Metric(object):
    """A metric column of a data feed.

    :param name: Required. metric name.
    :type name: str
    :keyword display_name: metric display name.
    :paramtype display_name: str
    :keyword description: metric description.
    :paramtype description: str
    :ivar id: metric id.
    :vartype id: str
    """

    def __init__(self, name, **kwargs):
        # type: (str, Any) -> None
        self.name = name
        self.id = kwargs.get('id')
        self.display_name = kwargs.get('display_name')
        self.description = kwargs.get('description')

    @classmethod
    def _from_generated(cls, metric):
        # Lift the generated Metric (metric_* field names) into this model.
        return cls(
            metric.metric_name,
            id=metric.metric_id,
            display_name=metric.metric_display_name,
            description=metric.metric_description,
        )

    def _to_generated(self):
        # Server assigns the id, so it is not sent back.
        return _Metric(
            metric_name=self.name,
            metric_display_name=self.display_name,
            metric_description=self.description,
        )
class Dimension(object):
    """A dimension column of a data feed.

    :param name: Required. dimension name.
    :type name: str
    :keyword display_name: dimension display name.
    :paramtype display_name: str
    """

    def __init__(self, name, **kwargs):
        # type: (str, Any) -> None
        self.name = name
        self.display_name = kwargs.get('display_name')

    @classmethod
    def _from_generated(cls, dimension):
        # Lift the generated Dimension (dimension_* field names) into this model.
        return cls(
            dimension.dimension_name,
            display_name=dimension.dimension_display_name,
        )

    def _to_generated(self):
        return _Dimension(
            dimension_name=self.name,
            dimension_display_name=self.display_name,
        )
class DataFeedIngestionProgress(object):
    """Snapshot of a data feed's ingestion progress.

    :ivar latest_success_timestamp: the timestamp of latest success ingestion job.
        null indicates not available.
    :vartype latest_success_timestamp: ~datetime.datetime
    :ivar latest_active_timestamp: the timestamp of latest ingestion job with
        status update. null indicates not available.
    :vartype latest_active_timestamp: ~datetime.datetime
    """

    def __init__(self, **kwargs):
        self.latest_success_timestamp = kwargs.get("latest_success_timestamp", None)
        self.latest_active_timestamp = kwargs.get("latest_active_timestamp", None)

    @classmethod
    def _from_generated(cls, resp):
        # Copy both timestamps straight off the generated response model.
        return cls(
            latest_success_timestamp=resp.latest_success_timestamp,
            latest_active_timestamp=resp.latest_active_timestamp,
        )
class MetricSeriesData(object):
    """Time-series data for one metric series.

    :ivar definition:
    :vartype definition: ~azure.ai.metricsadvisor.models.MetricSeriesDefinition
    :ivar timestamp_list: timestamps of the data related to this time series.
    :vartype timestamp_list: list[~datetime.datetime]
    :ivar value_list: values of the data related to this time series.
    :vartype value_list: list[float]
    """

    def __init__(self, **kwargs):
        self.definition = kwargs.get('definition')
        self.timestamp_list = kwargs.get('timestamp_list')
        self.value_list = kwargs.get('value_list')

    @classmethod
    def _from_generated(cls, data):
        # NOTE(review): the generated result's ``id`` field is mapped onto
        # ``definition`` — confirm upstream that ``id`` carries the series
        # definition rather than a plain identifier.
        return cls(
            definition=data.id,
            timestamp_list=data.timestamp_list,
            value_list=data.value_list,
        )
class Alert(object):
    """An alert raised by an anomaly alert configuration.

    :ivar id: alert id.
    :vartype id: str
    :ivar timestamp: anomaly time.
    :vartype timestamp: ~datetime.datetime
    :ivar created_on: created time.
    :vartype created_on: ~datetime.datetime
    :ivar modified_on: modified time.
    :vartype modified_on: ~datetime.datetime
    """

    def __init__(self, **kwargs):
        self.id = kwargs.get('id')
        self.timestamp = kwargs.get('timestamp')
        self.created_on = kwargs.get('created_on')
        self.modified_on = kwargs.get('modified_on')

    @classmethod
    def _from_generated(cls, alert):
        # Generated field names (alert_id / *_time) map onto friendlier ones.
        return cls(
            id=alert.alert_id,
            timestamp=alert.timestamp,
            created_on=alert.created_time,
            modified_on=alert.modified_time,
        )
# Maps the service's ``dataSourceType`` discriminator string to the
# corresponding convenience data-feed source class defined above.
DATA_FEED_TRANSFORM = {
    "SqlServer": SQLServerDataFeed,
    "AzureApplicationInsights": AzureApplicationInsightsDataFeed,
    "AzureBlob": AzureBlobDataFeed,
    "AzureCosmosDB": AzureCosmosDBDataFeed,
    "AzureDataExplorer": AzureDataExplorerDataFeed,
    "AzureTable": AzureTableDataFeed,
    "HttpRequest": HttpRequestDataFeed,
    "InfluxDB": InfluxDBDataFeed,
    "MySql": MySqlDataFeed,
    "PostgreSql": PostgreSqlDataFeed,
    "MongoDB": MongoDBDataFeed,
    "AzureDataLakeStorageGen2": AzureDataLakeStorageGen2DataFeed,
    "Elasticsearch": ElasticsearchDataFeed
}
class Anomaly(msrest.serialization.Model):
    """Anomaly.

    Variables are only populated by the server, and will be ignored when sending a request.

    :ivar metric_id: metric unique id. Only returned for alerting anomaly result.
    :vartype metric_id: str
    :ivar detection_configuration_id: anomaly detection configuration unique id.
     Only returned for alerting anomaly result.
    :vartype detection_configuration_id: str
    :ivar timestamp: anomaly time.
    :vartype timestamp: ~datetime.datetime
    :ivar created_on: created time. Only returned for alerting result.
    :vartype created_on: ~datetime.datetime
    :ivar modified_time: modified time. Only returned for alerting result.
    :vartype modified_time: ~datetime.datetime
    :ivar dimension: dimension specified for series.
    :vartype dimension: dict[str, str]
    :ivar severity: anomaly severity. Possible values include: "Low", "Medium", "High".
    :vartype severity: str or ~azure.ai.metricsadvisor.models.Severity
    :ivar status: anomaly status. only returned for alerting anomaly result.
     Possible values include: "Active", "Resolved".
    :vartype status: str
    """

    _validation = {
        'metric_id': {'readonly': True},
        'detection_configuration_id': {'readonly': True},
        'timestamp': {'readonly': True},
        'created_on': {'readonly': True},
        'modified_time': {'readonly': True},
        'dimension': {'readonly': True},
    }

    _attribute_map = {
        'metric_id': {'key': 'metricId', 'type': 'str'},
        'detection_configuration_id': {'key': 'detectionConfigurationId', 'type': 'str'},
        'timestamp': {'key': 'timestamp', 'type': 'iso-8601'},
        'created_on': {'key': 'createdOn', 'type': 'iso-8601'},
        'modified_time': {'key': 'modifiedTime', 'type': 'iso-8601'},
        'dimension': {'key': 'dimension', 'type': '{str}'},
        'severity': {'key': 'severity', 'type': 'str'},
        'status': {'key': 'status', 'type': 'str'},
    }

    def __init__(
        self,
        **kwargs
    ):
        super(Anomaly, self).__init__(**kwargs)
        self.metric_id = kwargs.get('metric_id', None)
        self.detection_configuration_id = kwargs.get('detection_configuration_id', None)
        self.timestamp = kwargs.get('timestamp', None)
        # Fixed: previously read kwargs.get('created_time'), so the
        # ``created_on=...`` value passed by _from_generated was dropped and
        # ``created_on`` was always None.
        self.created_on = kwargs.get('created_on', None)
        self.modified_time = kwargs.get('modified_time', None)
        self.dimension = kwargs.get('dimension', None)
        self.severity = kwargs.get('severity', None)
        self.status = kwargs.get('status', None)

    @classmethod
    def _from_generated(cls, anomaly_result):
        # type: (AnomalyResult) -> Union[Anomaly, None]
        if not anomaly_result:
            return None
        severity = None
        status = None
        # ``property`` holds severity/status only for alerting results.
        if anomaly_result.property:
            severity = anomaly_result.property.anomaly_severity
            status = anomaly_result.property.anomaly_status
        return cls(
            metric_id=anomaly_result.metric_id,
            detection_configuration_id=anomaly_result.anomaly_detection_configuration_id,
            timestamp=anomaly_result.timestamp,
            created_on=anomaly_result.created_time,
            modified_time=anomaly_result.modified_time,
            dimension=anomaly_result.dimension,
            severity=severity,
            status=status,
        )
class Incident(msrest.serialization.Model):
    """Incident.

    Variables are only populated by the server, and will be ignored when sending a request.

    :ivar metric_id: metric unique id. Only returned for alerting incident result.
    :vartype metric_id: str
    :ivar detection_configuration_id: anomaly detection configuration unique id.
     Only returned for alerting incident result.
    :vartype detection_configuration_id: str
    :ivar incident_id: incident id.
    :vartype incident_id: str
    :ivar start_time: incident start time.
    :vartype start_time: ~datetime.datetime
    :ivar last_time: incident last time.
    :vartype last_time: ~datetime.datetime
    :param dimension_key: dimension specified for series.
    :type dimension_key: dict[str, str]
    :ivar severity: max severity of latest anomalies in the incident. Possible
     values include: "Low", "Medium", "High".
    :vartype severity: str or ~azure.ai.metricsadvisor.models.Severity
    :ivar status: incident status only return for alerting incident result.
     Possible values include: "Active", "Resolved".
    :vartype status: str or ~azure.ai.metricsadvisor.models.IncidentPropertyIncidentStatus
    """

    _validation = {
        'metric_id': {'readonly': True},
        'detection_configuration_id': {'readonly': True},
        # Fixed: was keyed as 'id', but this class only ever sets
        # ``incident_id``, so msrest serialization referenced a missing
        # attribute. The wire key remains 'id' via _attribute_map.
        'incident_id': {'readonly': True},
        'start_time': {'readonly': True},
        'last_time': {'readonly': True},
    }

    _attribute_map = {
        'metric_id': {'key': 'metricId', 'type': 'str'},
        'detection_configuration_id': {'key': 'detectionConfigurationId', 'type': 'str'},
        'incident_id': {'key': 'id', 'type': 'str'},
        'start_time': {'key': 'startTime', 'type': 'iso-8601'},
        'last_time': {'key': 'lastTime', 'type': 'iso-8601'},
        'dimension_key': {'key': 'dimensionKey', 'type': '{str}'},
        'severity': {'key': 'severity', 'type': 'str'},
        'status': {'key': 'status', 'type': 'str'},
    }

    def __init__(
        self,
        **kwargs
    ):
        super(Incident, self).__init__(**kwargs)
        self.metric_id = kwargs.get('metric_id', None)
        self.detection_configuration_id = kwargs.get('detection_configuration_id', None)
        self.incident_id = kwargs.get('incident_id', None)
        self.start_time = kwargs.get('start_time', None)
        self.last_time = kwargs.get('last_time', None)
        self.dimension_key = kwargs.get('dimension_key', None)
        self.severity = kwargs.get('severity', None)
        self.status = kwargs.get('status', None)

    @classmethod
    def _from_generated(cls, incident_result):
        # type: (IncidentResult) -> Union[Incident, None]
        if not incident_result:
            return None
        dimension_key = incident_result.root_node.dimension if incident_result.root_node else None
        severity = None
        status = None
        # ``property`` holds severity/status only for alerting results.
        if incident_result.property:
            severity = incident_result.property.max_severity
            status = incident_result.property.incident_status
        return cls(
            metric_id=incident_result.metric_id,
            detection_configuration_id=incident_result.anomaly_detection_configuration_id,
            incident_id=incident_result.incident_id,
            start_time=incident_result.start_time,
            last_time=incident_result.last_time,
            dimension_key=dimension_key,
            severity=severity,
            status=status,
        )
class IncidentRootCause(msrest.serialization.Model):
    """Root cause information for an incident.

    Variables are only populated by the server, and will be ignored when sending a request.

    :param dimension_key: dimension specified for series group.
    :type dimension_key: dict[str, str]
    :ivar path: drilling down path from query anomaly to root cause.
    :vartype path: list[str]
    :ivar score: score.
    :vartype score: float
    :ivar description: description.
    :vartype description: str
    """

    _validation = {
        "path": {"readonly": True},
        "score": {"readonly": True},
        "description": {"readonly": True},
    }

    _attribute_map = {
        "dimension_key": {"key": "dimensionKey", "type": "{str}"},
        "path": {"key": "path", "type": "[str]"},
        "score": {"key": "score", "type": "float"},
        "description": {"key": "description", "type": "str"},
    }

    def __init__(self, **kwargs):
        super(IncidentRootCause, self).__init__(**kwargs)
        # Every field defaults to None; the read-only ones are server-populated.
        self.dimension_key = kwargs.get("dimension_key")
        self.path = kwargs.get("path")
        self.score = kwargs.get("score")
        self.description = kwargs.get("description")

    @classmethod
    def _from_generated(cls, root_cause):
        # type: (RootCause) -> Union[IncidentRootCause, None]
        """Build an IncidentRootCause from the generated RootCause model."""
        if not root_cause:
            return None
        # root_cause.root_cause (the dimension node) may be absent.
        node = root_cause.root_cause
        return cls(
            dimension_key=node.dimension if node else None,
            path=root_cause.path,
            score=root_cause.score,
            description=root_cause.description,
        )
class AnomalyFeedback(msrest.serialization.Model):  # pylint:disable=too-many-instance-attributes
    """Feedback marking a time range of a metric series as anomalous or not.

    Variables are only populated by the server, and will be ignored when sending a request.

    All required parameters must be populated in order to send to Azure.

    :param feedback_type: Required. feedback type.Constant filled by server.  Possible values
     include: "Anomaly", "ChangePoint", "Period", "Comment".
    :type feedback_type: str or ~azure.ai.metricsadvisor.models.FeedbackType
    :ivar id: feedback unique id.
    :vartype id: str
    :ivar created_time: feedback created time.
    :vartype created_time: ~datetime.datetime
    :ivar user_principal: user who gives this feedback.
    :vartype user_principal: str
    :param metric_id: Required. metric unique id.
    :type metric_id: str
    :param dimension_key: Required. metric dimension filter.
    :type dimension_key: dict[str, str]
    :param start_time: Required. the start timestamp of feedback timerange.
    :type start_time: ~datetime.datetime
    :param end_time: Required. the end timestamp of feedback timerange, when equals to startTime
     means only one timestamp.
    :type end_time: ~datetime.datetime
    :param value: Required. Possible values include: "AutoDetect", "Anomaly", "NotAnomaly".
    :type value: str or ~azure.ai.metricsadvisor.models.AnomalyValue
    :keyword anomaly_detection_configuration_id: the corresponding anomaly detection configuration
     of this feedback.
    :paramtype anomaly_detection_configuration_id: str
    :keyword anomaly_detection_configuration_snapshot:
    :paramtype anomaly_detection_configuration_snapshot:
     ~azure.ai.metricsadvisor.models.AnomalyDetectionConfiguration
    """

    _validation = {
        "feedback_type": {"required": True},
        "id": {"readonly": True},
        "created_time": {"readonly": True},
        "user_principal": {"readonly": True},
        "metric_id": {"required": True},
        "dimension_key": {"required": True},
        "start_time": {"required": True},
        "end_time": {"required": True},
        "value": {"required": True},
    }

    _attribute_map = {
        "feedback_type": {"key": "feedbackType", "type": "str"},
        "id": {"key": "id", "type": "str"},
        "created_time": {"key": "createdTime", "type": "iso-8601"},
        "user_principal": {"key": "userPrincipal", "type": "str"},
        "metric_id": {"key": "metricId", "type": "str"},
        "dimension_key": {"key": "dimensionKey", "type": "{str}"},
        "start_time": {"key": "startTime", "type": "iso-8601"},
        "end_time": {"key": "endTime", "type": "iso-8601"},
        "value": {"key": "value", "type": "str"},
        "anomaly_detection_configuration_id": {"key": "anomalyDetectionConfigurationId", "type": "str"},
        "anomaly_detection_configuration_snapshot": {
            "key": "anomalyDetectionConfigurationSnapshot",
            "type": "AnomalyDetectionConfiguration",
        },
    }

    def __init__(self, metric_id, dimension_key, start_time, end_time, value, **kwargs):
        super(AnomalyFeedback, self).__init__(**kwargs)
        # Discriminator is fixed for this feedback subtype.
        self.feedback_type = 'Anomaly'  # type: str
        # Server-populated fields; None until the feedback round-trips the service.
        self.id = kwargs.get("id")
        self.created_time = kwargs.get("created_time")
        self.user_principal = kwargs.get("user_principal")
        self.metric_id = metric_id
        self.dimension_key = dimension_key
        self.start_time = start_time
        self.end_time = end_time
        self.value = value
        self.anomaly_detection_configuration_id = kwargs.get("anomaly_detection_configuration_id")
        self.anomaly_detection_configuration_snapshot = kwargs.get("anomaly_detection_configuration_snapshot")

    @classmethod
    def _from_generated(cls, anomaly_feedback):
        # type: (_AnomalyFeedback) -> Union[AnomalyFeedback, None]
        """Convert the generated feedback model into this exposed model."""
        if not anomaly_feedback:
            return None
        return cls(
            id=anomaly_feedback.feedback_id,
            created_time=anomaly_feedback.created_time,
            user_principal=anomaly_feedback.user_principal,
            metric_id=anomaly_feedback.metric_id,
            dimension_key=anomaly_feedback.dimension_filter.dimension,
            start_time=anomaly_feedback.start_time,
            end_time=anomaly_feedback.end_time,
            value=anomaly_feedback.value.anomaly_value,
            anomaly_detection_configuration_id=anomaly_feedback.anomaly_detection_configuration_id,
            anomaly_detection_configuration_snapshot=anomaly_feedback.anomaly_detection_configuration_snapshot,
        )

    def _to_generated(self):
        # type: (AnomalyFeedback) -> _AnomalyFeedback
        """Convert this model into the generated form expected by the service."""
        return _AnomalyFeedback(
            feedback_id=self.id,
            created_time=self.created_time,
            user_principal=self.user_principal,
            metric_id=self.metric_id,
            # The service wraps dimension filter and value in dedicated sub-models.
            dimension_filter=FeedbackDimensionFilter(dimension=self.dimension_key),
            start_time=self.start_time,
            end_time=self.end_time,
            value=AnomalyFeedbackValue(anomaly_value=self.value),
            anomaly_detection_configuration_id=self.anomaly_detection_configuration_id,
            anomaly_detection_configuration_snapshot=self.anomaly_detection_configuration_snapshot,
        )
class ChangePointFeedback(msrest.serialization.Model):
    """Feedback marking a time range of a metric series as a change point or not.

    Variables are only populated by the server, and will be ignored when sending a request.

    All required parameters must be populated in order to send to Azure.

    :param feedback_type: Required. feedback type.Constant filled by server.  Possible values
     include: "Anomaly", "ChangePoint", "Period", "Comment".
    :type feedback_type: str or ~azure.ai.metricsadvisor.models.FeedbackType
    :ivar id: feedback unique id.
    :vartype id: str
    :ivar created_time: feedback created time.
    :vartype created_time: ~datetime.datetime
    :ivar user_principal: user who gives this feedback.
    :vartype user_principal: str
    :param metric_id: Required. metric unique id.
    :type metric_id: str
    :param dimension_key: Required. metric dimension filter.
    :type dimension_key: dict[str, str]
    :param start_time: Required. the start timestamp of feedback timerange.
    :type start_time: ~datetime.datetime
    :param end_time: Required. the end timestamp of feedback timerange, when equals to startTime
     means only one timestamp.
    :type end_time: ~datetime.datetime
    :param value: Required. Possible values include: "AutoDetect", "ChangePoint",
     "NotChangePoint".
    :type value: str or ~azure.ai.metricsadvisor.models.ChangePointValue
    """

    _validation = {
        "feedback_type": {"required": True},
        "id": {"readonly": True},
        "created_time": {"readonly": True},
        "user_principal": {"readonly": True},
        "metric_id": {"required": True},
        "dimension_key": {"required": True},
        "start_time": {"required": True},
        "end_time": {"required": True},
        "value": {"required": True},
    }

    _attribute_map = {
        "feedback_type": {"key": "feedbackType", "type": "str"},
        "id": {"key": "id", "type": "str"},
        "created_time": {"key": "createdTime", "type": "iso-8601"},
        "user_principal": {"key": "userPrincipal", "type": "str"},
        "metric_id": {"key": "metricId", "type": "str"},
        "dimension_key": {"key": "dimensionKey", "type": "{str}"},
        "start_time": {"key": "startTime", "type": "iso-8601"},
        "end_time": {"key": "endTime", "type": "iso-8601"},
        "value": {"key": "value", "type": "str"},
    }

    def __init__(self, metric_id, dimension_key, start_time, end_time, value, **kwargs):
        super(ChangePointFeedback, self).__init__(**kwargs)
        # Discriminator is fixed for this feedback subtype.
        self.feedback_type = 'ChangePoint'  # type: str
        # Server-populated fields; None until the feedback round-trips the service.
        self.id = kwargs.get("id")
        self.created_time = kwargs.get("created_time")
        self.user_principal = kwargs.get("user_principal")
        self.metric_id = metric_id
        self.dimension_key = dimension_key
        self.start_time = start_time
        self.end_time = end_time
        self.value = value

    @classmethod
    def _from_generated(cls, change_point_feedback):
        # type: (_ChangePointFeedback) -> Union[ChangePointFeedback, None]
        """Convert the generated feedback model into this exposed model."""
        if not change_point_feedback:
            return None
        return cls(
            id=change_point_feedback.feedback_id,
            created_time=change_point_feedback.created_time,
            user_principal=change_point_feedback.user_principal,
            metric_id=change_point_feedback.metric_id,
            dimension_key=change_point_feedback.dimension_filter.dimension,
            start_time=change_point_feedback.start_time,
            end_time=change_point_feedback.end_time,
            value=change_point_feedback.value.change_point_value,
        )

    def _to_generated(self):
        # type: (ChangePointFeedback) -> _ChangePointFeedback
        """Convert this model into the generated form expected by the service."""
        return _ChangePointFeedback(
            feedback_id=self.id,
            created_time=self.created_time,
            user_principal=self.user_principal,
            metric_id=self.metric_id,
            # The service wraps dimension filter and value in dedicated sub-models.
            dimension_filter=FeedbackDimensionFilter(dimension=self.dimension_key),
            start_time=self.start_time,
            end_time=self.end_time,
            value=ChangePointFeedbackValue(change_point_value=self.value),
        )
class CommentFeedback(msrest.serialization.Model):
    """Free-form comment feedback attached to a metric time range.

    Variables are only populated by the server, and will be ignored when sending a request.

    All required parameters must be populated in order to send to Azure.

    :param feedback_type: Required. feedback type.Constant filled by server.  Possible values
     include: "Anomaly", "ChangePoint", "Period", "Comment".
    :type feedback_type: str or ~azure.ai.metricsadvisor.models.FeedbackType
    :ivar id: feedback unique id.
    :vartype id: str
    :ivar created_time: feedback created time.
    :vartype created_time: ~datetime.datetime
    :ivar user_principal: user who gives this feedback.
    :vartype user_principal: str
    :param metric_id: Required. metric unique id.
    :type metric_id: str
    :param dimension_key: Required. metric dimension filter.
    :type dimension_key: dict[str, str]
    :param start_time: the start timestamp of feedback timerange.
    :type start_time: ~datetime.datetime
    :param end_time: the end timestamp of feedback timerange, when equals to startTime means only
     one timestamp.
    :type end_time: ~datetime.datetime
    :param value: Required. the comment string.
    :type value: str
    """

    _validation = {
        "feedback_type": {"required": True},
        "id": {"readonly": True},
        "created_time": {"readonly": True},
        "user_principal": {"readonly": True},
        "metric_id": {"required": True},
        "dimension_key": {"required": True},
        "value": {"required": True},
    }

    _attribute_map = {
        "feedback_type": {"key": "feedbackType", "type": "str"},
        "id": {"key": "id", "type": "str"},
        "created_time": {"key": "createdTime", "type": "iso-8601"},
        "user_principal": {"key": "userPrincipal", "type": "str"},
        "metric_id": {"key": "metricId", "type": "str"},
        "dimension_key": {"key": "dimensionKey", "type": "{str}"},
        "start_time": {"key": "startTime", "type": "iso-8601"},
        "end_time": {"key": "endTime", "type": "iso-8601"},
        "value": {"key": "value", "type": "str"},
    }

    def __init__(self, metric_id, dimension_key, start_time, end_time, value, **kwargs):
        super(CommentFeedback, self).__init__(**kwargs)
        # Discriminator is fixed for this feedback subtype.
        self.feedback_type = 'Comment'  # type: str
        # Server-populated fields; None until the feedback round-trips the service.
        self.id = kwargs.get("id")
        self.created_time = kwargs.get("created_time")
        self.user_principal = kwargs.get("user_principal")
        self.metric_id = metric_id
        self.dimension_key = dimension_key
        self.start_time = start_time
        self.end_time = end_time
        self.value = value

    @classmethod
    def _from_generated(cls, comment_feedback):
        # type: (_CommentFeedback) -> Union[CommentFeedback, None]
        """Convert the generated feedback model into this exposed model."""
        if not comment_feedback:
            return None
        return cls(
            id=comment_feedback.feedback_id,
            created_time=comment_feedback.created_time,
            user_principal=comment_feedback.user_principal,
            metric_id=comment_feedback.metric_id,
            dimension_key=comment_feedback.dimension_filter.dimension,
            start_time=comment_feedback.start_time,
            end_time=comment_feedback.end_time,
            value=comment_feedback.value.comment_value,
        )

    def _to_generated(self):
        # type: (CommentFeedback) -> _CommentFeedback
        """Convert this model into the generated form expected by the service."""
        return _CommentFeedback(
            feedback_id=self.id,
            created_time=self.created_time,
            user_principal=self.user_principal,
            metric_id=self.metric_id,
            # The service wraps dimension filter and value in dedicated sub-models.
            dimension_filter=FeedbackDimensionFilter(dimension=self.dimension_key),
            start_time=self.start_time,
            end_time=self.end_time,
            value=CommentFeedbackValue(comment_value=self.value),
        )
class PeriodFeedback(msrest.serialization.Model):
    """Feedback that fixes or auto-detects the period of a metric series.

    Variables are only populated by the server, and will be ignored when sending a request.

    All required parameters must be populated in order to send to Azure.

    :param feedback_type: Required. feedback type.Constant filled by server.  Possible values
     include: "Anomaly", "ChangePoint", "Period", "Comment".
    :type feedback_type: str or ~azure.ai.metricsadvisor.models.FeedbackType
    :ivar id: feedback unique id.
    :vartype id: str
    :ivar created_time: feedback created time.
    :vartype created_time: ~datetime.datetime
    :ivar user_principal: user who gives this feedback.
    :vartype user_principal: str
    :param metric_id: Required. metric unique id.
    :type metric_id: str
    :param dimension_key: Required. metric dimension filter.
    :type dimension_key: dict[str, str]
    :param value: Required.
    :type value: int
    :param period_type: Required. the type of setting period. Possible values include:
     "AutoDetect", "AssignValue".
    :type period_type: str or ~azure.ai.metricsadvisor.models.PeriodType
    """

    _validation = {
        "feedback_type": {"required": True},
        "id": {"readonly": True},
        "created_time": {"readonly": True},
        "user_principal": {"readonly": True},
        "metric_id": {"required": True},
        "dimension_key": {"required": True},
        "value": {"required": True},
        "period_type": {"required": True},
    }

    _attribute_map = {
        "feedback_type": {"key": "feedbackType", "type": "str"},
        "id": {"key": "id", "type": "str"},
        "created_time": {"key": "createdTime", "type": "iso-8601"},
        "user_principal": {"key": "userPrincipal", "type": "str"},
        "metric_id": {"key": "metricId", "type": "str"},
        "dimension_key": {"key": "dimensionKey", "type": "{str}"},
        "value": {"key": "value", "type": "int"},
        "period_type": {"key": "periodType", "type": "str"},
    }

    def __init__(self, metric_id, dimension_key, value, period_type, **kwargs):
        super(PeriodFeedback, self).__init__(**kwargs)
        # Discriminator is fixed for this feedback subtype.
        self.feedback_type = 'Period'  # type: str
        # Server-populated fields; None until the feedback round-trips the service.
        self.id = kwargs.get("id")
        self.created_time = kwargs.get("created_time")
        self.user_principal = kwargs.get("user_principal")
        self.metric_id = metric_id
        self.dimension_key = dimension_key
        self.value = value
        self.period_type = period_type

    @classmethod
    def _from_generated(cls, period_feedback):
        # type: (_PeriodFeedback) -> Union[PeriodFeedback, None]
        """Convert the generated feedback model into this exposed model."""
        if not period_feedback:
            return None
        # Both the period value and its type live on the nested value sub-model.
        return cls(
            id=period_feedback.feedback_id,
            created_time=period_feedback.created_time,
            user_principal=period_feedback.user_principal,
            metric_id=period_feedback.metric_id,
            dimension_key=period_feedback.dimension_filter.dimension,
            value=period_feedback.value.period_value,
            period_type=period_feedback.value.period_type,
        )

    def _to_generated(self):
        # type: (PeriodFeedback) -> _PeriodFeedback
        """Convert this model into the generated form expected by the service."""
        return _PeriodFeedback(
            feedback_id=self.id,
            created_time=self.created_time,
            user_principal=self.user_principal,
            metric_id=self.metric_id,
            # The service wraps dimension filter and value in dedicated sub-models.
            dimension_filter=FeedbackDimensionFilter(dimension=self.dimension_key),
            value=PeriodFeedbackValue(period_type=self.period_type, period_value=self.value),
        )