# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
# pylint: disable=too-few-public-methods,too-many-instance-attributes
import sys
import json
from datetime import datetime
from azure.common import AzureException
from ._common_models import WindowsAzureData, _unicode_type
from ._common_error import (
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE,
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK,
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_RENEW_LOCK)
[docs]class AzureServiceBusPeekLockError(AzureException):
'''Indicates that peek-lock is required for this operation.'''
[docs]class AzureServiceBusResourceNotFound(AzureException):
'''Indicates that the resource doesn't exist.'''
[docs]class Queue(WindowsAzureData):
''' Queue class corresponding to Queue Description:
http://msdn.microsoft.com/en-us/library/windowsazure/hh780773'''
def __init__(self, lock_duration=None, max_size_in_megabytes=None,
requires_duplicate_detection=None, requires_session=None,
default_message_time_to_live=None,
dead_lettering_on_message_expiration=None,
duplicate_detection_history_time_window=None,
max_delivery_count=None, enable_batched_operations=None,
size_in_bytes=None, message_count=None):
self.lock_duration = lock_duration
self.max_size_in_megabytes = max_size_in_megabytes
self.requires_duplicate_detection = requires_duplicate_detection
self.requires_session = requires_session
self.default_message_time_to_live = default_message_time_to_live
self.dead_lettering_on_message_expiration = \
dead_lettering_on_message_expiration
self.duplicate_detection_history_time_window = \
duplicate_detection_history_time_window
self.max_delivery_count = max_delivery_count
self.enable_batched_operations = enable_batched_operations
self.size_in_bytes = size_in_bytes
self.message_count = message_count
[docs]class Topic(WindowsAzureData):
''' Topic class corresponding to Topic Description:
https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.topicdescription. '''
def __init__(self, default_message_time_to_live=None,
max_size_in_megabytes=None, requires_duplicate_detection=None,
duplicate_detection_history_time_window=None,
enable_batched_operations=None, size_in_bytes=None):
self.default_message_time_to_live = default_message_time_to_live
self.max_size_in_megabytes = max_size_in_megabytes
self.requires_duplicate_detection = requires_duplicate_detection
self.duplicate_detection_history_time_window = \
duplicate_detection_history_time_window
self.enable_batched_operations = enable_batched_operations
self.size_in_bytes = size_in_bytes
@property
def max_size_in_mega_bytes(self):
import warnings
warnings.warn(
'This attribute has been changed to max_size_in_megabytes.')
return self.max_size_in_megabytes
@max_size_in_mega_bytes.setter
def max_size_in_mega_bytes(self, value):
self.max_size_in_megabytes = value
[docs]class Subscription(WindowsAzureData):
''' Subscription class corresponding to Subscription Description:
http://msdn.microsoft.com/en-us/library/windowsazure/hh780763. '''
def __init__(self, lock_duration=None, requires_session=None,
default_message_time_to_live=None,
dead_lettering_on_message_expiration=None,
dead_lettering_on_filter_evaluation_exceptions=None,
enable_batched_operations=None, max_delivery_count=None,
message_count=None):
self.lock_duration = lock_duration
self.requires_session = requires_session
self.default_message_time_to_live = default_message_time_to_live
self.dead_lettering_on_message_expiration = \
dead_lettering_on_message_expiration
self.dead_lettering_on_filter_evaluation_exceptions = \
dead_lettering_on_filter_evaluation_exceptions
self.enable_batched_operations = enable_batched_operations
self.max_delivery_count = max_delivery_count
self.message_count = message_count
[docs]class Rule(WindowsAzureData):
''' Rule class corresponding to Rule Description:
http://msdn.microsoft.com/en-us/library/windowsazure/hh780753. '''
def __init__(self, filter_type=None, filter_expression=None,
action_type=None, action_expression=None):
self.filter_type = filter_type
self.filter_expression = filter_expression
self.action_type = action_type
self.action_expression = action_expression
[docs]class EventHub(WindowsAzureData):
def __init__(self, message_retention_in_days=None, status=None,
user_metadata=None, partition_count=None):
self.message_retention_in_days = message_retention_in_days
self.status = status
self.user_metadata = user_metadata
self.partition_count = partition_count
self.authorization_rules = []
self.partition_ids = []
[docs]class AuthorizationRule(WindowsAzureData):
def __init__(self, claim_type=None, claim_value=None, rights=None,
key_name=None, primary_key=None, secondary_key=None):
self.claim_type = claim_type
self.claim_value = claim_value
self.rights = rights or []
self.created_time = None
self.modified_time = None
self.key_name = key_name
self.primary_key = primary_key
self.secondary_key = secondary_key
[docs]class Message(WindowsAzureData):
''' Message class that used in send message/get message apis. '''
def __init__(self, body=None, service_bus_service=None, location=None,
custom_properties=None,
type='application/atom+xml;type=entry;charset=utf-8', # pylint: disable=redefined-builtin
broker_properties=None):
self.body = body
self.location = location
self.broker_properties = broker_properties
self.custom_properties = custom_properties
self.type = type
self.service_bus_service = service_bus_service
self._topic_name = None
self._subscription_name = None
self._queue_name = None
if not service_bus_service:
return
# if location is set, then extracts the queue name for queue message and
# extracts the topic and subscriptions name if it is topic message.
if location:
if '/subscriptions/' in location:
pos = location.find(service_bus_service.host_base.lower())+1
pos1 = location.find('/subscriptions/')
self._topic_name = location[pos+len(service_bus_service.host_base):pos1]
pos = pos1 + len('/subscriptions/')
pos1 = location.find('/', pos)
self._subscription_name = location[pos:pos1]
elif '/messages/' in location:
pos = location.find(service_bus_service.host_base.lower())+1
pos1 = location.find('/messages/')
self._queue_name = location[pos+len(service_bus_service.host_base):pos1]
[docs] def delete(self):
''' Deletes itself if find queue name or topic name and subscription
name. '''
if self._queue_name:
self.service_bus_service.delete_queue_message(
self._queue_name,
self.broker_properties['SequenceNumber'],
self.broker_properties['LockToken'])
elif self._topic_name and self._subscription_name:
self.service_bus_service.delete_subscription_message(
self._topic_name,
self._subscription_name,
self.broker_properties['SequenceNumber'],
self.broker_properties['LockToken'])
else:
raise AzureServiceBusPeekLockError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE)
[docs] def unlock(self):
''' Unlocks itself if find queue name or topic name and subscription
name. '''
if self._queue_name:
self.service_bus_service.unlock_queue_message(
self._queue_name,
self.broker_properties['SequenceNumber'],
self.broker_properties['LockToken'])
elif self._topic_name and self._subscription_name:
self.service_bus_service.unlock_subscription_message(
self._topic_name,
self._subscription_name,
self.broker_properties['SequenceNumber'],
self.broker_properties['LockToken'])
else:
raise AzureServiceBusPeekLockError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK)
[docs] def renew_lock(self):
''' Renew lock on itself if find queue name or topic name and subscription
name. '''
if self._queue_name:
self.service_bus_service.renew_lock_queue_message(
self._queue_name,
self.broker_properties['SequenceNumber'],
self.broker_properties['LockToken'])
elif self._topic_name and self._subscription_name:
self.service_bus_service.renew_lock_subscription_message(
self._topic_name,
self._subscription_name,
self.broker_properties['SequenceNumber'],
self.broker_properties['LockToken'])
else:
raise AzureServiceBusPeekLockError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_RENEW_LOCK)
def _serialize_escaped_properties_value(self, value): # pylint: disable=no-self-use
if sys.version_info < (3,) and isinstance(value, _unicode_type):
escaped_value = value.replace('"', '\\"')
return '"' + escaped_value.encode('utf-8') + '"'
if isinstance(value, str):
escaped_value = value.replace('"', '\\"')
return '"' + escaped_value + '"'
if isinstance(value, datetime):
return '"' + value.strftime('%a, %d %b %Y %H:%M:%S GMT') + '"'
return str(value).lower()
def _serialize_basic_properties_value(self, value): # pylint: disable=no-self-use
if sys.version_info < (3,) and isinstance(value, _unicode_type):
return value.encode('utf-8')
if isinstance(value, str):
return value
if isinstance(value, datetime):
return value.strftime('%a, %d %b %Y %H:%M:%S GMT')
return str(value).lower()
[docs] def as_batch_body(self):
''' return the current message as expected by batch body format'''
if sys.version_info >= (3,) and isinstance(self.body, bytes):
# It HAS to be string to be serialized in JSON
body = self.body.decode('utf-8')
else:
# Python 2.7 people handle this themself
body = self.body
result = {'Body': body}
# Adds custom properties
if self.custom_properties:
result['UserProperties'] = {name: self._serialize_basic_properties_value(value)
for name, value
in self.custom_properties.items()}
# Adds BrokerProperties
if self.broker_properties:
result['BrokerProperties'] = {name: self._serialize_basic_properties_value(value)
for name, value
in self.broker_properties.items()}
return result