# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging
from typing import Sequence, Any
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.severity import SeverityNumber
from opentelemetry.sdk._logs.export import LogExporter, LogExportResult
from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
MessageData,
MonitorBase,
TelemetryExceptionData,
TelemetryExceptionDetails,
TelemetryItem,
)
from azure.monitor.opentelemetry.exporter.export._base import (
BaseExporter,
ExportResult,
)
_logger = logging.getLogger(__name__)
_DEFAULT_SPAN_ID = 0
_DEFAULT_TRACE_ID = 0
__all__ = ["AzureMonitorLogExporter"]
[docs]class AzureMonitorLogExporter(BaseExporter, LogExporter):
"""Azure Monitor Log exporter for OpenTelemetry."""
[docs] def export(
self, batch: Sequence[LogData], **kwargs: Any # pylint: disable=unused-argument
) -> LogExportResult:
"""Export log data
:param batch: Open Telemetry LogData(s) to export.
:type batch: Sequence[~opentelemetry._logs.LogData]
:rtype: ~opentelemetry.sdk._logs.export.LogData
"""
envelopes = [self._log_to_envelope(log) for log in batch]
try:
result = self._transmit(envelopes)
self._handle_transmit_from_storage(envelopes, result)
return _get_log_export_result(result)
except Exception: # pylint: disable=broad-except
_logger.exception("Exception occurred while exporting the data.")
return _get_log_export_result(ExportResult.FAILED_NOT_RETRYABLE)
[docs] def shutdown(self) -> None:
"""Shuts down the exporter.
Called when the SDK is shut down.
"""
self.storage.close()
def _log_to_envelope(self, log_data: LogData) -> TelemetryItem:
if not log_data:
return None
envelope = _convert_log_to_envelope(log_data)
envelope.instrumentation_key = self._instrumentation_key
return envelope
[docs] @classmethod
def from_connection_string(
cls, conn_str: str, **kwargs: Any
) -> "AzureMonitorLogExporter":
"""
Create an AzureMonitorLogExporter from a connection string.
This is the recommended way of instantation if a connection string is passed in explicitly.
If a user wants to use a connection string provided by environment variable, the constructor
of the exporter can be called directly.
:param str conn_str: The connection string to be used for authentication.
:keyword str api_version: The service API version used. Defaults to latest.
:returns an instance of ~AzureMonitorLogExporter
"""
return cls(connection_string=conn_str, **kwargs)
# pylint: disable=protected-access
def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
log_record = log_data.log_record
envelope = _utils._create_telemetry_item(log_record.timestamp)
envelope.tags.update(_utils._populate_part_a_fields(log_record.resource))
envelope.tags["ai.operation.id"] = "{:032x}".format(
log_record.trace_id or _DEFAULT_TRACE_ID
)
envelope.tags["ai.operation.parentId"] = "{:016x}".format(
log_record.span_id or _DEFAULT_SPAN_ID
)
properties = _utils._filter_custom_properties(
log_record.attributes,
lambda key, val: not _is_opentelemetry_standard_attribute(key)
)
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
# pylint: disable=line-too-long
stack_trace = log_record.attributes.get(SpanAttributes.EXCEPTION_STACKTRACE)
severity_level = _get_severity_level(log_record.severity_number)
# Exception telemetry
if exc_type is not None or exc_message is not None:
envelope.name = "Microsoft.ApplicationInsights.Exception"
has_full_stack = stack_trace is not None
if not exc_message:
exc_message = "Exception"
exc_details = TelemetryExceptionDetails(
type_name=str(exc_type)[:1024],
message=str(exc_message)[:32768],
has_full_stack=has_full_stack,
stack=str(stack_trace)[:32768],
)
data = TelemetryExceptionData(
severity_level=severity_level,
properties=properties,
exceptions=[exc_details],
)
# pylint: disable=line-too-long
envelope.data = MonitorBase(base_data=data, base_type="ExceptionData")
else: # Message telemetry
envelope.name = "Microsoft.ApplicationInsights.Message"
# pylint: disable=line-too-long
# Severity number: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber
data = MessageData(
message=str(log_record.body)[:32768],
severity_level=severity_level,
properties=properties,
)
envelope.data = MonitorBase(base_data=data, base_type="MessageData")
return envelope
def _get_log_export_result(result: ExportResult) -> LogExportResult:
if result == ExportResult.SUCCESS:
return LogExportResult.SUCCESS
if result in (
ExportResult.FAILED_RETRYABLE,
ExportResult.FAILED_NOT_RETRYABLE,
):
return LogExportResult.FAILURE
return None
# pylint: disable=line-too-long
# Common schema: https://github.com/microsoft/common-schema/blob/main/Mappings/AzureMonitor-AI.md#messageseveritylevel
# SeverityNumber specs: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber
def _get_severity_level(severity_number: SeverityNumber):
if severity_number.value < 9:
return 0
return int((severity_number.value - 1) / 4 - 1)
def _is_opentelemetry_standard_attribute(key: str) -> bool:
return key in _EXCEPTION_ATTRS
_EXCEPTION_ATTRS = frozenset(
(
SpanAttributes.EXCEPTION_TYPE,
SpanAttributes.EXCEPTION_MESSAGE,
SpanAttributes.EXCEPTION_STACKTRACE,
SpanAttributes.EXCEPTION_ESCAPED,
)
)