Source code for azure.eventhub._configuration

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Any, Dict

try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

from uamqp.constants import TransportType, DEFAULT_AMQPS_PORT, DEFAULT_AMQP_WSS_PORT

from ._common import DictMixin
from ._utils import validate_producer_client_partition_config



class Configuration(object):  # pylint:disable=too-many-instance-attributes
    """Client settings collected from keyword arguments.

    Each setting is read from ``kwargs`` and falls back to a default when the
    keyword is absent. The transport type and connection port are derived as
    follows:

    * If ``http_proxy`` is set, or ``transport_type`` equals
      ``TransportType.AmqpOverWebsocket``, the transport is AMQP over websocket
      on ``DEFAULT_AMQP_WSS_PORT``. Otherwise the requested transport
      (``TransportType.Amqp`` by default) is used on ``DEFAULT_AMQPS_PORT``.
    * If ``custom_endpoint_address`` is set, the transport is always AMQP over
      websocket; the host name and port are parsed from the address, and the
      port falls back to ``DEFAULT_AMQP_WSS_PORT`` when the address has none.
    """

    def __init__(self, **kwargs):
        option = kwargs.get

        self.user_agent = option("user_agent")  # type: Optional[str]

        # Retry settings; max_retries mirrors retry_total.
        self.retry_total = option("retry_total", 3)  # type: int
        self.max_retries = self.retry_total  # type: int
        self.backoff_factor = option("retry_backoff_factor", 0.8)  # type: float
        self.backoff_max = option("retry_backoff_max", 120)  # type: int

        self.network_tracing = option("network_tracing", False)  # type: bool
        self.http_proxy = option("http_proxy")  # type: Optional[Dict[str, Any]]
        if self.http_proxy:
            # A configured proxy forces the websocket transport; the
            # transport_type keyword is not consulted in that case.
            self.transport_type = TransportType.AmqpOverWebsocket
        else:
            self.transport_type = option("transport_type", TransportType.Amqp)

        self.auth_timeout = option("auth_timeout", 60)  # type: int
        self.prefetch = option("prefetch", 300)  # type: int
        # The batch size defaults to whatever prefetch resolved to.
        self.max_batch_size = option("max_batch_size", self.prefetch)  # type: int
        self.receive_timeout = option("receive_timeout", 0)  # type: int
        self.send_timeout = option("send_timeout", 60)  # type: int
        self.custom_endpoint_address = option("custom_endpoint_address")  # type: Optional[str]
        self.connection_verify = option("connection_verify")  # type: Optional[str]

        # Pick the port that matches the transport selected above.
        if self.http_proxy or self.transport_type == TransportType.AmqpOverWebsocket:
            self.transport_type = TransportType.AmqpOverWebsocket
            self.connection_port = DEFAULT_AMQP_WSS_PORT
        else:
            self.connection_port = DEFAULT_AMQPS_PORT

        self.custom_endpoint_hostname = None
        self.enable_idempotent_partitions = option("enable_idempotent_partitions", False)  # type: bool

        address = self.custom_endpoint_address
        if not address:
            return

        # Custom endpoint: urlparse only recognises the host when the address
        # carries a scheme, so a default one is prepended if it is missing.
        if "//" not in address:
            address = "sb://" + address
            self.custom_endpoint_address = address
        parsed = urlparse(address)
        self.transport_type = TransportType.AmqpOverWebsocket
        self.custom_endpoint_hostname = parsed.hostname
        # When the address carries no port, fall back to the websocket port (443).
        self.connection_port = parsed.port or DEFAULT_AMQP_WSS_PORT


class PartitionPublishingConfiguration(DictMixin):
    """
    The set of configurations that can be specified for an `EventHubProducerClient`
    to influence its behavior when publishing directly to an Event Hub partition.

    :ivar int producer_group_id: The identifier of the producer group that this producer is
     associated with when publishing to the associated partition.
    :ivar int owner_level: The owner level indicates that publishing is intended to be performed
     exclusively for events in the requested partition in the context of the associated producer group.
    :ivar int starting_sequence_number: The starting number that should be used for the automatic
     sequencing of events for the associated partition, when published by this producer.

    :keyword int producer_group_id: The identifier of the producer group that this producer is
     associated with when publishing to the associated partition. The producer group is only
     recognized and relevant when certain features of the producer are enabled. For example, it is
     used by idempotent publishing.
     The producer group id should be in the range from 0 to max signed long (2^63 - 1) as required
     by the service.
    :keyword int owner_level: The owner level indicates that publishing is intended to be performed
     exclusively for events in the requested partition in the context of the associated producer
     group. To do so, publishing will attempt to assert ownership over the partition; in the case
     where more than one publisher in the producer group attempts to assert ownership for the same
     partition, the one having a larger owner_level value will "win". The owner level is only
     recognized and relevant when certain features of the producer are enabled. For example, it is
     used by idempotent publishing.
     The owner level should be in the range from 0 to max signed short (2^15 - 1) as required
     by the service.
    :keyword int starting_sequence_number: The starting number that should be used for the automatic
     sequencing of events for the associated partition, when published by this producer. The
     starting sequence number is only recognized and relevant when certain features of the producer
     are enabled. For example, it is used by idempotent publishing.
     The starting sequence number should be in the range from 0 to max signed integer (2^31 - 1) as
     required by the service.
    """

    def __init__(self, **kwargs):
        # The keyword arguments are passed to the validator before any attribute is set.
        validate_producer_client_partition_config(kwargs)
        # Keywords that were not supplied are stored as None.
        self.owner_level = kwargs.get("owner_level")  # type: Optional[int]
        self.producer_group_id = kwargs.get("producer_group_id")  # type: Optional[int]
        self.starting_sequence_number = kwargs.get("starting_sequence_number")  # type: Optional[int]