# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint:disable=protected-access
# pylint:disable=too-many-lines
import functools
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Type, Dict, Any, Union, Optional, List
from msrest.serialization import Model
from ._generated.models import QueueDescription as InternalQueueDescription, \
TopicDescription as InternalTopicDescription, \
SubscriptionDescription as InternalSubscriptionDescription, \
RuleDescription as InternalRuleDescription, \
SqlRuleAction as InternalSqlRuleAction, \
EmptyRuleAction as InternalEmptyRuleAction, \
CorrelationFilter as InternalCorrelationFilter, \
NamespaceProperties as InternalNamespaceProperties, \
SqlFilter as InternalSqlFilter, TrueFilter as InternalTrueFilter, FalseFilter as InternalFalseFilter, \
KeyValue, AuthorizationRule as InternalAuthorizationRule
from ._model_workaround import adjust_attribute_map
from ._constants import RULE_SQL_COMPATIBILITY_LEVEL
adjust_attribute_map()
# These helpers are to ensure that the Properties objects can't be constructed without all args present,
# as a compromise between our use of kwargs to flatten arg-lists and trying to de-incentivise manual instantiation
# while still trying to provide some guardrails.
def extract_kwarg_template(kwargs, extraction_missing_args, name):
try:
return kwargs[name]
except KeyError:
extraction_missing_args.append(name)
def validate_extraction_missing_args(extraction_missing_args):
if extraction_missing_args:
raise TypeError('__init__() missing {} required keyword arguments: {}'.format(
len(extraction_missing_args),
' and '.join(["'" + e + "'" for e in extraction_missing_args])))
class DictMixin(object):
def __setitem__(self, key, item):
# type: (Any, Any) -> None
self.__dict__[key] = item
def __getitem__(self, key):
# type: (Any) -> Any
return self.__dict__[key]
def __repr__(self):
# type: () -> str
return str(self)
def __len__(self):
# type: () -> int
return len(self.keys())
def __delitem__(self, key):
# type: (Any) -> None
self.__dict__[key] = None
def __eq__(self, other):
# type: (Any) -> bool
"""Compare objects by comparing all attributes."""
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
return False
def __ne__(self, other):
# type: (Any) -> bool
"""Compare objects by comparing all attributes."""
return not self.__eq__(other)
def __str__(self):
# type: () -> str
return str({k: v for k, v in self.__dict__.items() if not k.startswith('_')})
def has_key(self, k):
# type: (Any) -> bool
return k in self.__dict__
def update(self, *args, **kwargs):
# type: (Any, Any) -> None
return self.__dict__.update(*args, **kwargs)
def keys(self):
# type: () -> list
return [k for k in self.__dict__ if not k.startswith('_')]
def values(self):
# type: () -> list
return [v for k, v in self.__dict__.items() if not k.startswith('_')]
def items(self):
# type: () -> list
return [(k, v) for k, v in self.__dict__.items() if not k.startswith('_')]
def get(self, key, default=None):
# type: (Any, Optional[Any]) -> Any
if key in self.__dict__:
return self.__dict__[key]
return default
[docs]class NamespaceProperties(DictMixin):
"""The metadata related to a Service Bus namespace.
:ivar alias: Alias for the geo-disaster recovery Service Bus namespace.
:type alias: str
:ivar created_at_utc: The exact time the namespace was created.
:type created_at_utc: ~datetime.datetime
:ivar messaging_sku: The SKU for the messaging entity. Possible values include: "Basic",
"Standard", "Premium".
:type messaging_sku: str or ~azure.servicebus.management._generated.models.MessagingSku
:ivar messaging_units: The number of messaging units allocated to the namespace.
:type messaging_units: int
:ivar modified_at_utc: The exact time the namespace was last modified.
:type modified_at_utc: ~datetime.datetime
:ivar name: Name of the namespace.
:type name: str
"""
def __init__(
self,
name,
**kwargs
):
# type: (str, Any) -> None
self.name = name
extraction_missing_args = [] # type: List[str]
extract_kwarg = functools.partial(extract_kwarg_template, kwargs, extraction_missing_args)
self.name = name
self.alias = extract_kwarg('alias')
self.created_at_utc = extract_kwarg('created_at_utc')
self.messaging_sku = extract_kwarg('messaging_sku')
self.messaging_units = extract_kwarg('messaging_units')
self.modified_at_utc = extract_kwarg('modified_at_utc')
self.namespace_type = extract_kwarg('namespace_type')
validate_extraction_missing_args(extraction_missing_args)
@classmethod
def _from_internal_entity(cls, name, internal_entity):
# type: (str, InternalNamespaceProperties) -> NamespaceProperties
namespace_properties = cls(
name,
alias=internal_entity.alias,
created_at_utc=internal_entity.created_time,
messaging_sku=internal_entity.messaging_sku,
messaging_units=internal_entity.messaging_units,
modified_at_utc=internal_entity.modified_time,
namespace_type=internal_entity.namespace_type,
)
return namespace_properties
def _to_internal_entity(self):
internal_entity = InternalNamespaceProperties()
internal_entity.alias = self.alias
internal_entity.created_time = self.created_at_utc
internal_entity.messaging_sku = self.messaging_sku
internal_entity.messaging_units = self.messaging_units
internal_entity.modified_time = self.modified_at_utc
internal_entity.namespace_type = self.namespace_type
return internal_entity
[docs]class QueueProperties(DictMixin): # pylint:disable=too-many-instance-attributes
"""Properties of a Service Bus queue resource.
:ivar name: Name of the queue.
:type name: str
:ivar authorization_rules: Authorization rules for resource.
:type authorization_rules: list[~azure.servicebus.management.AuthorizationRule]
:ivar auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the queue is
automatically deleted. The minimum duration is 5 minutes.
:type auto_delete_on_idle: ~datetime.timedelta
:ivar dead_lettering_on_message_expiration: A value that indicates whether this queue has dead
letter support when a message expires.
:type dead_lettering_on_message_expiration: bool
:ivar default_message_time_to_live: ISO 8601 default message timespan to live value. This is
the duration after which the message expires, starting from when the message is sent to Service
Bus. This is the default value used when TimeToLive is not set on a message itself.
:type default_message_time_to_live: ~datetime.timedelta
:ivar duplicate_detection_history_time_window: ISO 8601 timeSpan structure that defines the
duration of the duplicate detection history. The default value is 10 minutes.
:type duplicate_detection_history_time_window: ~datetime.timedelta
:ivar availability_status: Availibility status of the entity. Possible values include:
"Available", "Limited", "Renaming", "Restoring", "Unknown".
:type availability_status: str or
~azure.servicebus.management.EntityAvailabilityStatus
:ivar enable_batched_operations: Value that indicates whether server-side batched operations
are enabled.
:type enable_batched_operations: bool
:ivar enable_express: A value that indicates whether Express Entities are enabled. An express
queue holds a message in memory temporarily before writing it to persistent storage.
:type enable_express: bool
:ivar enable_partitioning: A value that indicates whether the queue is to be partitioned
across multiple message brokers.
:type enable_partitioning: bool
:ivar lock_duration: ISO 8601 timespan duration of a peek-lock; that is, the amount of time
that the message is locked for other receivers. The maximum value for LockDuration is 5
minutes; the default value is 1 minute.
:type lock_duration: ~datetime.timedelta
:ivar max_delivery_count: The maximum delivery count. A message is automatically deadlettered
after this number of deliveries. Default value is 10.
:type max_delivery_count: int
:ivar max_size_in_megabytes: The maximum size of the queue in megabytes, which is the size of
memory allocated for the queue.
:type max_size_in_megabytes: int
:ivar requires_duplicate_detection: A value indicating if this queue requires duplicate
detection.
:type requires_duplicate_detection: bool
:ivar requires_session: A value that indicates whether the queue supports the concept of
sessions.
:type requires_session: bool
:ivar status: Status of a Service Bus resource. Possible values include: "Active", "Creating",
"Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown".
:type status: str or ~azure.servicebus.management.EntityStatus
:ivar forward_to: The name of the recipient entity to which all the messages sent to the queue
are forwarded to.
:type forward_to: str
:ivar user_metadata: Custom metdata that user can associate with the description. Max length
is 1024 chars.
:type user_metadata: str
:ivar forward_dead_lettered_messages_to: The name of the recipient entity to which all the
dead-lettered messages of this subscription are forwarded to.
:type forward_dead_lettered_messages_to: str
"""
def __init__(
self,
name,
**kwargs
):
# type: (str, Any) -> None
self.name = name
self._internal_qd = None # type: Optional[InternalQueueDescription]
extraction_missing_args = [] # type: List[str]
extract_kwarg = functools.partial(extract_kwarg_template, kwargs, extraction_missing_args)
self.authorization_rules = extract_kwarg('authorization_rules')
self.auto_delete_on_idle = extract_kwarg('auto_delete_on_idle')
self.dead_lettering_on_message_expiration = extract_kwarg('dead_lettering_on_message_expiration')
self.default_message_time_to_live = extract_kwarg('default_message_time_to_live')
self.duplicate_detection_history_time_window = extract_kwarg('duplicate_detection_history_time_window')
self.availability_status = extract_kwarg('availability_status')
self.enable_batched_operations = extract_kwarg('enable_batched_operations')
self.enable_express = extract_kwarg('enable_express')
self.enable_partitioning = extract_kwarg('enable_partitioning')
self.lock_duration = extract_kwarg('lock_duration')
self.max_delivery_count = extract_kwarg('max_delivery_count')
self.max_size_in_megabytes = extract_kwarg('max_size_in_megabytes')
self.requires_duplicate_detection = extract_kwarg('requires_duplicate_detection')
self.requires_session = extract_kwarg('requires_session')
self.status = extract_kwarg('status')
self.forward_to = extract_kwarg('forward_to')
self.user_metadata = extract_kwarg('user_metadata')
self.forward_dead_lettered_messages_to = extract_kwarg('forward_dead_lettered_messages_to')
validate_extraction_missing_args(extraction_missing_args)
@classmethod
def _from_internal_entity(cls, name, internal_qd):
# type: (str, InternalQueueDescription) -> QueueProperties
qd = cls(
name,
authorization_rules=[AuthorizationRule._from_internal_entity(r) for r in internal_qd.authorization_rules] \
if internal_qd.authorization_rules else (internal_qd.authorization_rules or []),
auto_delete_on_idle=internal_qd.auto_delete_on_idle,
dead_lettering_on_message_expiration=internal_qd.dead_lettering_on_message_expiration,
default_message_time_to_live=internal_qd.default_message_time_to_live,
duplicate_detection_history_time_window=internal_qd.duplicate_detection_history_time_window,
availability_status=internal_qd.entity_availability_status,
enable_batched_operations=internal_qd.enable_batched_operations,
enable_express=internal_qd.enable_express,
enable_partitioning=internal_qd.enable_partitioning,
lock_duration=internal_qd.lock_duration,
max_delivery_count=internal_qd.max_delivery_count,
max_size_in_megabytes=internal_qd.max_size_in_megabytes,
requires_duplicate_detection=internal_qd.requires_duplicate_detection,
requires_session=internal_qd.requires_session,
status=internal_qd.status,
forward_to=internal_qd.forward_to,
forward_dead_lettered_messages_to=internal_qd.forward_dead_lettered_messages_to,
user_metadata=internal_qd.user_metadata
)
qd._internal_qd = deepcopy(internal_qd) # pylint:disable=protected-access
return qd
def _to_internal_entity(self):
if not self._internal_qd:
internal_qd = InternalQueueDescription()
self._internal_qd = internal_qd
self._internal_qd.authorization_rules = [r._to_internal_entity() for r in self.authorization_rules] \
if self.authorization_rules else self.authorization_rules
self._internal_qd.auto_delete_on_idle = self.auto_delete_on_idle
self._internal_qd.dead_lettering_on_message_expiration = self.dead_lettering_on_message_expiration
self._internal_qd.default_message_time_to_live = self.default_message_time_to_live
self._internal_qd.duplicate_detection_history_time_window = self.duplicate_detection_history_time_window
self._internal_qd.entity_availability_status = self.availability_status
self._internal_qd.enable_batched_operations = self.enable_batched_operations
self._internal_qd.enable_express = self.enable_express
self._internal_qd.enable_partitioning = self.enable_partitioning
self._internal_qd.lock_duration = self.lock_duration
self._internal_qd.max_delivery_count = self.max_delivery_count
self._internal_qd.max_size_in_megabytes = self.max_size_in_megabytes
self._internal_qd.requires_duplicate_detection = self.requires_duplicate_detection
self._internal_qd.requires_session = self.requires_session
self._internal_qd.status = self.status
self._internal_qd.forward_to = self.forward_to
self._internal_qd.forward_dead_lettered_messages_to = self.forward_dead_lettered_messages_to
self._internal_qd.user_metadata = self.user_metadata
return self._internal_qd
[docs]class QueueRuntimeProperties(object):
"""Service Bus queue runtime properties.
"""
def __init__(
self,
):
# type: () -> None
self._name = None # type: Optional[str]
self._internal_qr = None # type: Optional[InternalQueueDescription]
@classmethod
def _from_internal_entity(cls, name, internal_qr):
# type: (str, InternalQueueDescription) -> QueueRuntimeProperties
qr = cls()
qr._name = name
qr._internal_qr = deepcopy(internal_qr) # pylint:disable=protected-access
return qr
@property
def name(self):
"""Name of the queue.
:rtype: str
"""
return self._name
@property
def accessed_at_utc(self):
"""Last time a message was sent, or the last time there was a receive request to this queue.
:rtype: ~datetime.datetime
"""
return self._internal_qr.accessed_at
@property
def created_at_utc(self):
"""The exact time the queue was created.
:rtype: ~datetime.datetime
"""
return self._internal_qr.created_at
@property
def updated_at_utc(self):
"""The exact the entity was updated.
:rtype: ~datetime.datetime
"""
return self._internal_qr.updated_at
@property
def size_in_bytes(self):
"""The size of the queue, in bytes.
:rtype: int
"""
return self._internal_qr.size_in_bytes
@property
def total_message_count(self):
"""Total number of messages.
:rtype: int
"""
return self._internal_qr.message_count
@property
def active_message_count(self):
"""Number of active messages in the queue, topic, or subscription.
:rtype: int
"""
return self._internal_qr.message_count_details.active_message_count
@property
def dead_letter_message_count(self):
"""Number of messages that are dead lettered.
:rtype: int
"""
return self._internal_qr.message_count_details.dead_letter_message_count
@property
def scheduled_message_count(self):
"""Number of scheduled messages.
:rtype: int
"""
return self._internal_qr.message_count_details.scheduled_message_count
@property
def transfer_dead_letter_message_count(self):
"""Number of messages transferred into dead letters.
:rtype: int
"""
return self._internal_qr.message_count_details.transfer_dead_letter_message_count
@property
def transfer_message_count(self):
"""Number of messages transferred to another queue, topic, or subscription.
:rtype: int
"""
return self._internal_qr.message_count_details.transfer_message_count
[docs]class TopicProperties(DictMixin): # pylint:disable=too-many-instance-attributes
"""Properties of a Service Bus topic resource.
:ivar name: Name of the topic.
:type name: str
:ivar default_message_time_to_live: ISO 8601 default message timespan to live value. This is
the duration after which the message expires, starting from when the message is sent to Service
Bus. This is the default value used when TimeToLive is not set on a message itself.
:type default_message_time_to_live: ~datetime.timedelta
:ivar max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
memory allocated for the topic.
:type max_size_in_megabytes: long
:ivar requires_duplicate_detection: A value indicating if this topic requires duplicate
detection.
:type requires_duplicate_detection: bool
:ivar duplicate_detection_history_time_window: ISO 8601 timeSpan structure that defines the
duration of the duplicate detection history. The default value is 10 minutes.
:type duplicate_detection_history_time_window: ~datetime.timedelta
:ivar enable_batched_operations: Value that indicates whether server-side batched operations
are enabled.
:type enable_batched_operations: bool
:ivar size_in_bytes: The size of the topic, in bytes.
:type size_in_bytes: int
:ivar filtering_messages_before_publishing: Filter messages before publishing.
:type filtering_messages_before_publishing: bool
:ivar authorization_rules: Authorization rules for resource.
:type authorization_rules:
list[~azure.servicebus.management.AuthorizationRule]
:ivar status: Status of a Service Bus resource. Possible values include: "Active", "Creating",
"Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown".
:type status: str or ~azure.servicebus.management.EntityStatus
:ivar support_ordering: A value that indicates whether the topic supports ordering.
:type support_ordering: bool
:ivar auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the topic is
automatically deleted. The minimum duration is 5 minutes.
:type auto_delete_on_idle: ~datetime.timedelta
:ivar enable_partitioning: A value that indicates whether the topic is to be partitioned
across multiple message brokers.
:type enable_partitioning: bool
:ivar availability_status: Availability status of the entity. Possible values include:
"Available", "Limited", "Renaming", "Restoring", "Unknown".
:type availability_status: str or
~azure.servicebus.management.EntityAvailabilityStatus
:ivar enable_express: A value that indicates whether Express Entities are enabled. An express
queue holds a message in memory temporarily before writing it to persistent storage.
:type enable_express: bool
:ivar user_metadata: Metadata associated with the topic.
:type user_metadata: str
"""
def __init__(
self,
name,
**kwargs
):
# type: (str, Any) -> None
self.name = name
self._internal_td = None # type: Optional[InternalTopicDescription]
extraction_missing_args = [] # type: List[str]
extract_kwarg = functools.partial(extract_kwarg_template, kwargs, extraction_missing_args)
self.default_message_time_to_live = extract_kwarg('default_message_time_to_live')
self.max_size_in_megabytes = extract_kwarg('max_size_in_megabytes')
self.requires_duplicate_detection = extract_kwarg('requires_duplicate_detection')
self.duplicate_detection_history_time_window = extract_kwarg('duplicate_detection_history_time_window')
self.enable_batched_operations = extract_kwarg('enable_batched_operations')
self.size_in_bytes = extract_kwarg('size_in_bytes')
self.authorization_rules = extract_kwarg('authorization_rules')
self.status = extract_kwarg('status')
self.support_ordering = extract_kwarg('support_ordering')
self.auto_delete_on_idle = extract_kwarg('auto_delete_on_idle')
self.enable_partitioning = extract_kwarg('enable_partitioning')
self.availability_status = extract_kwarg('availability_status')
self.enable_express = extract_kwarg('enable_express')
self.user_metadata = extract_kwarg('user_metadata')
validate_extraction_missing_args(extraction_missing_args)
@classmethod
def _from_internal_entity(cls, name, internal_td):
# type: (str, InternalTopicDescription) -> TopicProperties
td = cls(
name,
default_message_time_to_live=internal_td.default_message_time_to_live,
max_size_in_megabytes=internal_td.max_size_in_megabytes,
requires_duplicate_detection=internal_td.requires_duplicate_detection,
duplicate_detection_history_time_window=internal_td.duplicate_detection_history_time_window,
enable_batched_operations=internal_td.enable_batched_operations,
size_in_bytes=internal_td.size_in_bytes,
authorization_rules=[AuthorizationRule._from_internal_entity(r) for r in internal_td.authorization_rules] \
if internal_td.authorization_rules else internal_td.authorization_rules,
status=internal_td.status,
support_ordering=internal_td.support_ordering,
auto_delete_on_idle=internal_td.auto_delete_on_idle,
enable_partitioning=internal_td.enable_partitioning,
availability_status=internal_td.entity_availability_status,
enable_express=internal_td.enable_express,
user_metadata=internal_td.user_metadata
)
td._internal_td = deepcopy(internal_td)
return td
def _to_internal_entity(self):
# type: () -> InternalTopicDescription
if not self._internal_td:
self._internal_td = InternalTopicDescription()
self._internal_td.default_message_time_to_live = self.default_message_time_to_live
self._internal_td.max_size_in_megabytes = self.max_size_in_megabytes
self._internal_td.requires_duplicate_detection = self.requires_duplicate_detection
self._internal_td.duplicate_detection_history_time_window = self.duplicate_detection_history_time_window
self._internal_td.enable_batched_operations = self.enable_batched_operations
self._internal_td.size_in_bytes = self.size_in_bytes
self._internal_td.authorization_rules = [r._to_internal_entity() for r in self.authorization_rules] \
if self.authorization_rules else self.authorization_rules
self._internal_td.status = self.status
self._internal_td.support_ordering = self.support_ordering
self._internal_td.auto_delete_on_idle = self.auto_delete_on_idle
self._internal_td.enable_partitioning = self.enable_partitioning
self._internal_td.entity_availability_status = self.availability_status
self._internal_td.enable_express = self.enable_express
self._internal_td.user_metadata = self.user_metadata
return self._internal_td
[docs]class TopicRuntimeProperties(object):
"""Runtime properties of a Service Bus topic resource.
"""
def __init__(
self,
):
# type: () -> None
self._name = None # type: Optional[str]
self._internal_td = None # type: Optional[InternalTopicDescription]
@classmethod
def _from_internal_entity(cls, name, internal_td):
# type: (str, InternalTopicDescription) -> TopicRuntimeProperties
qd = cls()
qd._name = name
qd._internal_td = internal_td
return qd
@property
def name(self):
"""The name of the topic.
:rtype: str
"""
return self._name
@property
def accessed_at_utc(self):
"""Last time a message was sent, or the last time there was a receive request
:rtype: ~datetime.datetime
"""
return self._internal_td.accessed_at
@property
def created_at_utc(self):
"""The exact time the queue was created.
:rtype: ~datetime.datetime
"""
return self._internal_td.created_at
@property
def updated_at_utc(self):
"""The exact time the entity was updated.
:rtype: ~datetime.datetime
"""
return self._internal_td.updated_at
@property
def size_in_bytes(self):
"""The current size of the entity in bytes.
:rtype: int
"""
return self._internal_td.size_in_bytes
@property
def subscription_count(self):
"""The number of subscriptions in the topic.
:rtype: int
"""
return self._internal_td.subscription_count
@property
def scheduled_message_count(self):
"""Number of scheduled messages.
:rtype: int
"""
return self._internal_td.message_count_details.scheduled_message_count
[docs]class SubscriptionProperties(DictMixin): # pylint:disable=too-many-instance-attributes
"""Properties of a Service Bus topic subscription resource.
:ivar name: Name of the subscription.
:type name: str
:ivar lock_duration: ISO 8601 timespan duration of a peek-lock; that is, the amount of time
that the message is locked for other receivers. The maximum value for LockDuration is 5
minutes; the default value is 1 minute.
:type lock_duration: ~datetime.timedelta
:ivar requires_session: A value that indicates whether the queue supports the concept of
sessions.
:type requires_session: bool
:ivar default_message_time_to_live: ISO 8601 default message timespan to live value. This is
the duration after which the message expires, starting from when the message is sent to Service
Bus. This is the default value used when TimeToLive is not set on a message itself.
:type default_message_time_to_live: ~datetime.timedelta
:ivar dead_lettering_on_message_expiration: A value that indicates whether this subscription
has dead letter support when a message expires.
:type dead_lettering_on_message_expiration: bool
:ivar dead_lettering_on_filter_evaluation_exceptions: A value that indicates whether this
subscription has dead letter support when a message expires.
:type dead_lettering_on_filter_evaluation_exceptions: bool
:ivar max_delivery_count: The maximum delivery count. A message is automatically deadlettered
after this number of deliveries. Default value is 10.
:type max_delivery_count: int
:ivar enable_batched_operations: Value that indicates whether server-side batched operations
are enabled.
:type enable_batched_operations: bool
:ivar status: Status of a Service Bus resource. Possible values include: "Active", "Creating",
"Deleting", "Disabled", "ReceiveDisabled", "Renaming", "Restoring", "SendDisabled", "Unknown".
:type status: str or ~azure.servicebus.management.EntityStatus
:ivar forward_to: The name of the recipient entity to which all the messages sent to the
subscription are forwarded to.
:type forward_to: str
:ivar user_metadata: Metadata associated with the subscription. Maximum number of characters
is 1024.
:type user_metadata: str
:ivar forward_dead_lettered_messages_to: The name of the recipient entity to which all the
messages sent to the subscription are forwarded to.
:type forward_dead_lettered_messages_to: str
:ivar auto_delete_on_idle: ISO 8601 timeSpan idle interval after which the subscription is
automatically deleted. The minimum duration is 5 minutes.
:type auto_delete_on_idle: ~datetime.timedelta
:ivar availability_status: Availability status of the entity. Possible values include:
"Available", "Limited", "Renaming", "Restoring", "Unknown".
:type availability_status: str or
~azure.servicebus.management.EntityAvailabilityStatus
"""
def __init__(self, name, **kwargs):
# type: (str, Any) -> None
self.name = name
self._internal_sd = None # type: Optional[InternalSubscriptionDescription]
extraction_missing_args = [] # type: List[str]
extract_kwarg = functools.partial(extract_kwarg_template, kwargs, extraction_missing_args)
self.lock_duration = extract_kwarg('lock_duration')
self.requires_session = extract_kwarg('requires_session')
self.default_message_time_to_live = extract_kwarg('default_message_time_to_live')
self.dead_lettering_on_message_expiration = extract_kwarg('dead_lettering_on_message_expiration')
self.dead_lettering_on_filter_evaluation_exceptions = extract_kwarg(
'dead_lettering_on_filter_evaluation_exceptions')
self.max_delivery_count = extract_kwarg('max_delivery_count')
self.enable_batched_operations = extract_kwarg('enable_batched_operations')
self.status = extract_kwarg('status')
self.forward_to = extract_kwarg('forward_to')
self.user_metadata = extract_kwarg('user_metadata')
self.forward_dead_lettered_messages_to = extract_kwarg('forward_dead_lettered_messages_to')
self.auto_delete_on_idle = extract_kwarg('auto_delete_on_idle')
self.availability_status = extract_kwarg('availability_status')
validate_extraction_missing_args(extraction_missing_args)
@classmethod
def _from_internal_entity(cls, name, internal_subscription):
# type: (str, InternalSubscriptionDescription) -> SubscriptionProperties
subscription = cls(
name,
lock_duration=internal_subscription.lock_duration,
requires_session=internal_subscription.requires_session,
default_message_time_to_live=internal_subscription.default_message_time_to_live,
dead_lettering_on_message_expiration=internal_subscription.dead_lettering_on_message_expiration,
dead_lettering_on_filter_evaluation_exceptions=
internal_subscription.dead_lettering_on_filter_evaluation_exceptions,
max_delivery_count=internal_subscription.max_delivery_count,
enable_batched_operations=internal_subscription.enable_batched_operations,
status=internal_subscription.status,
forward_to=internal_subscription.forward_to,
user_metadata=internal_subscription.user_metadata,
forward_dead_lettered_messages_to=internal_subscription.forward_dead_lettered_messages_to,
auto_delete_on_idle=internal_subscription.auto_delete_on_idle,
availability_status=internal_subscription.entity_availability_status
)
subscription._internal_sd = deepcopy(internal_subscription)
return subscription
def _to_internal_entity(self):
# type: () -> InternalSubscriptionDescription
if not self._internal_sd:
self._internal_sd = InternalSubscriptionDescription()
self._internal_sd.lock_duration = self.lock_duration
self._internal_sd.requires_session = self.requires_session
self._internal_sd.default_message_time_to_live = self.default_message_time_to_live
self._internal_sd.dead_lettering_on_message_expiration = self.dead_lettering_on_message_expiration
self._internal_sd.dead_lettering_on_filter_evaluation_exceptions = \
self.dead_lettering_on_filter_evaluation_exceptions
self._internal_sd.max_delivery_count = self.max_delivery_count
self._internal_sd.enable_batched_operations = self.enable_batched_operations
self._internal_sd.status = self.status
self._internal_sd.forward_to = self.forward_to
self._internal_sd.user_metadata = self.user_metadata
self._internal_sd.forward_dead_lettered_messages_to = self.forward_dead_lettered_messages_to
self._internal_sd.auto_delete_on_idle = self.auto_delete_on_idle
self._internal_sd.entity_availability_status = self.availability_status
return self._internal_sd
[docs]class SubscriptionRuntimeProperties(object):
"""Runtime properties of a Service Bus topic subscription resource.
"""
def __init__(self):
# type: () -> None
self._internal_sd = None # type: Optional[InternalSubscriptionDescription]
self._name = None # type: Optional[str]
@classmethod
def _from_internal_entity(cls, name, internal_subscription):
# type: (str, InternalSubscriptionDescription) -> SubscriptionRuntimeProperties
subscription = cls()
subscription._name = name
subscription._internal_sd = internal_subscription
return subscription
@property
def name(self):
"""Name of subscription
:rtype: str
"""
return self._name
@property
def accessed_at_utc(self):
"""Last time a message was sent, or the last time there was a receive request
:rtype: ~datetime.datetime
"""
return self._internal_sd.accessed_at
@property
def created_at_utc(self):
"""The exact time the subscription was created.
:rtype: ~datetime.datetime
"""
return self._internal_sd.created_at
@property
def updated_at_utc(self):
"""The exact time the entity is updated.
:rtype: ~datetime.datetime
"""
return self._internal_sd.updated_at
@property
def total_message_count(self):
"""The number of messages in the subscription.
:rtype: int
"""
return self._internal_sd.message_count
@property
def active_message_count(self):
"""Number of active messages in the subscription.
:rtype: int
"""
return self._internal_sd.message_count_details.active_message_count
@property
def dead_letter_message_count(self):
"""Number of messages that are dead lettered.
:rtype: int
"""
return self._internal_sd.message_count_details.dead_letter_message_count
@property
def transfer_dead_letter_message_count(self):
"""Number of messages transferred into dead letters.
:rtype: int
"""
return self._internal_sd.message_count_details.transfer_dead_letter_message_count
@property
def transfer_message_count(self):
"""Number of messages transferred to another queue, topic, or subscription.
:rtype: int
"""
return self._internal_sd.message_count_details.transfer_message_count
[docs]class RuleProperties(DictMixin):
"""Properties of a topic subscription rule.
:param name: Name of the rule.
:type name: str
:ivar filter: The filter of the rule.
:type filter: Union[~azure.servicebus.management.CorrelationRuleFilter,
~azure.servicebus.management.SqlRuleFilter]
:ivar action: The action of the rule.
:type action: Optional[~azure.servicebus.management.SqlRuleAction]
:ivar created_at_utc: The exact time the rule was created.
:type created_at_utc: ~datetime.datetime
"""
def __init__(self, name, **kwargs):
# type: (str, Any) -> None
self.name = name
self._internal_rule = None # type: Optional[InternalRuleDescription]
extraction_missing_args = [] # type: List[str]
extract_kwarg = functools.partial(extract_kwarg_template, kwargs, extraction_missing_args)
self.filter = extract_kwarg('filter')
self.action = extract_kwarg('action')
self.created_at_utc = extract_kwarg('created_at_utc')
validate_extraction_missing_args(extraction_missing_args)
@classmethod
def _from_internal_entity(cls, name, internal_rule):
# type: (str, InternalRuleDescription) -> RuleProperties
rule = cls(
name,
filter=RULE_CLASS_MAPPING[type(internal_rule.filter)]._from_internal_entity(internal_rule.filter)
if internal_rule.filter and isinstance(internal_rule.filter, tuple(RULE_CLASS_MAPPING.keys())) else None,
action=RULE_CLASS_MAPPING[type(internal_rule.action)]._from_internal_entity(internal_rule.action)
if internal_rule.action and isinstance(internal_rule.action, tuple(RULE_CLASS_MAPPING.keys())) else None,
created_at_utc=internal_rule.created_at
)
rule._internal_rule = deepcopy(internal_rule)
return rule
def _to_internal_entity(self):
# type: () -> InternalRuleDescription
if not self._internal_rule:
self._internal_rule = InternalRuleDescription()
self._internal_rule.filter = self.filter._to_internal_entity() if self.filter else TRUE_FILTER # type: ignore
self._internal_rule.action = self.action._to_internal_entity() if self.action else EMPTY_RULE_ACTION
self._internal_rule.created_at = self.created_at_utc
self._internal_rule.name = self.name
return self._internal_rule
[docs]class CorrelationRuleFilter(object):
"""Represents the correlation filter expression.
:param correlation_id: Identifier of the correlation.
:type correlation_id: str
:param message_id: Identifier of the message.
:type message_id: str
:param to: Address to send to.
:type to: str
:param reply_to: Address of the queue to reply to.
:type reply_to: str
:param label: Application specific label.
:type label: str
:param session_id: Session identifier.
:type session_id: str
:param reply_to_session_id: Session identifier to reply to.
:type reply_to_session_id: str
:param content_type: Content type of the message.
:type content_type: str
:param properties: dictionary object for custom filters
:type properties: dict[str, Union[str, int, float, bool, datetime, timedelta]]
"""
def __init__(self, **kwargs):
# type: (Any) -> None
self.correlation_id = kwargs.get('correlation_id', None)
self.message_id = kwargs.get('message_id', None)
self.to = kwargs.get('to', None)
self.reply_to = kwargs.get('reply_to', None)
self.label = kwargs.get('label', None)
self.session_id = kwargs.get('session_id', None)
self.reply_to_session_id = kwargs.get('reply_to_session_id', None)
self.content_type = kwargs.get('content_type', None)
self.properties = kwargs.get('properties', None)
@classmethod
def _from_internal_entity(cls, internal_correlation_filter):
# type: (InternalCorrelationFilter) -> CorrelationRuleFilter
correlation_filter = cls()
correlation_filter.correlation_id = internal_correlation_filter.correlation_id
correlation_filter.message_id = internal_correlation_filter.message_id
correlation_filter.to = internal_correlation_filter.to
correlation_filter.reply_to = internal_correlation_filter.reply_to
correlation_filter.label = internal_correlation_filter.label
correlation_filter.session_id = internal_correlation_filter.session_id
correlation_filter.reply_to_session_id = internal_correlation_filter.reply_to_session_id
correlation_filter.content_type = internal_correlation_filter.content_type
correlation_filter.properties = \
OrderedDict((kv.key, kv.value) for kv in internal_correlation_filter.properties) \
if internal_correlation_filter.properties else OrderedDict()
return correlation_filter
def _to_internal_entity(self):
# type: () -> InternalCorrelationFilter
internal_entity = InternalCorrelationFilter()
internal_entity.correlation_id = self.correlation_id
internal_entity.message_id = self.message_id
internal_entity.to = self.to
internal_entity.reply_to = self.reply_to
internal_entity.label = self.label
internal_entity.session_id = self.session_id
internal_entity.reply_to_session_id = self.reply_to_session_id
internal_entity.content_type = self.content_type
internal_entity.properties = [KeyValue(key=key, value=value) for key, value in self.properties.items()] \
if self.properties else None
return internal_entity
[docs]class SqlRuleFilter(object):
"""Represents a filter which is a composition of an expression and an action
that is executed in the pub/sub pipeline.
:param sql_expression: The SQL expression. e.g. MyProperty='ABC'
:type sql_expression: str
:param parameters: Sets the value of the sql expression parameters if any.
:type parameters: dict[str, Union[str, int, float, bool, datetime, timedelta]]
"""
def __init__(self, sql_expression=None, parameters=None):
# type: (Optional[str], Optional[Dict[str, Union[str, int, float, bool, datetime, timedelta]]]) -> None
self.sql_expression = sql_expression
self.parameters = parameters
self.requires_preprocessing = True
@classmethod
def _from_internal_entity(cls, internal_sql_rule_filter):
sql_rule_filter = cls()
sql_rule_filter.sql_expression = internal_sql_rule_filter.sql_expression
sql_rule_filter.parameters = OrderedDict((kv.key, kv.value) for kv in internal_sql_rule_filter.parameters) \
if internal_sql_rule_filter.parameters else OrderedDict()
sql_rule_filter.requires_preprocessing = internal_sql_rule_filter.requires_preprocessing
return sql_rule_filter
def _to_internal_entity(self):
# type: () -> InternalSqlFilter
internal_entity = InternalSqlFilter(sql_expression=self.sql_expression)
internal_entity.parameters = [
KeyValue(key=key, value=value) for key, value in self.parameters.items() # type: ignore
] if self.parameters else None
internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL
internal_entity.requires_preprocessing = self.requires_preprocessing
return internal_entity
[docs]class TrueRuleFilter(SqlRuleFilter):
"""A sql filter with a sql expression that is always True
"""
def __init__(self):
# type: () -> None
super(TrueRuleFilter, self).__init__("1=1", None)
def _to_internal_entity(self):
internal_entity = InternalTrueFilter()
internal_entity.sql_expression = self.sql_expression
internal_entity.requires_preprocessing = True
internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL
return internal_entity
[docs]class FalseRuleFilter(SqlRuleFilter):
"""A sql filter with a sql expression that is always True
"""
def __init__(self):
# type: () -> None
super(FalseRuleFilter, self).__init__("1>1", None)
def _to_internal_entity(self):
internal_entity = InternalFalseFilter()
internal_entity.sql_expression = self.sql_expression
internal_entity.requires_preprocessing = True
internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL
return internal_entity
[docs]class SqlRuleAction(object):
"""Represents set of actions written in SQL language-based syntax that is
performed against a ServiceBus.Messaging.BrokeredMessage .
:param sql_expression: SQL expression. e.g. MyProperty='ABC'
:type sql_expression: str
:param parameters: Sets the value of the sql expression parameters if any.
:type parameters: dict[str, Union[str, int, float, bool, datetime, timedelta]]
:type requires_preprocessing: bool
"""
def __init__(self, sql_expression=None, parameters=None):
# type: (Optional[str], Optional[Dict[str, Union[str, int, float, bool, datetime, timedelta]]]) -> None
self.sql_expression = sql_expression
self.parameters = parameters
self.requires_preprocessing = True
@classmethod
def _from_internal_entity(cls, internal_sql_rule_action):
sql_rule_action = cls()
sql_rule_action.sql_expression = internal_sql_rule_action.sql_expression
sql_rule_action.parameters = OrderedDict((kv.key, kv.value) for kv in internal_sql_rule_action.parameters) \
if internal_sql_rule_action.parameters else OrderedDict()
sql_rule_action.requires_preprocessing = internal_sql_rule_action.requires_preprocessing
return sql_rule_action
def _to_internal_entity(self):
internal_entity = InternalSqlRuleAction(sql_expression=self.sql_expression)
internal_entity.parameters = [KeyValue(key=key, value=value) for key, value in self.parameters.items()] \
if self.parameters else None
internal_entity.compatibility_level = RULE_SQL_COMPATIBILITY_LEVEL
internal_entity.requires_preprocessing = self.requires_preprocessing
return internal_entity
RULE_CLASS_MAPPING = {
InternalSqlRuleAction: SqlRuleAction,
# InternalEmptyRuleAction: None,
InternalCorrelationFilter: CorrelationRuleFilter,
InternalSqlFilter: SqlRuleFilter,
InternalTrueFilter: TrueRuleFilter,
InternalFalseFilter: FalseRuleFilter,
} # type: Dict[Type[Model], Type]
EMPTY_RULE_ACTION = InternalEmptyRuleAction()
TRUE_FILTER = TrueRuleFilter()
[docs]class AuthorizationRule(object):
"""Authorization rule of an entity.
:param type: The authorization type.
:type type: str
:param claim_type: The claim type.
:type claim_type: str
:param claim_value: The claim value.
:type claim_value: str
:param rights: Access rights of the entity. Values are 'Send', 'Listen', or 'Manage'.
:type rights: list[AccessRights]
:param created_at_utc: The date and time when the authorization rule was created.
:type created_at_utc: ~datetime.datetime
:param modified_at_utc: The date and time when the authorization rule was modified.
:type modified_at_utc: ~datetime.datetime
:param key_name: The authorization rule key name.
:type key_name: str
:param primary_key: The primary key of the authorization rule.
:type primary_key: str
:param secondary_key: The primary key of the authorization rule.
:type secondary_key: str
"""
def __init__(
self,
**kwargs
):
# type: (Any) -> None
self.type = kwargs.get('type', None)
self.claim_type = kwargs.get('claim_type', None)
self.claim_value = kwargs.get('claim_value', None)
self.rights = kwargs.get('rights', None)
self.created_at_utc = kwargs.get('created_at_utc', None)
self.modified_at_utc = kwargs.get('modified_at_utc', None)
self.key_name = kwargs.get('key_name', None)
self.primary_key = kwargs.get('primary_key', None)
self.secondary_key = kwargs.get('secondary_key', None)
@classmethod
def _from_internal_entity(cls, internal_authorization_rule):
authorization_rule = cls()
authorization_rule.claim_type = internal_authorization_rule.claim_type
authorization_rule.claim_value = internal_authorization_rule.claim_value
authorization_rule.rights = internal_authorization_rule.rights
authorization_rule.created_at_utc = internal_authorization_rule.created_time
authorization_rule.modified_at_utc = internal_authorization_rule.modified_time
authorization_rule.key_name = internal_authorization_rule.key_name
authorization_rule.primary_key = internal_authorization_rule.primary_key
authorization_rule.secondary_key = internal_authorization_rule.secondary_key
return authorization_rule
def _to_internal_entity(self):
# type: () -> InternalAuthorizationRule
internal_entity = InternalAuthorizationRule()
internal_entity.claim_type = self.claim_type
internal_entity.claim_value = self.claim_value
internal_entity.rights = self.rights
internal_entity.created_time = self.created_at_utc
internal_entity.modified_time = self.modified_at_utc
internal_entity.key_name = self.key_name
internal_entity.primary_key = self.primary_key
internal_entity.secondary_key = self.secondary_key
return internal_entity