# Source code for azure.monitor.opentelemetry.exporter.export.trace._exporter

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import logging
from typing import Optional, Sequence, Any
from urllib.parse import urlparse

from opentelemetry.util.types import Attributes
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import SpanKind

from azure.monitor.opentelemetry.exporter import _utils
from azure.monitor.opentelemetry.exporter._generated.models import (
    MessageData,
    MonitorBase,
    RemoteDependencyData,
    RequestData,
    TelemetryExceptionData,
    TelemetryExceptionDetails,
    TelemetryItem
)
from azure.monitor.opentelemetry.exporter.export._base import (
    BaseExporter,
    ExportResult,
)

# Logger for this module; named after the module so records can be filtered
# hierarchically by logging configuration.
_logger = logging.getLogger(name=__name__)

# Public API of this module: only the exporter class is exported.
__all__ = ["AzureMonitorTraceExporter"]

# Attribute-key prefixes from the OpenTelemetry semantic conventions.
# Attributes whose key starts with one of these are considered "standard"
# and are filtered out of the custom properties attached to telemetry.
# Note: the Function-as-a-Service namespace is "faas." (previously misspelled
# "fass.", which let faas.* attributes leak into custom properties).
_STANDARD_OPENTELEMETRY_ATTRIBUTE_PREFIXES = [
    "http.",
    "db.",
    "message.",
    "messaging.",
    "rpc.",
    "enduser.",
    "net.",
    "peer.",
    "exception.",
    "thread.",
    "faas.",
    "code.",
]


