Source code for azure.monitor.opentelemetry.exporter.export.metrics._exporter

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging

from typing import Mapping, Optional, Any

from opentelemetry.util.types import AttributeValue
from opentelemetry.sdk.metrics import (
    Counter,
    Histogram,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    UpDownCounter,
)
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    DataPointT,
    HistogramDataPoint,
    MetricExporter,
    MetricExportResult,
    MetricsData as OTMetricsData,
    NumberDataPoint,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

from azure.monitor.opentelemetry.exporter._constants import _AUTOCOLLECTED_INSTRUMENT_NAMES
from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
    MetricDataPoint,
    MetricsData,
    MonitorBase,
    TelemetryItem,
)
from azure.monitor.opentelemetry.exporter.export._base import (
    BaseExporter,
    ExportResult,
)

_logger = logging.getLogger(__name__)

__all__ = ["AzureMonitorMetricExporter"]


APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = {
    Counter: AggregationTemporality.DELTA,
    Histogram: AggregationTemporality.DELTA,
    ObservableCounter: AggregationTemporality.DELTA,
    ObservableGauge: AggregationTemporality.CUMULATIVE,
    ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    UpDownCounter: AggregationTemporality.CUMULATIVE,
}


class AzureMonitorMetricExporter(BaseExporter, MetricExporter):
    """Azure Monitor Metric exporter for OpenTelemetry."""

    def __init__(self, **kwargs: Any) -> None:
        BaseExporter.__init__(self, **kwargs)
        MetricExporter.__init__(
            self,
            preferred_temporality=APPLICATION_INSIGHTS_METRIC_TEMPORALITIES,
            preferred_aggregation=kwargs.get("preferred_aggregation"),
        )

    # pylint: disable=R1702
    def export(
        self,
        metrics_data: OTMetricsData,
        timeout_millis: float = 10_000,  # pylint: disable=unused-argument
        **kwargs: Any,  # pylint: disable=unused-argument
    ) -> MetricExportResult:
        """Exports a batch of metric data.

        :param metrics_data: OpenTelemetry metric(s) to export.
        :type metrics_data: ~opentelemetry.sdk.metrics.export.MetricsData
        :rtype: ~opentelemetry.sdk.metrics.export.MetricExportResult
        """
        envelopes = []
        if metrics_data is None:
            return MetricExportResult.SUCCESS
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for point in metric.data.data_points:
                        if point is not None:
                            envelope = self._point_to_envelope(
                                point,
                                metric.name,
                                resource_metric.resource,
                                scope_metric.scope,
                            )
                            if envelope is not None:
                                envelopes.append(envelope)
        try:
            result = self._transmit(envelopes)
            self._handle_transmit_from_storage(envelopes, result)
            return _get_metric_export_result(result)
        except Exception:  # pylint: disable=broad-except
            _logger.exception("Exception occurred while exporting the data.")
            return _get_metric_export_result(ExportResult.FAILED_NOT_RETRYABLE)

    def force_flush(
        self,
        timeout_millis: float = 10_000,
    ) -> bool:
        """Ensures that the export of any metrics currently received by the exporter
        is completed as soon as possible.
        """
        return True

    def shutdown(
        self,
        timeout_millis: float = 30_000,  # pylint: disable=unused-argument
        **kwargs: Any,  # pylint: disable=unused-argument
    ) -> None:
        """Shuts down the exporter. Called when the SDK is shut down."""
        self.storage.close()

    def _point_to_envelope(
        self,
        point: DataPointT,
        name: str,
        resource: Optional[Resource] = None,
        scope: Optional[InstrumentationScope] = None,
    ) -> Optional[TelemetryItem]:
        envelope = _convert_point_to_envelope(point, name, resource, scope)
        if name in _AUTOCOLLECTED_INSTRUMENT_NAMES:
            envelope = _handle_std_metric_envelope(envelope, name, point.attributes)
        if envelope is not None:
            envelope.instrumentation_key = self._instrumentation_key
        return envelope

    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorMetricExporter":
        """Creates an AzureMonitorMetricExporter from a connection string.

        This is the recommended way of instantiation if a connection string is passed in
        explicitly. If a user wants to use a connection string provided by an environment
        variable, the constructor of the exporter can be called directly.

        :param str conn_str: The connection string to be used for authentication.
        :keyword str api_version: The service API version used. Defaults to latest.
        :returns: An instance of ~AzureMonitorMetricExporter
        """
        return cls(connection_string=conn_str, **kwargs)
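
The snippet below is a minimal usage sketch, not part of the module source. It assumes a placeholder connection string and shows the exporter wired into the OpenTelemetry SDK through a PeriodicExportingMetricReader, which is the path through which the preferred temporalities declared above take effect.

# Illustrative sketch only; the connection string is a placeholder.
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

from azure.monitor.opentelemetry.exporter import AzureMonitorMetricExporter

exporter = AzureMonitorMetricExporter.from_connection_string(
    "InstrumentationKey=00000000-0000-0000-0000-000000000000"
)
# The reader periodically collects metrics and hands them to export() above.
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=60_000)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

meter = metrics.get_meter_provider().get_meter(__name__)
counter = meter.create_counter("sample_counter")  # a Counter is exported with DELTA temporality
counter.add(1, {"environment": "sketch"})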

# pylint: disable=protected-access
def _convert_point_to_envelope(
    point: DataPointT,
    name: str,
    resource: Optional[Resource] = None,
    scope: Optional[InstrumentationScope] = None,
) -> TelemetryItem:
    envelope = _utils._create_telemetry_item(point.time_unix_nano)
    envelope.name = "Microsoft.ApplicationInsights.Metric"
    envelope.tags.update(_utils._populate_part_a_fields(resource))
    namespace = None
    if scope is not None:
        namespace = scope.name
    value = 0
    count = 1
    min_ = None
    max_ = None
    # std_dev = None

    if isinstance(point, NumberDataPoint):
        value = point.value
    elif isinstance(point, HistogramDataPoint):
        value = point.sum
        count = int(point.count)
        min_ = point.min
        max_ = point.max

    # truncation logic
    properties = _utils._filter_custom_properties(point.attributes)
    if namespace is not None:
        namespace = str(namespace)[:256]
    data_point = MetricDataPoint(
        name=str(name)[:1024],
        namespace=namespace,
        value=value,
        count=count,
        min=min_,
        max=max_,
    )
    data = MetricsData(
        properties=properties,
        metrics=[data_point],
    )
    envelope.data = MonitorBase(base_data=data, base_type="MetricData")

    return envelope


# pylint: disable=protected-access
def _handle_std_metric_envelope(
    envelope: TelemetryItem,
    name: str,
    attributes: Mapping[str, AttributeValue],
) -> Optional[TelemetryItem]:
    properties = {}
    tags = envelope.tags
    # TODO: switch to semconv constants
    status_code = attributes.get("http.status_code", None)
    if name == "http.client.duration":
        properties["_MS.MetricId"] = "dependencies/duration"
        properties["_MS.IsAutocollected"] = "True"
        properties["Dependency.Type"] = "http"
        properties["Dependency.Success"] = str(_is_status_code_success(status_code, 400))
        target = None
        if "peer.service" in attributes:
            target = attributes["peer.service"]
        elif "net.peer.name" in attributes:
            if attributes["net.peer.name"] is None:
                target = None
            elif "net.host.port" in attributes and \
                    attributes["net.host.port"] is not None:
                target = "{}:{}".format(
                    attributes["net.peer.name"],
                    attributes["net.host.port"],
                )
            else:
                target = attributes["net.peer.name"]
        properties["dependency/target"] = target
        properties["dependency/resultCode"] = str(status_code)
        # TODO: operation/synthetic
        properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"]
        properties["cloud/roleName"] = tags["ai.cloud.role"]
    elif name == "http.server.duration":
        properties["_MS.MetricId"] = "requests/duration"
        properties["_MS.IsAutocollected"] = "True"
        properties["request/resultCode"] = str(status_code)
        # TODO: operation/synthetic
        properties["cloud/roleInstance"] = tags["ai.cloud.roleInstance"]
        properties["cloud/roleName"] = tags["ai.cloud.role"]
        properties["Request.Success"] = str(_is_status_code_success(status_code, 500))
    else:
        # Any other autocollected metrics are not supported yet for standard metrics.
        # We ignore these envelopes in those cases.
        return None

    # TODO: rpc, database, messaging

    envelope.data.base_data.properties = properties
    return envelope


def _is_status_code_success(status_code: Optional[str], threshold: int) -> bool:
    return status_code is not None and int(status_code) < threshold


def _get_metric_export_result(result: ExportResult) -> MetricExportResult:
    if result == ExportResult.SUCCESS:
        return MetricExportResult.SUCCESS
    if result in (
        ExportResult.FAILED_RETRYABLE,
        ExportResult.FAILED_NOT_RETRYABLE,
    ):
        return MetricExportResult.FAILURE
    return None
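
A brief illustration of the success-threshold helper used by the standard-metric mapping above; the status codes are hypothetical and only demonstrate the 400 threshold applied to dependency metrics versus the 500 threshold applied to request metrics.

# Hypothetical inputs, shown only to illustrate _is_status_code_success.
assert _is_status_code_success("200", 400) is True    # successful HTTP client dependency
assert _is_status_code_success("404", 400) is False   # 4xx counts as a failed dependency
assert _is_status_code_success("404", 500) is True    # ...but not as a failed server request
assert _is_status_code_success(None, 500) is False    # a missing status code is treated as failure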