Source code for azure.servicebus.management._models

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint:disable=protected-access
# pylint:disable=too-many-lines
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Type, Dict, Any, Union, Optional, List, Tuple, TYPE_CHECKING, cast
from ._generated._serialization import Model

from ._generated.models import (
    QueueDescription as InternalQueueDescription,
    TopicDescription as InternalTopicDescription,
    SubscriptionDescription as InternalSubscriptionDescription,
    RuleDescription as InternalRuleDescription,
    SqlRuleAction as InternalSqlRuleAction,
    EmptyRuleAction as InternalEmptyRuleAction,
    CorrelationFilter as InternalCorrelationFilter,
    NamespaceProperties as InternalNamespaceProperties,
    SqlFilter as InternalSqlFilter,
    TrueFilter as InternalTrueFilter,
    FalseFilter as InternalFalseFilter,
    KeyValue,
    KeyObjectValue,
    AuthorizationRule as InternalAuthorizationRule,
)
from ._model_workaround import adjust_attribute_map, avoid_timedelta_overflow
from ._constants import RULE_SQL_COMPATIBILITY_LEVEL
from ._utils import _normalize_entity_path_to_full_path_if_needed

if TYPE_CHECKING:
    from ._generated.models import (
        MessagingSku,
        NamespaceType,
        EntityAvailabilityStatus,
        EntityStatus,
        AccessRights,
        MessageCountDetails,
    )

adjust_attribute_map()


class DictMixin(object):
    def __setitem__(self, key: str, item: Any) -> None:
        self.__dict__[key] = item

    def __getitem__(self, key: str) -> Any:
        return self.__dict__[key]

    def __repr__(self) -> str:
        return str(self)

    def __len__(self) -> int:
        return len(self.keys())

    def __delitem__(self, key: str) -> None:
        self.__dict__[key] = None

    def __eq__(self, other: Any) -> bool:
        """Compare objects by comparing all attributes.
        :param any other: The object to compare with
        :return: `True` if `self` and `other` are equal, `False` otherwise.
        :rtype: bool
        """
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        return False

    def __ne__(self, other: Any) -> bool:
        """Compare objects by comparing all attributes.
        :param any other: The object to compare with
        :return: `True` if `self` and `other` are not equal, `False` otherwise.
        :rtype: bool
        """
        return not self.__eq__(other)

    def __str__(self) -> str:
        return str({k: v for k, v in self.__dict__.items() if not k.startswith("_")})

    def __contains__(self, key: str) -> bool:
        return key in self.__dict__

    def has_key(self, k: str) -> bool:
        return k in self.__dict__

    def update(self, *args: Any, **kwargs: Any) -> None:
        return self.__dict__.update(*args, **kwargs)

    def keys(self) -> List[str]:
        return [k for k in self.__dict__ if not k.startswith("_")]

    def values(self) -> List[Any]:
        return [v for k, v in self.__dict__.items() if not k.startswith("_")]

    def items(self) -> List[Tuple[str, Any]]:
        return [(k, v) for k, v in self.__dict__.items() if not k.startswith("_")]

    def get(
        self,
        key: str,
        default: Optional[Any] = None
    ) -> Any:
        if key in self.__dict__:
            return self.__dict__[key]
        return default


