Source code for azure.servicebus.common.mgmt_handlers

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import uamqp

from azure.servicebus.common.message import PeekMessage, DeferredMessage
from azure.servicebus.common.errors import ServiceBusError, MessageLockExpired


[docs]def default(status_code, message, description): if status_code == 200: return message.get_data() raise ServiceBusError( "Management request returned status code: {}. Description: {}, Data: {}".format( status_code, description, message.get_data()))
[docs]def lock_renew_op(status_code, message, description): if status_code == 200: return message.get_data() if status_code == 410: raise MessageLockExpired(message=description) raise ServiceBusError( "Management request returned status code: {}. Description: {}, Data: {}".format( status_code, description, message.get_data()))
[docs]def peek_op(status_code, message, description): if status_code == 200: parsed = [] for m in message.get_data()[b'messages']: wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message'])) parsed.append(PeekMessage(wrapped)) return parsed if status_code in [202, 204]: return [] error = "Message peek failed with status code: {}.\n".format(status_code) if description: error += "{}.".format(description) raise ServiceBusError(error)
[docs]def list_sessions_op(status_code, message, description): if status_code == 200: parsed = [] for m in message.get_data()[b'sessions-ids']: parsed.append(m.decode('UTF-8')) return parsed if status_code in [202, 204]: return [] error = "List sessions failed with status code: {}.\n".format(status_code) if description: error += "{}.".format(description) raise ServiceBusError(error)
[docs]def deferred_message_op(status_code, message, description, mode=1, message_type=DeferredMessage): if status_code == 200: parsed = [] for m in message.get_data()[b'messages']: wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message'])) parsed.append(message_type(wrapped, mode)) return parsed if status_code in [202, 204]: return [] error = "Retrieving deferred messages failed with status code: {}.\n".format(status_code) if description: error += "{}.".format(description) raise ServiceBusError(error)
[docs]def schedule_op(status_code, message, description): if status_code == 200: return message.get_data()[b'sequence-numbers'] error = "Scheduling messages failed with status code: {}.\n".format(status_code) if description: error += "{}.".format(description) raise ServiceBusError(error)