# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging
from typing import Optional, Any
from opentelemetry.sdk.metrics.export import (
DataPointT,
HistogramDataPoint,
MetricExporter,
MetricExportResult,
MetricsData as OTMetricsData,
NumberDataPoint,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
MetricDataPoint,
MetricsData,
MonitorBase,
TelemetryItem,
)
from azure.monitor.opentelemetry.exporter.export._base import (
BaseExporter,
ExportResult,
)
_logger = logging.getLogger(__name__)
__all__ = ["AzureMonitorMetricExporter"]
[docs]class AzureMonitorMetricExporter(BaseExporter, MetricExporter):
"""Azure Monitor Metric exporter for OpenTelemetry."""
def __init__(self, **kwargs: Any) -> None:
BaseExporter.__init__(self, **kwargs)
MetricExporter.__init__(
self,
preferred_temporality=kwargs.get("preferred_temporality"),
preferred_aggregation=kwargs.get("preferred_aggregation"),
)
[docs] def export(
self,
metrics_data: OTMetricsData,
timeout_millis: float = 10_000, # pylint: disable=unused-argument
**kwargs: Any, # pylint: disable=unused-argument
) -> MetricExportResult:
"""Exports a batch of metric data
:param metrics: Open Telemetry Metric(s) to export.
:type metrics_data: Sequence[~opentelemetry.sdk.metrics._internal.point.MetricsData]
:rtype: ~opentelemetry.sdk.metrics.export.MetricExportResult
"""
envelopes = []
if metrics_data is None:
return MetricExportResult.SUCCESS
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
for point in metric.data.data_points:
if point is not None:
envelopes.append(
self._point_to_envelope(
point,
metric.name,
resource_metric.resource,
scope_metric.scope
)
)
try:
result = self._transmit(envelopes)
self._handle_transmit_from_storage(envelopes, result)
return _get_metric_export_result(result)
except Exception: # pylint: disable=broad-except
_logger.exception("Exception occurred while exporting the data.")
return _get_metric_export_result(ExportResult.FAILED_NOT_RETRYABLE)
[docs] def force_flush(
self,
timeout_millis: float = 10_000,
) -> bool:
"""
Ensure that export of any metrics currently received by the exporter
are completed as soon as possible.
"""
return True
[docs] def shutdown(
self,
timeout_millis: float = 30_000, # pylint: disable=unused-argument
**kwargs: Any, # pylint: disable=unused-argument
) -> None:
"""Shuts down the exporter.
Called when the SDK is shut down.
"""
self.storage.close()
def _point_to_envelope(
self,
point: DataPointT,
name: str,
resource: Optional[Resource] = None,
scope: Optional[InstrumentationScope] = None
) -> TelemetryItem:
envelope = _convert_point_to_envelope(point, name, resource, scope)
envelope.instrumentation_key = self._instrumentation_key
return envelope
[docs] @classmethod
def from_connection_string(
cls, conn_str: str, **kwargs: Any
) -> "AzureMonitorMetricExporter":
"""
Create an AzureMonitorMetricExporter from a connection string.
This is the recommended way of instantation if a connection string is passed in explicitly.
If a user wants to use a connection string provided by environment variable, the constructor
of the exporter can be called directly.
:param str conn_str: The connection string to be used for authentication.
:keyword str api_version: The service API version used. Defaults to latest.
:returns an instance of ~AzureMonitorMetricExporter
"""
return cls(connection_string=conn_str, **kwargs)
# pylint: disable=protected-access
def _convert_point_to_envelope(
point: DataPointT,
name: str,
resource: Optional[Resource] = None,
scope: Optional[InstrumentationScope] = None
) -> TelemetryItem:
envelope = _utils._create_telemetry_item(point.time_unix_nano)
envelope.name = "Microsoft.ApplicationInsights.Metric"
envelope.tags.update(_utils._populate_part_a_fields(resource))
namespace = None
if scope is not None:
namespace = scope.name
value = 0
count = 1
min_ = None
max_ = None
# std_dev = None
if isinstance(point, NumberDataPoint):
value = point.value
elif isinstance(point, HistogramDataPoint):
value = point.sum
count = int(point.count)
min_ = point.min
max_ = point.max
data_point = MetricDataPoint(
name=str(name)[:1024],
namespace=str(namespace)[:256],
value=value,
count=count,
min=min_,
max=max_,
)
properties = _utils._filter_custom_properties(point.attributes)
data = MetricsData(
properties=properties,
metrics=[data_point],
)
envelope.data = MonitorBase(base_data=data, base_type="MetricData")
return envelope
def _get_metric_export_result(result: ExportResult) -> MetricExportResult:
if result == ExportResult.SUCCESS:
return MetricExportResult.SUCCESS
if result in (
ExportResult.FAILED_RETRYABLE,
ExportResult.FAILED_NOT_RETRYABLE,
):
return MetricExportResult.FAILURE
return None