class AzureMonitorTraceExporter(BaseExporter, SpanExporter):
    """Azure Monitor Trace exporter for OpenTelemetry."""

    def export(self, spans: Sequence[ReadableSpan], **kwargs: Any) -> SpanExportResult:  # pylint: disable=unused-argument
        """Export span data

        Each span is converted into one envelope, plus one envelope per span
        event. All envelopes are transmitted in a single batch.

        :param spans: Open Telemetry Spans to export.
        :type spans: Sequence[~opentelemetry.trace.Span]
        :rtype: ~opentelemetry.sdk.trace.export.SpanExportResult
        """
        envelopes = []
        for span in spans:
            envelope = self._span_to_envelope(span)
            # _span_to_envelope yields None for a falsy span; never send None
            # envelopes to the transmitter.
            if envelope is not None:
                envelopes.append(envelope)
            envelopes.extend(self._span_events_to_envelopes(span))
        try:
            result = self._transmit(envelopes)
            self._handle_transmit_from_storage(envelopes, result)
            return _get_trace_export_result(result)
        except Exception:  # pylint: disable=broad-except
            _logger.exception("Exception occurred while exporting the data.")
            return _get_trace_export_result(ExportResult.FAILED_NOT_RETRYABLE)

    def shutdown(self) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """
        # NOTE(review): guard in case BaseExporter leaves storage unset (None),
        # e.g. when offline storage is disabled — confirm against BaseExporter.
        if self.storage is not None:
            self.storage.close()

    def _span_to_envelope(self, span: ReadableSpan) -> Optional[TelemetryItem]:
        """Convert a span to an envelope stamped with this exporter's key.

        :return: The envelope, or None when ``span`` is falsy.
        """
        if not span:
            return None
        envelope = _convert_span_to_envelope(span)
        envelope.instrumentation_key = self._instrumentation_key
        return envelope

    def _span_events_to_envelopes(self, span: ReadableSpan) -> Sequence[TelemetryItem]:
        """Convert a span's events to envelopes stamped with this exporter's key.

        :return: One envelope per event; an empty list when there are none.
        """
        if not span or len(span.events) == 0:
            return []
        envelopes = _convert_span_events_to_envelopes(span)
        for envelope in envelopes:
            envelope.instrumentation_key = self._instrumentation_key
        return envelopes

    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "AzureMonitorTraceExporter":
        """
        Create an AzureMonitorTraceExporter from a connection string.

        This is the recommended way of instantation if a connection string is passed in explicitly.
        If a user wants to use a connection string provided by environment variable, the constructor
        of the exporter can be called directly.

        :param str conn_str: The connection string to be used for authentication.
        :keyword str api_version: The service API version used. Defaults to latest.
        :returns an instance of ~AzureMonitorTraceExporter
        """
        return cls(connection_string=conn_str, **kwargs)
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
# pylint: disable=protected-access
def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
    """Convert a finished span into an Application Insights telemetry envelope.

    CONSUMER and SERVER spans become ``RequestData``. CLIENT, PRODUCER and
    INTERNAL spans become ``RemoteDependencyData``. The instrumentation key is
    NOT set here; the caller is responsible for that.

    :param span: The span to convert.
    :type span: ~opentelemetry.sdk.trace.ReadableSpan
    :return: The populated envelope.
    :rtype: TelemetryItem
    """
    # Update instrumentation bitmap if span was generated from instrumentation
    _check_instrumentation_span(span)
    envelope = _utils._create_telemetry_item(span.start_time)
    envelope.tags.update(_utils._populate_part_a_fields(span.resource))
    envelope.tags["ai.operation.id"] = "{:032x}".format(span.context.trace_id)
    if SpanAttributes.ENDUSER_ID in span.attributes:
        envelope.tags["ai.user.id"] = span.attributes[SpanAttributes.ENDUSER_ID]
    if span.parent and span.parent.span_id:
        envelope.tags["ai.operation.parentId"] = "{:016x}".format(
            span.parent.span_id
        )
    # pylint: disable=too-many-nested-blocks
    if span.kind in (SpanKind.CONSUMER, SpanKind.SERVER):
        envelope.name = "Microsoft.ApplicationInsights.Request"
        data = RequestData(
            name=span.name,
            id="{:016x}".format(span.context.span_id),
            duration=_utils.ns_to_duration(span.end_time - span.start_time),
            response_code="0",
            success=span.status.is_ok,
            properties={},
            measurements={},
        )
        envelope.data = MonitorBase(base_data=data, base_type="RequestData")
        envelope.tags["ai.operation.name"] = span.name
        if SpanAttributes.NET_PEER_IP in span.attributes:
            envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.NET_PEER_IP]
        if "az.namespace" in span.attributes:  # Azure specific resources
            # Currently only eventhub and servicebus are supported (kind CONSUMER)
            data.source = _get_azure_sdk_target_source(span.attributes)
            if span.links:
                total = 0
                for link in span.links:
                    # Link attributes are typed Optional; treat None as empty.
                    attributes = link.attributes or {}
                    enqueued_time = attributes.get("enqueuedTime")
                    if enqueued_time:
                        # start_time is in ns; divide by 1e6 to compare in ms.
                        difference = (span.start_time / 1000000) - int(enqueued_time)
                        total += difference
                data.measurements["timeSinceEnqueued"] = max(0, total / len(span.links))
        elif SpanAttributes.HTTP_METHOD in span.attributes:  # HTTP
            url = ""
            if SpanAttributes.HTTP_USER_AGENT in span.attributes:
                # TODO: Not exposed in Swagger, need to update def
                envelope.tags["ai.user.userAgent"] = span.attributes[SpanAttributes.HTTP_USER_AGENT]
            # http specific logic for ai.location.ip (overrides net.peer.ip)
            if SpanAttributes.HTTP_CLIENT_IP in span.attributes:
                envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.HTTP_CLIENT_IP]
            # url: prefer http.url, else rebuild it from scheme/host/target
            if SpanAttributes.HTTP_URL in span.attributes:
                url = span.attributes[SpanAttributes.HTTP_URL]
            elif SpanAttributes.HTTP_SCHEME in span.attributes and SpanAttributes.HTTP_TARGET in span.attributes:
                scheme = span.attributes[SpanAttributes.HTTP_SCHEME]
                http_target = span.attributes[SpanAttributes.HTTP_TARGET]
                if SpanAttributes.HTTP_HOST in span.attributes:
                    url = "{}://{}{}".format(
                        scheme,
                        span.attributes[SpanAttributes.HTTP_HOST],
                        http_target,
                    )
                elif SpanAttributes.NET_HOST_PORT in span.attributes:
                    host_port = span.attributes[SpanAttributes.NET_HOST_PORT]
                    if SpanAttributes.HTTP_SERVER_NAME in span.attributes:
                        server_name = span.attributes[SpanAttributes.HTTP_SERVER_NAME]
                        url = "{}://{}:{}{}".format(
                            scheme,
                            server_name,
                            host_port,
                            http_target,
                        )
                    elif SpanAttributes.NET_HOST_NAME in span.attributes:
                        host_name = span.attributes[SpanAttributes.NET_HOST_NAME]
                        url = "{}://{}:{}{}".format(
                            scheme,
                            host_name,
                            host_port,
                            http_target,
                        )
            data.url = url
            # Http specific logic for ai.operation.name
            if SpanAttributes.HTTP_ROUTE in span.attributes:
                envelope.tags["ai.operation.name"] = "{} {}".format(
                    span.attributes[SpanAttributes.HTTP_METHOD],
                    span.attributes[SpanAttributes.HTTP_ROUTE],
                )
            elif url:
                try:
                    parse_url = urlparse(url)
                    path = parse_url.path
                    if not path:
                        path = "/"
                    envelope.tags["ai.operation.name"] = "{} {}".format(
                        span.attributes[SpanAttributes.HTTP_METHOD],
                        path,
                    )
                except Exception:  # pylint: disable=broad-except
                    pass
            if SpanAttributes.HTTP_STATUS_CODE in span.attributes:
                status_code = span.attributes[SpanAttributes.HTTP_STATUS_CODE]
                data.response_code = str(status_code)
        elif SpanAttributes.MESSAGING_SYSTEM in span.attributes:  # Messaging
            if SpanAttributes.NET_PEER_IP in span.attributes:
                envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.NET_PEER_IP]
            if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
                if SpanAttributes.NET_PEER_NAME in span.attributes:
                    data.source = "{}/{}".format(
                        span.attributes[SpanAttributes.NET_PEER_NAME],
                        span.attributes[SpanAttributes.MESSAGING_DESTINATION],
                    )
                elif SpanAttributes.NET_PEER_IP in span.attributes:
                    data.source = "{}/{}".format(
                        span.attributes[SpanAttributes.NET_PEER_IP],
                        span.attributes[SpanAttributes.MESSAGING_DESTINATION],
                    )
                else:
                    data.source = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
        # Apply truncation
        # See https://github.com/MohanGsk/ApplicationInsights-Home/tree/master/EndpointSpecs/Schemas/Bond
        if envelope.tags.get("ai.operation.name"):
            data.name = envelope.tags["ai.operation.name"][:1024]
        if data.response_code:
            data.response_code = data.response_code[:1024]
        if data.source:
            data.source = data.source[:1024]
        if data.url:
            data.url = data.url[:2048]
    else:  # INTERNAL, CLIENT, PRODUCER
        envelope.name = "Microsoft.ApplicationInsights.RemoteDependency"
        # TODO: ai.operation.name for non-server spans
        data = RemoteDependencyData(
            name=span.name,
            id="{:016x}".format(span.context.span_id),
            result_code="0",
            duration=_utils.ns_to_duration(span.end_time - span.start_time),
            success=span.status.is_ok,
            properties={},
        )
        envelope.data = MonitorBase(
            base_data=data, base_type="RemoteDependencyData"
        )
        target = None
        if SpanAttributes.PEER_SERVICE in span.attributes:
            target = span.attributes[SpanAttributes.PEER_SERVICE]
        else:
            if SpanAttributes.NET_PEER_NAME in span.attributes:
                target = span.attributes[SpanAttributes.NET_PEER_NAME]
            elif SpanAttributes.NET_PEER_IP in span.attributes:
                target = span.attributes[SpanAttributes.NET_PEER_IP]
            if SpanAttributes.NET_PEER_PORT in span.attributes:
                port = span.attributes[SpanAttributes.NET_PEER_PORT]
                # TODO: check default port for rpc
                # This logic assumes default ports never conflict across dependency types
                # Only append a port when there is a host to attach it to;
                # otherwise the target would become the string "None:<port>".
                if target is not None and \
                        port != _get_default_port_http(span.attributes.get(SpanAttributes.HTTP_SCHEME)) and \
                        port != _get_default_port_db(span.attributes.get(SpanAttributes.DB_SYSTEM)):
                    target = "{}:{}".format(target, port)
        if span.kind is SpanKind.CLIENT:
            if "az.namespace" in span.attributes:  # Azure specific resources
                # Currently only eventhub and servicebus are supported
                # https://github.com/Azure/azure-sdk-for-python/issues/9256
                data.type = span.attributes["az.namespace"]
                data.target = _get_azure_sdk_target_source(span.attributes)
            elif SpanAttributes.HTTP_METHOD in span.attributes:  # HTTP
                data.type = "HTTP"
                if SpanAttributes.HTTP_USER_AGENT in span.attributes:
                    # TODO: Not exposed in Swagger, need to update def
                    envelope.tags["ai.user.userAgent"] = span.attributes[SpanAttributes.HTTP_USER_AGENT]
                scheme = span.attributes.get(SpanAttributes.HTTP_SCHEME)
                # url: prefer http.url, else rebuild it from scheme/host/target
                url = ""
                if SpanAttributes.HTTP_URL in span.attributes:
                    url = span.attributes[SpanAttributes.HTTP_URL]
                elif scheme and SpanAttributes.HTTP_TARGET in span.attributes:
                    http_target = span.attributes[SpanAttributes.HTTP_TARGET]
                    if SpanAttributes.HTTP_HOST in span.attributes:
                        url = "{}://{}{}".format(
                            scheme,
                            span.attributes[SpanAttributes.HTTP_HOST],
                            http_target,
                        )
                    elif SpanAttributes.NET_PEER_PORT in span.attributes:
                        peer_port = span.attributes[SpanAttributes.NET_PEER_PORT]
                        if SpanAttributes.NET_PEER_NAME in span.attributes:
                            peer_name = span.attributes[SpanAttributes.NET_PEER_NAME]
                            url = "{}://{}:{}{}".format(
                                scheme,
                                peer_name,
                                peer_port,
                                http_target,
                            )
                        elif SpanAttributes.NET_PEER_IP in span.attributes:
                            peer_ip = span.attributes[SpanAttributes.NET_PEER_IP]
                            url = "{}://{}:{}{}".format(
                                scheme,
                                peer_ip,
                                peer_port,
                                http_target,
                            )
                target_from_url = ""
                path = ""
                if url:
                    try:
                        parse_url = urlparse(url)
                        path = parse_url.path
                        if not path:
                            path = "/"
                        # Drop the port from the target when it is the scheme default.
                        if parse_url.port == _get_default_port_http(scheme):
                            target_from_url = parse_url.hostname
                        else:
                            target_from_url = parse_url.netloc
                    except Exception:  # pylint: disable=broad-except
                        pass
                # http specific logic for name
                if path:
                    data.name = "{} {}".format(
                        span.attributes[SpanAttributes.HTTP_METHOD],
                        path,
                    )
                # http specific logic for target
                if SpanAttributes.PEER_SERVICE not in span.attributes:
                    if SpanAttributes.HTTP_HOST in span.attributes:
                        host = span.attributes[SpanAttributes.HTTP_HOST]
                        try:
                            # urlparse insists on absolute URLs starting with "//"
                            # This logic assumes host does not include a "//"
                            host_name = urlparse("//" + host)
                            if host_name.port == _get_default_port_http(scheme):
                                target = host_name.hostname
                            else:
                                target = host
                        except Exception:  # pylint: disable=broad-except
                            _logger.warning("Error while parsing hostname.")
                    elif target_from_url:
                        target = target_from_url
                # data is url
                data.data = url
                if SpanAttributes.HTTP_STATUS_CODE in span.attributes:
                    status_code = span.attributes[SpanAttributes.HTTP_STATUS_CODE]
                    data.result_code = str(status_code)
            elif SpanAttributes.DB_SYSTEM in span.attributes:  # Database
                db_system = span.attributes[SpanAttributes.DB_SYSTEM]
                if db_system == DbSystemValues.MYSQL.value:
                    data.type = "mysql"
                elif db_system == DbSystemValues.POSTGRESQL.value:
                    data.type = "postgresql"
                elif db_system == DbSystemValues.MONGODB.value:
                    data.type = "mongodb"
                elif db_system == DbSystemValues.REDIS.value:
                    data.type = "redis"
                elif _is_sql_db(db_system):
                    data.type = "SQL"
                else:
                    data.type = db_system
                # data is the full statement or operation
                if SpanAttributes.DB_STATEMENT in span.attributes:
                    data.data = span.attributes[SpanAttributes.DB_STATEMENT]
                elif SpanAttributes.DB_OPERATION in span.attributes:
                    data.data = span.attributes[SpanAttributes.DB_OPERATION]
                # db specific logic for target
                if SpanAttributes.DB_NAME in span.attributes:
                    db_name = span.attributes[SpanAttributes.DB_NAME]
                    if target is None:
                        target = db_name
                    else:
                        target = "{}|{}".format(target, db_name)
                if target is None:
                    target = db_system
            elif SpanAttributes.MESSAGING_SYSTEM in span.attributes:  # Messaging
                data.type = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
                if target is None:
                    if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
                        target = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
                    else:
                        target = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
            elif SpanAttributes.RPC_SYSTEM in span.attributes:  # Rpc
                # Use the attribute's value (e.g. "grpc"), not the key name.
                data.type = span.attributes[SpanAttributes.RPC_SYSTEM]
                if target is None:
                    target = span.attributes[SpanAttributes.RPC_SYSTEM]
            else:
                data.type = "N/A"
        elif span.kind is SpanKind.PRODUCER:  # Messaging
            # Currently only eventhub and servicebus are supported that produce PRODUCER spans
            if "az.namespace" in span.attributes:
                data.type = "Queue Message | {}".format(span.attributes["az.namespace"])
                data.target = _get_azure_sdk_target_source(span.attributes)
            else:
                data.type = "Queue Message"
                msg_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
                if msg_system:
                    data.type += " | {}".format(msg_system)
                if target is None:
                    if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
                        target = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
                    else:
                        target = msg_system
        else:  # SpanKind.INTERNAL
            data.type = "InProc"
            if "az.namespace" in span.attributes:
                data.type += " | {}".format(span.attributes["az.namespace"])
        # Apply truncation
        # See https://github.com/MohanGsk/ApplicationInsights-Home/tree/master/EndpointSpecs/Schemas/Bond
        if data.name:
            data.name = str(data.name)[:1024]
        if data.result_code:
            data.result_code = str(data.result_code)[:1024]
        if data.data:
            data.data = str(data.data)[:8192]
        if data.type:
            data.type = str(data.type)[:1024]
        if target:
            data.target = str(target)[:1024]
    data.properties = _utils._filter_custom_properties(
        span.attributes,
        lambda key, val: not _is_opentelemetry_standard_attribute(key)
    )
    if span.links:
        # Max length for value is 8192
        # Since links are a fixed length (80) in json, max number of links would be 102
        links = []
        for link in span.links:
            # Stop once 102 links are collected (">" previously admitted 103).
            if len(links) >= 102:
                break
            operation_id = "{:032x}".format(link.context.trace_id)
            span_id = "{:016x}".format(link.context.span_id)
            links.append({"operation_Id": operation_id, "id": span_id})
        data.properties["_MS.links"] = json.dumps(links)
    return envelope


# pylint: disable=protected-access
def _convert_span_events_to_envelopes(span: ReadableSpan) -> Sequence[TelemetryItem]:
    """Convert each event of ``span`` into its own telemetry envelope.

    Events named "exception" become ``ExceptionData``; all other events become
    ``MessageData``. Each envelope's parent id is the owning span's id.

    :param span: The span whose events are converted.
    :return: One envelope per event, in event order.
    """
    envelopes = []
    for event in span.events:
        envelope = _utils._create_telemetry_item(event.timestamp)
        envelope.tags.update(_utils._populate_part_a_fields(span.resource))
        envelope.tags["ai.operation.id"] = "{:032x}".format(span.context.trace_id)
        if span.context and span.context.span_id:
            envelope.tags["ai.operation.parentId"] = "{:016x}".format(
                span.context.span_id
            )
        properties = _utils._filter_custom_properties(
            event.attributes,
            lambda key, val: not _is_opentelemetry_standard_attribute(key)
        )
        if event.name == "exception":
            envelope.name = 'Microsoft.ApplicationInsights.Exception'
            exc_type = event.attributes.get(SpanAttributes.EXCEPTION_TYPE)
            exc_message = event.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
            # Fall back to a generic message when none (or an empty one) is set.
            if not exc_message:
                exc_message = "Exception"
            stack_trace = event.attributes.get(SpanAttributes.EXCEPTION_STACKTRACE)
            has_full_stack = stack_trace is not None
            exc_details = TelemetryExceptionDetails(
                type_name=str(exc_type)[:1024],
                message=str(exc_message)[:32768],
                has_full_stack=has_full_stack,
                stack=str(stack_trace)[:32768],
            )
            data = TelemetryExceptionData(
                properties=properties,
                exceptions=[exc_details],
            )
            # pylint: disable=line-too-long
            envelope.data = MonitorBase(base_data=data, base_type='ExceptionData')
        else:
            envelope.name = 'Microsoft.ApplicationInsights.Message'
            data = MessageData(
                message=str(event.name)[:32768],
                properties=properties,
            )
            envelope.data = MonitorBase(base_data=data, base_type='MessageData')

        envelopes.append(envelope)

    return envelopes


# pylint:disable=too-many-return-statements
def _get_default_port_db(db_system: str) -> int:
    """Return the well-known default port for ``db_system``, or 0 if unknown."""
    if db_system == DbSystemValues.POSTGRESQL.value:
        return 5432
    if db_system == DbSystemValues.CASSANDRA.value:
        return 9042
    if db_system in (DbSystemValues.MARIADB.value, DbSystemValues.MYSQL.value):
        return 3306
    if db_system == DbSystemValues.MSSQL.value:
        return 1433
    # TODO: Add in memcached
    if db_system == "memcached":
        return 11211
    if db_system == DbSystemValues.DB2.value:
        return 50000
    if db_system == DbSystemValues.ORACLE.value:
        return 1521
    if db_system == DbSystemValues.H2.value:
        return 8082
    if db_system == DbSystemValues.DERBY.value:
        return 1527
    if db_system == DbSystemValues.REDIS.value:
        return 6379
    return 0


def _get_default_port_http(scheme: str) -> int:
    """Return 80 for "http", 443 for "https", and 0 for anything else."""
    if scheme == "http":
        return 80
    if scheme == "https":
        return 443
    return 0


def _is_sql_db(db_system: str) -> bool:
    """Return True when ``db_system`` is reported with the generic "SQL" type."""
    return db_system in (
        DbSystemValues.DB2.value,
        DbSystemValues.DERBY.value,
        DbSystemValues.MARIADB.value,
        DbSystemValues.MSSQL.value,
        DbSystemValues.ORACLE.value,
        DbSystemValues.SQLITE.value,
        DbSystemValues.OTHER_SQL.value,
        # spell-checker:ignore HSQLDB
        DbSystemValues.HSQLDB.value,
        DbSystemValues.H2.value,
    )


def _check_instrumentation_span(span: ReadableSpan) -> None:
    """Record the instrumentation library that produced ``span``, if any."""
    if span.instrumentation_scope is None:
        return
    # All instrumentation scope names from OpenTelemetry instrumentations have
    # `opentelemetry.instrumentation.` as a prefix
    if span.instrumentation_scope.name.startswith("opentelemetry.instrumentation."):
        # The string after the prefix is the name of the instrumentation
        name = span.instrumentation_scope.name.split("opentelemetry.instrumentation.", 1)[1]
        # Update the bit map to indicate instrumentation is being used
        _utils.add_instrumentation(name)


def _is_opentelemetry_standard_attribute(key: str) -> bool:
    """Return True if ``key`` starts with a standard semantic-convention prefix."""
    return any(
        key.startswith(prefix)
        for prefix in _STANDARD_OPENTELEMETRY_ATTRIBUTE_PREFIXES
    )


def _get_azure_sdk_target_source(attributes: Attributes) -> Optional[str]:
    """Build "<peer.address>/<message_bus.destination>" from Azure SDK attributes.

    :return: The combined string, or None if either attribute is missing/empty.
    """
    # Currently logic only works for ServiceBus and EventHub
    peer_address = attributes.get("peer.address")
    destination = attributes.get("message_bus.destination")
    if peer_address and destination:
        return peer_address + "/" + destination
    return None


def _get_trace_export_result(result: ExportResult) -> SpanExportResult:
    """Map an internal ``ExportResult`` onto the OpenTelemetry ``SpanExportResult``.

    :return: SUCCESS or FAILURE; None for an unrecognized result.
    """
    if result == ExportResult.SUCCESS:
        return SpanExportResult.SUCCESS
    if result in (
        ExportResult.FAILED_RETRYABLE,
        ExportResult.FAILED_NOT_RETRYABLE,
    ):
        return SpanExportResult.FAILURE
    return None