[docs]class NamespaceProperties(DictMixin): """The metadata related to a Service Bus namespace. **Please use the `get_namespace_properties` on the ServiceBusAdministrationClient to get a `NamespaceProperties` instance instead of instantiating a `NamespaceProperties` object directly.** :param name: Name of the namespace. :type name: str :keyword alias: Alias for the geo-disaster recovery Service Bus namespace. :paramtype alias: str :keyword created_at_utc: The exact time the namespace was created. :paramtype created_at_utc: ~datetime.datetime :keyword messaging_sku: The SKU for the messaging entity. Possible values include: "Basic", "Standard", "Premium". :paramtype messaging_sku: str or ~azure.servicebus.management.MessagingSku :keyword messaging_units: The number of messaging units allocated to the namespace. :paramtype messaging_units: int :keyword modified_at_utc: The exact time the namespace was last modified. :paramtype modified_at_utc: ~datetime.datetime :keyword namespace_type: The type of entities the namespace can contain. Known values are: "Messaging", "NotificationHub", "Mixed", "EventHub", and "Relay". :paramtype namespace_type: str or ~azure.servicebus.management.NamespaceType :ivar alias: Alias for the geo-disaster recovery Service Bus namespace. :vartype alias: str :ivar created_at_utc: The exact time the namespace was created. :vartype created_at_utc: ~datetime.datetime :ivar messaging_sku: The SKU for the messaging entity. Possible values include: "Basic", "Standard", "Premium". :vartype messaging_sku: str or ~azure.servicebus.management.MessagingSku :ivar messaging_units: The number of messaging units allocated to the namespace. :vartype messaging_units: int :ivar modified_at_utc: The exact time the namespace was last modified. :vartype modified_at_utc: ~datetime.datetime :ivar name: Name of the namespace. :vartype name: str :ivar namespace_type: The type of entities the namespace can contain. Known values are: "Messaging", "NotificationHub", "Mixed", "EventHub", and "Relay". :vartype namespace_type: str or ~azure.servicebus.management._generated.models.NamespaceType """ def __init__( self, name: str, *, alias: Optional[str], created_at_utc: Optional[datetime], messaging_sku: Optional[Union[str, "MessagingSku"]], messaging_units: Optional[int], modified_at_utc: Optional[datetime], namespace_type: Optional[Union[str, "NamespaceType"]], ) -> None: self.name = name self.alias = alias self.created_at_utc = created_at_utc self.messaging_sku = messaging_sku self.messaging_units = messaging_units self.modified_at_utc = modified_at_utc self.namespace_type = namespace_type @classmethod def _from_internal_entity( cls, name: str, internal_entity: InternalNamespaceProperties ) -> "NamespaceProperties": namespace_properties = cls( name, alias=internal_entity.alias, created_at_utc=internal_entity.created_time, messaging_sku=internal_entity.messaging_sku, messaging_units=internal_entity.messaging_units, modified_at_utc=internal_entity.modified_time, namespace_type=internal_entity.namespace_type, ) return namespace_properties def _to_internal_entity(self) -> InternalNamespaceProperties: internal_entity = InternalNamespaceProperties() internal_entity.alias = self.alias internal_entity.created_time = self.created_at_utc internal_entity.messaging_sku = self.messaging_sku internal_entity.messaging_units = self.messaging_units internal_entity.modified_time = self.modified_at_utc internal_entity.namespace_type = self.namespace_type return internal_entity
[docs]class QueueProperties(DictMixin): # pylint:disable=too-many-instance-attributes """Properties of a Service Bus queue resource. **Please use `get_queue`, `create_queue`, or `list_queues` on the ServiceBusAdministrationClient to get a `QueueProperties` instance instead of instantiating a `QueueProperties` object directly.** :param name: Name of the queue. :type name: str :keyword authorization_rules: Authorization rules for resource. :paramtype authorization_rules: list[~azure.servicebus.management.AuthorizationRule] or None :keyword auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the queue is automatically deleted. The minimum duration is 5 minutes. :paramtype auto_delete_on_idle: ~datetime.timedelta or str or None :keyword dead_lettering_on_message_expiration: A value that indicates whether this queue has dead letter support when a message expires. :paramtype dead_lettering_on_message_expiration: bool or None :keyword default_message_time_to_live: ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. :paramtype default_message_time_to_live: ~datetime.timedelta or str or None :keyword duplicate_detection_history_time_window: ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. :paramtype duplicate_detection_history_time_window: ~datetime.timedelta or str or None :keyword availability_status: Availibility status of the entity. Possible values include: "Available", "Limited", "Renaming", "Restoring", "Unknown". :paramtype availability_status: str or None or ~azure.servicebus.management.EntityAvailabilityStatus :keyword enable_batched_operations: Value that indicates whether server-side batched operations are enabled. :paramtype enable_batched_operations: bool or None :keyword enable_express: A value that indicates whether Express Entities are enabled. An express queue holds a message in memory temporarily before writing it to persistent storage. :paramtype enable_express: bool or None :keyword enable_partitioning: A value that indicates whether the queue is to be partitioned across multiple message brokers. :paramtype enable_partitioning: bool or None :keyword lock_duration: ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. :paramtype lock_duration: ~datetime.timedelt or Nonea :keyword max_delivery_count: The maximum delivery count. A message is automatically deadlettered after this number of deliveries. Default value is 10. :paramtype max_delivery_count: int or None :keyword max_size_in_megabytes: The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. :paramtype max_size_in_megabytes: int or None :keyword requires_duplicate_detection: A value indicating if this queue requires duplicate detection. :paramtype requires_duplicate_detection: bool or None :keyword requires_session: A value that indicates whether the queue supports the concept of sessions. :paramtype requires_session: bool or None :keyword status: Status of a Service Bus resource. Possible values include: "Active", "Creating", "Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown". :paramtype status: str or ~azure.servicebus.management.EntityStatus :keyword forward_to: The name of the recipient entity to which all the messages sent to the queue are forwarded to. :paramtype forward_to: str or None :keyword user_metadata: Custom metdata that user can associate with the description. Max length is 1024 chars. :paramtype user_metadata: str or None :keyword forward_dead_lettered_messages_to: The name of the recipient entity to which all the dead-lettered messages of this subscription are forwarded to. :paramtype forward_dead_lettered_messages_to: str or None :keyword max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that can be accepted by the queue. This feature is only available when using a Premium namespace and Service Bus API version "2021-05" or higher. :paramtype max_message_size_in_kilobytes: int or None :ivar name: Name of the queue. :vartype name: str or None :ivar authorization_rules: Authorization rules for resource. :vartype authorization_rules: list[~azure.servicebus.management.AuthorizationRule] :ivar auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the queue is automatically deleted. The minimum duration is 5 minutes. :vartype auto_delete_on_idle: ~datetime.timedelta or str or None :ivar dead_lettering_on_message_expiration: A value that indicates whether this queue has dead letter support when a message expires. :vartype dead_lettering_on_message_expiration: bool or None :ivar default_message_time_to_live: ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. :vartype default_message_time_to_live: ~datetime.timedelta or str or None :ivar duplicate_detection_history_time_window: ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. :vartype duplicate_detection_history_time_window: ~datetime.timedelta or str or None :ivar availability_status: Availibility status of the entity. Possible values include: "Available", "Limited", "Renaming", "Restoring", "Unknown". :vartype availability_status: str or None or ~azure.servicebus.management.EntityAvailabilityStatus :ivar enable_batched_operations: Value that indicates whether server-side batched operations are enabled. :vartype enable_batched_operations: bool or None :ivar enable_express: A value that indicates whether Express Entities are enabled. An express queue holds a message in memory temporarily before writing it to persistent storage. :vartype enable_express: bool or None :ivar enable_partitioning: A value that indicates whether the queue is to be partitioned across multiple message brokers. :vartype enable_partitioning: bool or None :ivar lock_duration: ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. :vartype lock_duration: ~datetime.timedelta or str or None :ivar max_delivery_count: The maximum delivery count. A message is automatically deadlettered after this number of deliveries. Default value is 10. :vartype max_delivery_count: int or None :ivar max_size_in_megabytes: The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. :vartype max_size_in_megabytes: int or None :ivar requires_duplicate_detection: A value indicating if this queue requires duplicate detection. :vartype requires_duplicate_detection: bool or None :ivar requires_session: A value that indicates whether the queue supports the concept of sessions. :vartype requires_session: bool or None :ivar status: Status of a Service Bus resource. Possible values include: "Active", "Creating", "Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown". :vartype status: str or ~azure.servicebus.management.EntityStatus or None :ivar forward_to: The name of the recipient entity to which all the messages sent to the queue are forwarded to. :vartype forward_to: str or None :ivar user_metadata: Custom metdata that user can associate with the description. Max length is 1024 chars. :vartype user_metadata: str or None :ivar forward_dead_lettered_messages_to: The name of the recipient entity to which all the dead-lettered messages of this subscription are forwarded to. :vartype forward_dead_lettered_messages_to: str or None :ivar max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that can be accepted by the queue. This feature is only available when using a Premium namespace and Service Bus API version "2021-05" or higher. :vartype max_message_size_in_kilobytes: int or None """ def __init__( self, name: str, *, authorization_rules: Optional[List["AuthorizationRule"]], auto_delete_on_idle: Optional[Union[timedelta, str]], dead_lettering_on_message_expiration: Optional[bool], default_message_time_to_live: Optional[Union[timedelta, str]], duplicate_detection_history_time_window: Optional[Union[timedelta, str]], availability_status: Optional[Union[str, "EntityAvailabilityStatus"]], enable_batched_operations: Optional[bool], enable_express: Optional[bool], enable_partitioning: Optional[bool], lock_duration: Optional[Union[timedelta, str]], max_delivery_count: Optional[int], max_size_in_megabytes: Optional[int], requires_duplicate_detection: Optional[bool], requires_session: Optional[bool], status: Optional[Union[str, "EntityStatus"]], forward_to: Optional[str], user_metadata: Optional[str], forward_dead_lettered_messages_to: Optional[str], max_message_size_in_kilobytes: Optional[int], ) -> None: self.name = name self._internal_qd: InternalQueueDescription self.authorization_rules = authorization_rules self.auto_delete_on_idle = auto_delete_on_idle self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration self.default_message_time_to_live = default_message_time_to_live self.duplicate_detection_history_time_window = duplicate_detection_history_time_window self.availability_status = availability_status self.enable_batched_operations = enable_batched_operations self.enable_express = enable_express self.enable_partitioning = enable_partitioning self.lock_duration = lock_duration self.max_delivery_count = max_delivery_count self.max_size_in_megabytes = max_size_in_megabytes self.requires_duplicate_detection = requires_duplicate_detection self.requires_session = requires_session self.status = status self.forward_to = forward_to self.user_metadata = user_metadata self.forward_dead_lettered_messages_to = forward_dead_lettered_messages_to self.max_message_size_in_kilobytes = max_message_size_in_kilobytes @classmethod def _from_internal_entity( cls, name: str, internal_qd: InternalQueueDescription ) -> "QueueProperties": qd = cls( name, authorization_rules=[ AuthorizationRule._from_internal_entity(r) for r in internal_qd.authorization_rules ] if internal_qd.authorization_rules else None, auto_delete_on_idle=internal_qd.auto_delete_on_idle, dead_lettering_on_message_expiration=internal_qd.dead_lettering_on_message_expiration, default_message_time_to_live=internal_qd.default_message_time_to_live, duplicate_detection_history_time_window=internal_qd.duplicate_detection_history_time_window, availability_status=internal_qd.entity_availability_status, enable_batched_operations=internal_qd.enable_batched_operations, enable_express=internal_qd.enable_express, enable_partitioning=internal_qd.enable_partitioning, lock_duration=internal_qd.lock_duration, max_delivery_count=internal_qd.max_delivery_count, max_size_in_megabytes=internal_qd.max_size_in_megabytes, requires_duplicate_detection=internal_qd.requires_duplicate_detection, requires_session=internal_qd.requires_session, status=internal_qd.status, forward_to=internal_qd.forward_to, forward_dead_lettered_messages_to=internal_qd.forward_dead_lettered_messages_to, user_metadata=internal_qd.user_metadata, max_message_size_in_kilobytes=internal_qd.max_message_size_in_kilobytes, ) qd._internal_qd = deepcopy(internal_qd) # pylint:disable=protected-access return qd def _to_internal_entity( self, fully_qualified_namespace: str, kwargs: Optional[Dict] = None ) -> InternalQueueDescription: kwargs = kwargs or {} if not hasattr(self, "_internal_qd"): internal_qd = InternalQueueDescription() self._internal_qd = internal_qd authorization_rules = kwargs.pop( "authorization_rules", self.authorization_rules ) self._internal_qd.authorization_rules = ( [r._to_internal_entity() for r in authorization_rules] if authorization_rules else authorization_rules ) self._internal_qd.auto_delete_on_idle = avoid_timedelta_overflow( # type: ignore kwargs.pop("auto_delete_on_idle", self.auto_delete_on_idle) ) self._internal_qd.dead_lettering_on_message_expiration = kwargs.pop( "dead_lettering_on_message_expiration", self.dead_lettering_on_message_expiration, ) self._internal_qd.default_message_time_to_live = avoid_timedelta_overflow( # type: ignore kwargs.pop( "default_message_time_to_live", self.default_message_time_to_live ) ) self._internal_qd.duplicate_detection_history_time_window = kwargs.pop( "duplicate_detection_history_time_window", self.duplicate_detection_history_time_window, ) self._internal_qd.entity_availability_status = kwargs.pop( "availability_status", self.availability_status ) self._internal_qd.enable_batched_operations = kwargs.pop( "enable_batched_operations", self.enable_batched_operations ) self._internal_qd.enable_express = kwargs.pop( "enable_express", self.enable_express ) self._internal_qd.enable_partitioning = kwargs.pop( "enable_partitioning", self.enable_partitioning ) self._internal_qd.lock_duration = kwargs.pop( "lock_duration", self.lock_duration ) self._internal_qd.max_delivery_count = kwargs.pop( "max_delivery_count", self.max_delivery_count ) self._internal_qd.max_size_in_megabytes = kwargs.pop( "max_size_in_megabytes", self.max_size_in_megabytes ) self._internal_qd.requires_duplicate_detection = kwargs.pop( "requires_duplicate_detection", self.requires_duplicate_detection ) self._internal_qd.requires_session = kwargs.pop( "requires_session", self.requires_session ) self._internal_qd.status = kwargs.pop("status", self.status) forward_to = kwargs.pop("forward_to", self.forward_to) self._internal_qd.forward_to = _normalize_entity_path_to_full_path_if_needed( forward_to, fully_qualified_namespace ) forward_dead_lettered_messages_to = kwargs.pop( "forward_dead_lettered_messages_to", self.forward_dead_lettered_messages_to ) self._internal_qd.forward_dead_lettered_messages_to = ( _normalize_entity_path_to_full_path_if_needed( forward_dead_lettered_messages_to, fully_qualified_namespace ) ) self._internal_qd.user_metadata = kwargs.pop( "user_metadata", self.user_metadata ) self._internal_qd.max_message_size_in_kilobytes = kwargs.pop( "max_message_size_in_kilobytes", self.max_message_size_in_kilobytes ) return self._internal_qd
[docs]class QueueRuntimeProperties(object): """Service Bus queue runtime properties.""" def __init__(self) -> None: self._name: str self._internal_qr: InternalQueueDescription @classmethod def _from_internal_entity( cls, name: str, internal_qr: InternalQueueDescription ) -> "QueueRuntimeProperties": qr = cls() qr._name = name qr._internal_qr = deepcopy(internal_qr) # pylint:disable=protected-access return qr @property def name(self) -> str: """Name of the queue. :rtype: str """ return self._name @property def accessed_at_utc(self) -> Optional[datetime]: """Last time a message was sent, or the last time there was a receive request to this queue. :rtype: ~datetime.datetime or None """ return self._internal_qr.accessed_at @property def created_at_utc(self) -> Optional[datetime]: """The exact time the queue was created. :rtype: ~datetime.datetime or None """ return self._internal_qr.created_at @property def updated_at_utc(self) -> Optional[datetime]: """The exact the entity was updated. :rtype: ~datetime.datetime or None """ return self._internal_qr.updated_at @property def size_in_bytes(self) -> Optional[int]: """The size of the queue, in bytes. :rtype: int or None """ return self._internal_qr.size_in_bytes @property def total_message_count(self) -> Optional[int]: """Total number of messages. :rtype: int or None """ return self._internal_qr.message_count @property def active_message_count(self) -> Optional[int]: """Number of active messages in the queue, topic, or subscription. :rtype: int or None """ return cast("MessageCountDetails", self._internal_qr.message_count_details).active_message_count @property def dead_letter_message_count(self) -> Optional[int]: """Number of messages that are dead lettered. :rtype: int or None """ return cast("MessageCountDetails", self._internal_qr.message_count_details).dead_letter_message_count @property def scheduled_message_count(self) -> Optional[int]: """Number of scheduled messages. :rtype: int or None """ return cast("MessageCountDetails", self._internal_qr.message_count_details).scheduled_message_count @property def transfer_dead_letter_message_count(self) -> Optional[int]: """Number of messages transferred into dead letters. :rtype: int or None """ return ( cast("MessageCountDetails", self._internal_qr.message_count_details).transfer_dead_letter_message_count ) @property def transfer_message_count(self) -> Optional[int]: """Number of messages transferred to another queue, topic, or subscription. :rtype: int or None """ return cast("MessageCountDetails", self._internal_qr.message_count_details).transfer_message_count
[docs]class TopicProperties(DictMixin): # pylint:disable=too-many-instance-attributes """Properties of a Service Bus topic resource. **Please use `get_topic`, `create_topic`, or `list_topics` on the ServiceBusAdministrationClient to get a `TopicProperties` instance instead of instantiating a `TopicProperties` object directly.** :ivar name: Name of the topic. :type name: str :ivar default_message_time_to_live: ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. :type default_message_time_to_live: ~datetime.timedelta or str or None :ivar max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of memory allocated for the topic. :type max_size_in_megabytes: float or None :ivar requires_duplicate_detection: A value indicating if this topic requires duplicate detection. :type requires_duplicate_detection: bool or None :ivar duplicate_detection_history_time_window: ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. :type duplicate_detection_history_time_window: ~datetime.timedelta or str or None :ivar enable_batched_operations: Value that indicates whether server-side batched operations are enabled. :type enable_batched_operations: bool or None :ivar size_in_bytes: The size of the topic, in bytes. :type size_in_bytes: int or None :ivar filtering_messages_before_publishing: Filter messages before publishing. :type filtering_messages_before_publishing: bool or None :ivar authorization_rules: Authorization rules for resource. :type authorization_rules: list[~azure.servicebus.management.AuthorizationRule] or None :ivar status: Status of a Service Bus resource. Possible values include: "Active", "Creating", "Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown". :type status: str or ~azure.servicebus.management.EntityStatus or None :ivar support_ordering: A value that indicates whether the topic supports ordering. :type support_ordering: bool or None :ivar auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the topic is automatically deleted. The minimum duration is 5 minutes. :type auto_delete_on_idle: ~datetime.timedelta or str or None :ivar enable_partitioning: A value that indicates whether the topic is to be partitioned across multiple message brokers. :type enable_partitioning: bool or None :ivar availability_status: Availability status of the entity. Possible values include: "Available", "Limited", "Renaming", "Restoring", "Unknown". :type availability_status: str or None or ~azure.servicebus.management.EntityAvailabilityStatus :ivar enable_express: A value that indicates whether Express Entities are enabled. An express queue holds a message in memory temporarily before writing it to persistent storage. :type enable_express: bool or None :ivar user_metadata: Metadata associated with the topic. :type user_metadata: str or None :ivar max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that can be accepted by the topic. This feature is only available when using a Premium namespace and Service Bus API version "2021-05" or higher. :type max_message_size_in_kilobytes: int or None """ def __init__( self, name: str, *, default_message_time_to_live: Optional[Union[timedelta, str]], max_size_in_megabytes: Optional[int], requires_duplicate_detection: Optional[bool], duplicate_detection_history_time_window: Optional[Union[timedelta, str]], enable_batched_operations: Optional[bool], size_in_bytes: Optional[int], filtering_messages_before_publishing: Optional[bool], authorization_rules: Optional[List["AuthorizationRule"]], status: Optional[Union[str, "EntityStatus"]], support_ordering: Optional[bool], auto_delete_on_idle: Optional[Union[timedelta, str]], enable_partitioning: Optional[bool], availability_status: Optional[Union[str, "EntityAvailabilityStatus"]], enable_express: Optional[bool], user_metadata: Optional[str], max_message_size_in_kilobytes: Optional[int], ) -> None: self.name = name self._internal_td: InternalTopicDescription self.default_message_time_to_live = default_message_time_to_live self.max_size_in_megabytes = max_size_in_megabytes self.requires_duplicate_detection = requires_duplicate_detection self.duplicate_detection_history_time_window = duplicate_detection_history_time_window self.enable_batched_operations = enable_batched_operations self.size_in_bytes = size_in_bytes self.filtering_messages_before_publishing = filtering_messages_before_publishing self.authorization_rules = authorization_rules self.status = status self.support_ordering = support_ordering self.auto_delete_on_idle = auto_delete_on_idle self.enable_partitioning = enable_partitioning self.availability_status = availability_status self.enable_express = enable_express self.user_metadata = user_metadata self.max_message_size_in_kilobytes = max_message_size_in_kilobytes @classmethod def _from_internal_entity( cls, name: str, internal_td: InternalTopicDescription ) -> "TopicProperties": td = cls( name, default_message_time_to_live=internal_td.default_message_time_to_live, max_size_in_megabytes=internal_td.max_size_in_megabytes, requires_duplicate_detection=internal_td.requires_duplicate_detection, duplicate_detection_history_time_window=internal_td.duplicate_detection_history_time_window, enable_batched_operations=internal_td.enable_batched_operations, size_in_bytes=internal_td.size_in_bytes, filtering_messages_before_publishing=internal_td.filtering_messages_before_publishing, authorization_rules=[ AuthorizationRule._from_internal_entity(r) for r in internal_td.authorization_rules ] if internal_td.authorization_rules else None, status=internal_td.status, support_ordering=internal_td.support_ordering, auto_delete_on_idle=internal_td.auto_delete_on_idle, enable_partitioning=internal_td.enable_partitioning, availability_status=internal_td.entity_availability_status, enable_express=internal_td.enable_express, user_metadata=internal_td.user_metadata, max_message_size_in_kilobytes=internal_td.max_message_size_in_kilobytes, ) td._internal_td = deepcopy(internal_td) return td def _to_internal_entity(self, kwargs: Optional[Dict] = None) -> InternalTopicDescription: kwargs = kwargs or {} if not hasattr(self, "_internal_td"): self._internal_td = InternalTopicDescription() self._internal_td.default_message_time_to_live = avoid_timedelta_overflow( # type: ignore kwargs.pop( "default_message_time_to_live", self.default_message_time_to_live ) ) self._internal_td.max_size_in_megabytes = kwargs.pop( "max_size_in_megabytes", self.max_size_in_megabytes ) self._internal_td.requires_duplicate_detection = kwargs.pop( "requires_duplicate_detection", self.requires_duplicate_detection ) self._internal_td.duplicate_detection_history_time_window = kwargs.pop( "duplicate_detection_history_time_window", self.duplicate_detection_history_time_window, ) self._internal_td.enable_batched_operations = kwargs.pop( "enable_batched_operations", self.enable_batched_operations ) self._internal_td.size_in_bytes = kwargs.pop( "size_in_bytes", self.size_in_bytes ) authorization_rules = kwargs.pop( "authorization_rules", self.authorization_rules ) self._internal_td.authorization_rules = ( [r._to_internal_entity() for r in authorization_rules] if authorization_rules else authorization_rules ) self._internal_td.status = kwargs.pop("status", self.status) self._internal_td.support_ordering = kwargs.pop( "support_ordering", self.support_ordering ) self._internal_td.auto_delete_on_idle = avoid_timedelta_overflow( # type: ignore kwargs.pop("auto_delete_on_idle", self.auto_delete_on_idle) ) self._internal_td.enable_partitioning = kwargs.pop( "enable_partitioning", self.enable_partitioning ) self._internal_td.entity_availability_status = kwargs.pop( "availability_status", self.availability_status ) self._internal_td.enable_express = kwargs.pop( "enable_express", self.enable_express ) self._internal_td.user_metadata = kwargs.pop( "user_metadata", self.user_metadata ) self._internal_td.max_message_size_in_kilobytes = kwargs.pop( "max_message_size_in_kilobytes", self.max_message_size_in_kilobytes ) return self._internal_td
[docs]class TopicRuntimeProperties(object): """Runtime properties of a Service Bus topic resource.""" def __init__(self) -> None: self._name: str self._internal_td: InternalTopicDescription @classmethod def _from_internal_entity( cls, name: str, internal_td: InternalTopicDescription ) -> "TopicRuntimeProperties": qd = cls() qd._name = name qd._internal_td = internal_td return qd @property def name(self) -> str: """The name of the topic. :rtype: str """ return self._name @property def accessed_at_utc(self) -> Optional[datetime]: """Last time a message was sent, or the last time there was a receive request :rtype: ~datetime.datetime or None """ return self._internal_td.accessed_at @property def created_at_utc(self) -> Optional[datetime]: """The exact time the queue was created. :rtype: ~datetime.datetime or None """ return self._internal_td.created_at @property def updated_at_utc(self) -> Optional[datetime]: """The exact time the entity was updated. :rtype: ~datetime.datetime or None """ return self._internal_td.updated_at @property def size_in_bytes(self) -> Optional[int]: """The current size of the entity in bytes. :rtype: int or None """ return self._internal_td.size_in_bytes @property def subscription_count(self) -> Optional[int]: """The number of subscriptions in the topic. :rtype: int or None """ return self._internal_td.subscription_count @property def scheduled_message_count(self) -> Optional[int]: """Number of scheduled messages. :rtype: int or None """ return cast("MessageCountDetails", self._internal_td.message_count_details).scheduled_message_count
[docs]class SubscriptionProperties(DictMixin): # pylint:disable=too-many-instance-attributes """Properties of a Service Bus topic subscription resource. **Please use `get_subscription`, `create_subscription`, or `list_subscriptions` on the ServiceBusAdministrationClient to get a `SubscriptionProperties` instance instead of instantiating a `SubscriptionProperties` object directly.** :param name: Name of the subscription. :type name: str :keyword lock_duration: ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. :paramtype lock_duration: ~datetime.timedelta or str or None :keyword requires_session: A value that indicates whether the queue supports the concept of sessions. :paramtype requires_session: bool or None :keyword default_message_time_to_live: ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. :paramtype default_message_time_to_live: ~datetime.timedelta or str or None :keyword dead_lettering_on_message_expiration: A value that indicates whether this subscription has dead letter support when a message expires. :paramtype dead_lettering_on_message_expiration: bool or None :keyword dead_lettering_on_filter_evaluation_exceptions: A value that indicates whether this subscription has dead letter support when a message expires. :paramtype dead_lettering_on_filter_evaluation_exceptions: bool or None :keyword max_delivery_count: The maximum delivery count. A message is automatically deadlettered after this number of deliveries. Default value is 10. :paramtype max_delivery_count: int or None :keyword enable_batched_operations: Value that indicates whether server-side batched operations are enabled. :paramtype enable_batched_operations: bool or None :keyword status: Status of a Service Bus resource. Possible values include: "Active", "Creating", "Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown". :paramtype status: str or ~azure.servicebus.management.EntityStatus or None :keyword forward_to: The name of the recipient entity to which all the messages sent to the subscription are forwarded to. :paramtype forward_to: str or None :keyword user_metadata: Metadata associated with the subscription. Maximum number of characters is 1024. :paramtype user_metadata: str or None :keyword forward_dead_lettered_messages_to: The name of the recipient entity to which all the messages sent to the subscription are forwarded to. :paramtype forward_dead_lettered_messages_to: str or None :keyword auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the subscription is automatically deleted. The minimum duration is 5 minutes. :paramtype auto_delete_on_idle: ~datetime.timedelta or str or None :keyword availability_status: Availability status of the entity. Possible values include: "Available", "Limited", "Renaming", "Restoring", "Unknown". :paramtype availability_status: str or None or ~azure.servicebus.management.EntityAvailabilityStatus :ivar name: Name of the subscription. :vartype name: str :ivar lock_duration: ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. :vartype lock_duration: ~datetime.timedelta or str or None :ivar requires_session: A value that indicates whether the queue supports the concept of sessions. :vartype requires_session: bool or None :ivar default_message_time_to_live: ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. :vartype default_message_time_to_live: ~datetime.timedelta or str or None :ivar dead_lettering_on_message_expiration: A value that indicates whether this subscription has dead letter support when a message expires. :vartype dead_lettering_on_message_expiration: bool or None :ivar dead_lettering_on_filter_evaluation_exceptions: A value that indicates whether this subscription has dead letter support when a message expires. :vartype dead_lettering_on_filter_evaluation_exceptions: bool or None :ivar max_delivery_count: The maximum delivery count. A message is automatically deadlettered after this number of deliveries. Default value is 10. :vartype max_delivery_count: int or None :ivar enable_batched_operations: Value that indicates whether server-side batched operations are enabled. :vartype enable_batched_operations: bool or None :ivar status: Status of a Service Bus resource. Possible values include: "Active", "Creating", "Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown". :vartype status: str or ~azure.servicebus.management.EntityStatus or None :ivar forward_to: The name of the recipient entity to which all the messages sent to the subscription are forwarded to. :vartype forward_to: str or None :ivar user_metadata: Metadata associated with the subscription. Maximum number of characters is 1024. :vartype user_metadata: str or None :ivar forward_dead_lettered_messages_to: The name of the recipient entity to which all the messages sent to the subscription are forwarded to. :vartype forward_dead_lettered_messages_to: str or None :ivar auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the subscription is automatically deleted. The minimum duration is 5 minutes. :vartype auto_delete_on_idle: ~datetime.timedelta or str or None :ivar availability_status: Availability status of the entity. Possible values include: "Available", "Limited", "Renaming", "Restoring", "Unknown". :vartype availability_status: str or None or ~azure.servicebus.management.EntityAvailabilityStatus """ def __init__( self, name: str, *, lock_duration: Optional[Union[timedelta, str]], requires_session: Optional[bool], default_message_time_to_live: Optional[Union[timedelta, str]], dead_lettering_on_message_expiration: Optional[bool], dead_lettering_on_filter_evaluation_exceptions: Optional[bool], max_delivery_count: Optional[int], enable_batched_operations: Optional[bool], status: Optional[Union[str, "EntityStatus"]], forward_to: Optional[str], user_metadata: Optional[str], forward_dead_lettered_messages_to: Optional[str], auto_delete_on_idle: Optional[Union[timedelta, str]], availability_status: Optional[Union[str, "EntityAvailabilityStatus"]], ) -> None: self.name = name self._internal_sd: InternalSubscriptionDescription self.lock_duration = lock_duration self.requires_session = requires_session self.default_message_time_to_live = default_message_time_to_live self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration self.dead_lettering_on_filter_evaluation_exceptions = dead_lettering_on_filter_evaluation_exceptions self.max_delivery_count = max_delivery_count self.enable_batched_operations = enable_batched_operations self.status = status self.forward_to = forward_to self.user_metadata = user_metadata self.forward_dead_lettered_messages_to = forward_dead_lettered_messages_to self.auto_delete_on_idle = auto_delete_on_idle self.availability_status = availability_status @classmethod def _from_internal_entity( cls, name: str, internal_subscription: InternalSubscriptionDescription ) -> "SubscriptionProperties": # pylint: disable=line-too-long subscription = cls( name, lock_duration=internal_subscription.lock_duration, requires_session=internal_subscription.requires_session, default_message_time_to_live=internal_subscription.default_message_time_to_live, dead_lettering_on_message_expiration=internal_subscription.dead_lettering_on_message_expiration, dead_lettering_on_filter_evaluation_exceptions=internal_subscription.dead_lettering_on_filter_evaluation_exceptions, max_delivery_count=internal_subscription.max_delivery_count, enable_batched_operations=internal_subscription.enable_batched_operations, status=internal_subscription.status, forward_to=internal_subscription.forward_to, user_metadata=internal_subscription.user_metadata, forward_dead_lettered_messages_to=internal_subscription.forward_dead_lettered_messages_to, auto_delete_on_idle=internal_subscription.auto_delete_on_idle, availability_status=internal_subscription.entity_availability_status, ) subscription._internal_sd = deepcopy(internal_subscription) return subscription def _to_internal_entity( self, fully_qualified_namespace: str, kwargs: Optional[Dict] = None ) -> InternalSubscriptionDescription: kwargs = kwargs or {} if not hasattr(self, "_internal_sd"): self._internal_sd = InternalSubscriptionDescription() self._internal_sd.lock_duration = kwargs.pop( "lock_duration", self.lock_duration ) self._internal_sd.requires_session = kwargs.pop( "requires_session", self.requires_session ) self._internal_sd.default_message_time_to_live = avoid_timedelta_overflow( # type: ignore kwargs.pop( "default_message_time_to_live", self.default_message_time_to_live ) ) self._internal_sd.dead_lettering_on_message_expiration = kwargs.pop( "dead_lettering_on_message_expiration", self.dead_lettering_on_message_expiration, ) self._internal_sd.dead_lettering_on_filter_evaluation_exceptions = kwargs.pop( "dead_lettering_on_filter_evaluation_exceptions", self.dead_lettering_on_filter_evaluation_exceptions, ) self._internal_sd.max_delivery_count = kwargs.pop( "max_delivery_count", self.max_delivery_count ) self._internal_sd.enable_batched_operations = kwargs.pop( "enable_batched_operations", self.enable_batched_operations ) self._internal_sd.status = kwargs.pop("status", self.status) forward_to = kwargs.pop("forward_to", self.forward_to) self._internal_sd.forward_to = _normalize_entity_path_to_full_path_if_needed( forward_to, fully_qualified_namespace ) forward_dead_lettered_messages_to = kwargs.pop( "forward_dead_lettered_messages_to", self.forward_dead_lettered_messages_to ) self._internal_sd.forward_dead_lettered_messages_to = ( _normalize_entity_path_to_full_path_if_needed( forward_dead_lettered_messages_to, fully_qualified_namespace ) ) self._internal_sd.user_metadata = kwargs.pop( "user_metadata", self.user_metadata ) self._internal_sd.auto_delete_on_idle = avoid_timedelta_overflow( # type: ignore kwargs.pop("auto_delete_on_idle", self.auto_delete_on_idle) ) self._internal_sd.entity_availability_status = kwargs.pop( "availability_status", self.availability_status ) return self._internal_sd
[docs]class SubscriptionRuntimeProperties(object): """Runtime properties of a Service Bus topic subscription resource.""" def __init__(self) -> None: self._internal_sd: InternalSubscriptionDescription self._name: str @classmethod def _from_internal_entity( cls, name: str, internal_subscription: InternalSubscriptionDescription ) -> "SubscriptionRuntimeProperties": subscription = cls() subscription._name = name subscription._internal_sd = internal_subscription return subscription @property def name(self) -> str: """Name of subscription :rtype: str """ return self._name @property def accessed_at_utc(self) -> Optional[datetime]: """Last time a message was sent, or the last time there was a receive request :rtype: ~datetime.datetime or None """ return self._internal_sd.accessed_at @property def created_at_utc(self) -> Optional[datetime]: """The exact time the subscription was created. :rtype: ~datetime.datetime or None """ return self._internal_sd.created_at @property def updated_at_utc(self) -> Optional[datetime]: """The exact time the entity is updated. :rtype: ~datetime.datetime or None """ return self._internal_sd.updated_at @property def total_message_count(self) -> Optional[int]: """The number of messages in the subscription. :rtype: int or None """ return self._internal_sd.message_count @property def active_message_count(self) -> Optional[int]: """Number of active messages in the subscription. :rtype: int or None """ return cast("MessageCountDetails", self._internal_sd.message_count_details).active_message_count @property def dead_letter_message_count(self) -> Optional[int]: """Number of messages that are dead lettered. :rtype: int or None """ return cast("MessageCountDetails", self._internal_sd.message_count_details).dead_letter_message_count @property def transfer_dead_letter_message_count(self) -> Optional[int]: """Number of messages transferred into dead letters. :rtype: int or None """ return ( cast("MessageCountDetails", self._internal_sd.message_count_details).transfer_dead_letter_message_count ) @property def transfer_message_count(self) -> Optional[int]: """Number of messages transferred to another queue, topic, or subscription. :rtype: int or None """ return cast("MessageCountDetails", self._internal_sd.message_count_details).transfer_message_count
[docs]class RuleProperties(DictMixin): """Properties of a topic subscription rule. **Please use `get_rule`, `create_rule`, or `list_rules` on the ServiceBusAdministrationClient to get a `RuleProperties` instance instead of instantiating a `RuleProperties` object directly.** :param name: Name of the rule. :type name: str :keyword filter: The filter of the rule. :paramtype filter: ~azure.servicebus.management.CorrelationRuleFilter or ~azure.servicebus.management.SqlRuleFilter or None :keyword action: The action of the rule. :paramtype action: ~azure.servicebus.management.SqlRuleAction or None :keyword created_at_utc: The exact time the rule was created. :paramtype created_at_utc: ~datetime.datetime or None :ivar name: Name of the rule. :vartype name: str :ivar filter: The filter of the rule. :vartype filter: ~azure.servicebus.management.CorrelationRuleFilter or ~azure.servicebus.management.SqlRuleFilter or None :ivar action: The action of the rule. :vartype action: ~azure.servicebus.management.SqlRuleAction or None :ivar created_at_utc: The exact time the rule was created. :vartype created_at_utc: ~datetime.datetime or None """ def __init__( self, name: str, *, filter: Optional[Union["CorrelationRuleFilter", "SqlRuleFilter"]], action: Optional["SqlRuleAction"], created_at_utc: Optional[datetime], ) -> None: self.name = name self._internal_rule: InternalRuleDescription self.filter = filter self.action = action self.created_at_utc = created_at_utc @classmethod def _from_internal_entity( cls, name: str, internal_rule: InternalRuleDescription ) -> "RuleProperties": rule = cls( name, filter=RULE_CLASS_MAPPING[type(internal_rule.filter)]._from_internal_entity( internal_rule.filter ) if internal_rule.filter and isinstance(internal_rule.filter, tuple(RULE_CLASS_MAPPING.keys())) else None, action=RULE_CLASS_MAPPING[type(internal_rule.action)]._from_internal_entity( internal_rule.action ) if internal_rule.action and isinstance(internal_rule.action, tuple(RULE_CLASS_MAPPING.keys())) else None, created_at_utc=internal_rule.created_at, ) rule._internal_rule = deepcopy(internal_rule) return rule def _to_internal_entity( self, kwargs: Optional[Dict] = None ) -> InternalRuleDescription: kwargs = kwargs or {} if not hasattr(self, "_internal_rule"): self._internal_rule = InternalRuleDescription() rule_filter = kwargs.pop("filter", self.filter) self._internal_rule.filter = rule_filter._to_internal_entity() if rule_filter else TRUE_FILTER # type: ignore action = kwargs.pop("action", self.action) self._internal_rule.action = ( action._to_internal_entity() if action else EMPTY_RULE_ACTION ) self._internal_rule.created_at = kwargs.pop( "created_at_utc", self.created_at_utc ) self._internal_rule.name = kwargs.pop("name", self.name) return self._internal_rule
[docs]class CorrelationRuleFilter(object): """Represents the correlation filter expression. :keyword correlation_id: Identifier of the correlation. :paramtype correlation_id: str or None :keyword message_id: Identifier of the message. :paramtype message_id: str or None :keyword to: Address to send to. :paramtype to: str or None :keyword reply_to: Address of the queue to reply to. :paramtype reply_to: str or None :keyword label: Application specific label. :paramtype label: str or None :keyword session_id: Session identifier. :paramtype session_id: str or None :keyword reply_to_session_id: Session identifier to reply to. :paramtype reply_to_session_id: str or None :keyword content_type: Content type of the message. :paramtype content_type: str or None :keyword properties: dictionary object for custom filters :paramtype properties: dict[str, Union[str, int, float, bool, datetime, timedelta]] or None """ def __init__( self, *, correlation_id: Optional[str] = None, message_id: Optional[str] = None, to: Optional[str] = None, reply_to: Optional[str] = None, label: Optional[str] = None, session_id: Optional[str] = None, reply_to_session_id: Optional[str] = None, content_type: Optional[str] = None, properties: Optional[Dict[str, Union[str, int, float, bool, datetime, timedelta]]] = None, ) -> None: self.correlation_id = correlation_id self.message_id = message_id self.to = to self.reply_to = reply_to self.label = label self.session_id = session_id self.reply_to_session_id = reply_to_session_id self.content_type = content_type self.properties = properties @classmethod def _from_internal_entity( cls, internal_correlation_filter: InternalCorrelationFilter ) -> "CorrelationRuleFilter": correlation_filter = cls() correlation_filter.correlation_id = internal_correlation_filter.correlation_id correlation_filter.message_id = internal_correlation_filter.message_id correlation_filter.to = internal_correlation_filter.to correlation_filter.reply_to = internal_correlation_filter.reply_to correlation_filter.label = internal_correlation_filter.label correlation_filter.session_id = internal_correlation_filter.session_id correlation_filter.reply_to_session_id = ( internal_correlation_filter.reply_to_session_id ) correlation_filter.content_type = internal_correlation_filter.content_type correlation_filter.properties = ( OrderedDict( (kv.key, kv.value) for kv in internal_correlation_filter.properties ) if internal_correlation_filter.properties else OrderedDict() ) return correlation_filter def _to_internal_entity( self ) -> InternalCorrelationFilter: internal_entity = InternalCorrelationFilter() internal_entity.correlation_id = self.correlation_id internal_entity.message_id = self.message_id internal_entity.to = self.to internal_entity.reply_to = self.reply_to internal_entity.label = self.label internal_entity.session_id = self.session_id internal_entity.reply_to_session_id = self.reply_to_session_id internal_entity.content_type = self.content_type internal_entity.properties = ( [KeyObjectValue(key=key, value=value) for key, value in self.properties.items()] if self.properties else None ) return internal_entity
[docs]class SqlRuleFilter(object): """Represents a filter which is a composition of an expression and an action that is executed in the pub/sub pipeline. .. admonition:: Example: .. code-block:: python :caption: Create SqlRuleFilter. sql_filter = SqlRuleFilter("property1 = 'value'") sql_filter_parametrized = SqlRuleFilter( "property1 = @param1 AND property2 = @param2", parameters={ "@param1": "value", "@param2" : 1 } ) :param sql_expression: The SQL expression. e.g. MyProperty='ABC' :type sql_expression: str :param parameters: Sets the value of the sql expression parameters if any. :type parameters: Dict[str, Union[str, int, float, bool, datetime, timedelta]] """ def __init__( self, sql_expression: Optional[str] = None, parameters: Optional[Dict[str, Union[str, int, float, bool, datetime, timedelta]]] = None ) -> None: self.sql_expression = sql_expression self.parameters = parameters self.requires_preprocessing = True @classmethod def _from_internal_entity(cls, internal_sql_rule_filter): sql_rule_filter = cls() sql_rule_filter.sql_expression = internal_sql_rule_filter.sql_expression sql_rule_filter.parameters = ( OrderedDict( (kv.key, kv.value) for kv in internal_sql_rule_filter.parameters ) if internal_sql_rule_filter.parameters else OrderedDict() ) sql_rule_filter.requires_preprocessing = ( internal_sql_rule_filter.requires_preprocessing ) return sql_rule_filter def _to_internal_entity(self) -> InternalSqlFilter: internal_entity = InternalSqlFilter(sql_expression=self.sql_expression) internal_entity.parameters = ( [ KeyValue(key=key, value=value) for key, value in self.parameters.items() # type: ignore ] if self.parameters else None ) internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL internal_entity.requires_preprocessing = self.requires_preprocessing return internal_entity
[docs]class TrueRuleFilter(SqlRuleFilter): """A sql filter with a sql expression that is always True""" def __init__(self) -> None: super(TrueRuleFilter, self).__init__("1=1", None) def _to_internal_entity(self): internal_entity = InternalTrueFilter() internal_entity.sql_expression = self.sql_expression internal_entity.requires_preprocessing = True internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL return internal_entity
[docs]class FalseRuleFilter(SqlRuleFilter): """A sql filter with a sql expression that is always True""" def __init__(self) -> None: super(FalseRuleFilter, self).__init__("1>1", None) def _to_internal_entity(self): internal_entity = InternalFalseFilter() internal_entity.sql_expression = self.sql_expression internal_entity.requires_preprocessing = True internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL return internal_entity
[docs]class SqlRuleAction(object): """Represents set of actions written in SQL language-based syntax that is performed against a ServiceBus.Messaging.BrokeredMessage . :param sql_expression: SQL expression. e.g. MyProperty='ABC' :type sql_expression: str :param parameters: Sets the value of the sql expression parameters if any. :type parameters: Dict[str, Union[str, int, float, bool, datetime, timedelta]] """ def __init__( self, sql_expression: Optional[str] = None, parameters: Optional[Dict[str, Union[str, int, float, bool, datetime, timedelta]]] = None, ) -> None: self.sql_expression = sql_expression self.parameters = parameters self.requires_preprocessing = True @classmethod def _from_internal_entity(cls, internal_sql_rule_action): sql_rule_action = cls() sql_rule_action.sql_expression = internal_sql_rule_action.sql_expression sql_rule_action.parameters = ( OrderedDict( (kv.key, kv.value) for kv in internal_sql_rule_action.parameters ) if internal_sql_rule_action.parameters else OrderedDict() ) sql_rule_action.requires_preprocessing = ( internal_sql_rule_action.requires_preprocessing ) return sql_rule_action def _to_internal_entity(self): internal_entity = InternalSqlRuleAction(sql_expression=self.sql_expression) internal_entity.parameters = ( [KeyValue(key=key, value=value) for key, value in self.parameters.items()] if self.parameters else None ) internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL internal_entity.requires_preprocessing = self.requires_preprocessing return internal_entity
RULE_CLASS_MAPPING: Dict[Type[Model], Type] = { InternalSqlRuleAction: SqlRuleAction, # InternalEmptyRuleAction: None, InternalCorrelationFilter: CorrelationRuleFilter, InternalSqlFilter: SqlRuleFilter, InternalTrueFilter: TrueRuleFilter, InternalFalseFilter: FalseRuleFilter, } EMPTY_RULE_ACTION = InternalEmptyRuleAction() TRUE_FILTER = TrueRuleFilter()
[docs]class AuthorizationRule(object): """Authorization rule of an entity. :keyword type: The authorization type. :paramtype type: str :keyword claim_type: The claim type. :paramtype claim_type: str :keyword claim_value: The claim value. :paramtype claim_value: str :keyword rights: Access rights of the entity. Values are 'Send', 'Listen', or 'Manage'. :paramtype rights: list[AccessRights] :keyword created_at_utc: The date and time when the authorization rule was created. :paramtype created_at_utc: ~datetime.datetime :keyword modified_at_utc: The date and time when the authorization rule was modified. :paramtype modified_at_utc: ~datetime.datetime :keyword key_name: The authorization rule key name. :paramtype key_name: str :keyword primary_key: The primary key of the authorization rule. :paramtype primary_key: str :keyword secondary_key: The primary key of the authorization rule. :paramtype secondary_key: str """ def __init__( self, *, type: Optional[str] = None, claim_type: Optional[str] = None, claim_value: Optional[str] = None, rights: Optional[List[Union[str, "AccessRights"]]] = None, created_at_utc: Optional[datetime] = None, modified_at_utc: Optional[datetime] = None, key_name: Optional[str] = None, primary_key: Optional[str] = None, secondary_key: Optional[str] = None, ) -> None: self.type = type self.claim_type = claim_type self.claim_value = claim_value self.rights = rights self.created_at_utc = created_at_utc self.modified_at_utc = modified_at_utc self.key_name = key_name self.primary_key = primary_key self.secondary_key = secondary_key @classmethod def _from_internal_entity(cls, internal_authorization_rule: InternalAuthorizationRule) -> "AuthorizationRule": authorization_rule = cls() authorization_rule.claim_type = internal_authorization_rule.claim_type authorization_rule.claim_value = internal_authorization_rule.claim_value authorization_rule.rights = internal_authorization_rule.rights authorization_rule.created_at_utc = internal_authorization_rule.created_time authorization_rule.modified_at_utc = internal_authorization_rule.modified_time authorization_rule.key_name = internal_authorization_rule.key_name authorization_rule.primary_key = internal_authorization_rule.primary_key authorization_rule.secondary_key = internal_authorization_rule.secondary_key return authorization_rule def _to_internal_entity(self) -> InternalAuthorizationRule: internal_entity = InternalAuthorizationRule() internal_entity.claim_type = self.claim_type internal_entity.claim_value = self.claim_value internal_entity.rights = self.rights internal_entity.created_time = self.created_at_utc internal_entity.modified_time = self.modified_at_utc internal_entity.key_name = self.key_name internal_entity.primary_key = self.primary_key internal_entity.secondary_key = self.secondary_key return internal_entity