# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import asyncio
import datetime
import functools
import uuid
import collections
import six
from uamqp import ReceiveClientAsync
from uamqp import authentication
from uamqp import constants, types, errors
from azure.servicebus.aio import Message, DeferredMessage
from azure.servicebus.aio.async_base_handler import BaseHandler
from azure.servicebus.common import mgmt_handlers, mixins
from azure.servicebus.common.errors import (
InvalidHandlerState,
NoActiveSession,
SessionLockExpired)
from azure.servicebus.common.constants import (
SESSION_LOCK_LOST,
SESSION_LOCK_TIMEOUT,
REQUEST_RESPONSE_RENEWLOCK_OPERATION,
REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION,
REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION,
REQUEST_RESPONSE_RENEW_SESSION_LOCK_OPERATION,
REQUEST_RESPONSE_PEEK_OPERATION,
REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
ReceiveSettleMode)
class Receiver(collections.abc.AsyncIterator, BaseHandler):  # pylint: disable=too-many-instance-attributes
    """A message receiver.

    This receive handler acts as an iterable message stream for retrieving
    messages for a Service Bus entity. It operates a single connection that must be opened and
    closed on completion. The service connection will remain open for the entirety of the iterator.
    If you find yourself only partially iterating the message stream, you should run the receiver
    in an `async with` statement to ensure the connection is closed.
    The Receiver should not be instantiated directly, and should be accessed from a `QueueClient` or
    `SubscriptionClient` using the `get_receiver()` method.

    .. note:: This object is not thread-safe.

    :param handler_id: The ID used as the connection name for the Receiver.
    :type handler_id: str
    :param source: The endpoint from which to receive messages.
    :type source: ~uamqp.Source
    :param auth_config: The SASL auth credentials.
    :type auth_config: dict[str, str]
    :param loop: An async event loop
    :type loop: ~asyncio.EventLoop
    :param connection: A shared connection [not yet supported].
    :type connection: ~uamqp.Connection
    :param mode: The receive connection mode. Value must be either PeekLock or ReceiveAndDelete.
    :type mode: ~azure.servicebus.common.constants.ReceiveSettleMode
    :param encoding: The encoding used for string properties. Default is 'UTF-8'.
    :type encoding: str
    :param debug: Whether to enable network trace debug logs.
    :type debug: bool

    Example:
        .. literalinclude:: ../examples/async_examples/test_examples_async.py
            :start-after: [START open_close_receiver_context]
            :end-before: [END open_close_receiver_context]
            :language: python
            :dedent: 4
            :caption: Running a queue receiver within a context manager.

    """

    def __init__(
            self, handler_id, source, auth_config, *, loop=None, connection=None,
            mode=ReceiveSettleMode.PeekLock, encoding='UTF-8', debug=False, **kwargs):
        # Set by close() and checked by _can_run(): a receiver that has been
        # closed once cannot be used again.
        self._used = asyncio.Event()
        self.name = "SBReceiver-{}".format(handler_id)
        # Sequence number of the most recently wrapped message; used by peek()
        # as the default starting point.
        self.last_received = None
        self.mode = mode
        # Populated by open() with the handler's async message iterator.
        self.message_iter = None
        super(Receiver, self).__init__(
            source, auth_config, loop=loop, connection=connection, encoding=encoding, debug=debug, **kwargs)

    async def __anext__(self):
        """Return the next message from the stream, opening the receiver if required.

        The receiver is closed before `StopAsyncIteration` propagates, both when the
        handler has been marked for shutdown and when the underlying iterator is exhausted.

        :rtype: ~azure.servicebus.aio.async_message.Message
        :raises: StopAsyncIteration when the message stream is finished.
        """
        await self._can_run()
        while True:
            if self.receiver_shutdown:
                await self.close()
                raise StopAsyncIteration
            try:
                received = await self.message_iter.__anext__()
                wrapped = self._build_message(received)
                return wrapped
            except StopAsyncIteration:
                await self.close()
                raise
            except Exception as e:  # pylint: disable=broad-except
                # If the handler deals with the error without raising, the
                # loop simply tries to fetch the next message again.
                await self._handle_exception(e)

    def _build_handler(self):
        """Create the underlying `ReceiveClientAsync` and store it on `self._handler`."""
        # No separate auth object is built when a connection was supplied.
        auth = None if self.connection else authentication.SASTokenAsync.from_shared_access_key(**self.auth_config)
        self._handler = ReceiveClientAsync(
            self.endpoint,
            auth=auth,
            debug=self.debug,
            properties=self.properties,
            error_policy=self.error_policy,
            client_name=self.name,
            auto_complete=False,
            encoding=self.encoding,
            loop=self.loop,
            **self.handler_kwargs)

    async def _build_receiver(self):
        """This is a temporary patch pending a fix in uAMQP."""
        # pylint: disable=protected-access
        # Build the receiver link by hand from the handler's own settings so
        # that the settle mode can be overridden before the link is opened.
        self._handler.message_handler = self._handler.receiver_type(
            self._handler._session,
            self._handler._remote_address,
            self._handler._name,
            on_message_received=self._handler._message_received,
            name='receiver-link-{}'.format(uuid.uuid4()),
            debug=self._handler._debug_trace,
            prefetch=self._handler._prefetch,
            max_message_size=self._handler._max_message_size,
            properties=self._handler._link_properties,
            error_policy=self._handler._error_policy,
            encoding=self._handler._encoding,
            loop=self._handler.loop)
        if self.mode != ReceiveSettleMode.PeekLock:
            self._handler.message_handler.send_settle_mode = constants.SenderSettleMode.Settled
            self._handler.message_handler.receive_settle_mode = constants.ReceiverSettleMode.ReceiveAndDelete
            self._handler.message_handler._settle_mode = constants.ReceiverSettleMode.ReceiveAndDelete
        await self._handler.message_handler.open_async()

    def _build_message(self, received):
        """Wrap a raw received message in a `Message` bound to this receiver.

        Also records the message's sequence number in `self.last_received`.
        """
        message = Message(None, message=received)
        message._receiver = self  # pylint: disable=protected-access
        self.last_received = message.sequence_number
        return message

    async def _can_run(self):
        """Ensure the receiver is usable, opening it if it is not yet running.

        :raises: ~azure.servicebus.common.errors.InvalidHandlerState if the receiver
         has already been closed or its handler is marked for shutdown.
        """
        if self._used.is_set():
            raise InvalidHandlerState("Receiver has already closed.")
        if self.receiver_shutdown:
            await self.close()
            raise InvalidHandlerState("Receiver has already closed.")
        if not self.running:
            await self.open()

    async def _renew_locks(self, *lock_tokens):
        """Send a lock-renewal management request for the given lock tokens."""
        message = {'lock-tokens': types.AMQPArray(lock_tokens)}
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_RENEWLOCK_OPERATION,
            message,
            mgmt_handlers.lock_renew_op)

    async def _settle_deferred(self, settlement, lock_tokens, dead_letter_details=None):
        """Send an update-disposition management request for the given lock tokens.

        :param settlement: The value sent as 'disposition-status'.
        :param lock_tokens: The lock tokens of the messages to settle.
        :param dead_letter_details: Optional extra entries merged into the request body.
        :type dead_letter_details: dict
        """
        message = {
            'disposition-status': settlement,
            'lock-tokens': types.AMQPArray(lock_tokens)}
        if dead_letter_details:
            message.update(dead_letter_details)
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
            message,
            mgmt_handlers.default)

    @property
    def receiver_shutdown(self):
        """Whether the receiver connection has been marked for shutdown.

        If this value is `True` - it does not indicate that the connection
        has yet been closed.
        This property is used internally and should not be relied upon to assess
        the status of the connection.

        :rtype: bool
        """
        if self._handler:
            return self._handler._shutdown  # pylint: disable=protected-access
        # Without a handler there is nothing to receive from.
        return True

    @receiver_shutdown.setter
    def receiver_shutdown(self, value):
        """Mark the connection as ready for shutdown.

        This property is used internally and should not be set in normal usage.

        :param bool value: Whether to shutdown the connection.
        :raises: ValueError if the receiver has no AMQP handler.
        """
        if self._handler:
            self._handler._shutdown = value  # pylint: disable=protected-access
        else:
            raise ValueError("Receiver has no AMQP handler")

    @property
    def queue_size(self):
        """The current size of the unprocessed message queue.

        Returns 0 when there is no handler or no message queue.

        :rtype: int
        """
        # pylint: disable=protected-access
        # Guard against a missing handler, as receiver_shutdown does.
        handler = self._handler
        if handler and handler._received_messages:
            return handler._received_messages.qsize()
        return 0

    async def open(self):
        """Open receiver connection and authenticate session.

        If the receiver is already open, this operation will do nothing.
        This method will be called automatically when one starts to iterate
        messages in the receiver, so there should be no need to call it directly.
        A receiver opened with this method must be explicitly closed.
        It is recommended to open a handler within a context manager as
        opposed to calling the method directly.

        .. note:: This operation is not thread-safe.

        """
        if self.running:
            return
        self.running = True
        try:
            await self._handler.open_async(connection=self.connection)
            self.message_iter = self._handler.receive_messages_iter_async()
            # Poll until authentication has completed, then build the receiver
            # link and poll again until the client reports ready.
            while not await self._handler.auth_complete_async():
                await asyncio.sleep(0.05)
            await self._build_receiver()
            while not await self._handler.client_ready_async():
                await asyncio.sleep(0.05)
        except Exception as e:  # pylint: disable=broad-except
            try:
                await self._handle_exception(e)
            except BaseException:
                # Whatever the handler raises, the receiver is no longer
                # running; reset the flag and let the error propagate.
                self.running = False
                raise

    async def close(self, exception=None):
        """Close down the receiver connection.

        If the receiver has already closed, this operation will do nothing. An optional
        exception can be passed in to indicate that the handler was shutdown due to error.
        It is recommended to open a handler within a context manager as
        opposed to calling the method directly.
        The receiver will be implicitly closed on completion of the message iterator,
        however this method will need to be called explicitly if the message iterator is not run
        to completion.

        .. note:: This operation is not thread-safe.

        :param exception: An optional exception if the handler is closing
         due to an error.
        :type exception: Exception

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START open_close_receiver_directly]
                :end-before: [END open_close_receiver_directly]
                :language: python
                :dedent: 4
                :caption: Iterate then explicitly close a Receiver.

        """
        if not self.running:
            return
        self.running = False
        self.receiver_shutdown = True
        # Mark the receiver as used so _can_run() rejects any further use.
        self._used.set()
        await super(Receiver, self).close(exception=exception)

    async def peek(self, count=1, start_from=0):
        """Browse messages currently pending in the queue.

        Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
        deferred or dead-lettered.

        :param count: The maximum number of messages to try and peek. The default
         value is 1.
        :type count: int
        :param start_from: A message sequence number from which to start browsing messages.
         If not supplied, browsing starts from the last received sequence number, or 1.
        :type start_from: int
        :rtype: list[~azure.servicebus.common.message.PeekMessage]
        :raises: ValueError if `count` or `start_from` is less than 1.

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START receiver_peek_messages]
                :end-before: [END receiver_peek_messages]
                :language: python
                :dedent: 4
                :caption: Peek messages in the queue.

        """
        if not start_from:
            start_from = self.last_received or 1
        # Validate before _can_run() so that bad arguments never cause a
        # connection to be opened.
        if int(count) < 1:
            raise ValueError("count must be 1 or greater.")
        if int(start_from) < 1:
            raise ValueError("start_from must be 1 or greater.")
        await self._can_run()

        message = {
            'from-sequence-number': types.AMQPLong(start_from),
            'message-count': count
        }
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_PEEK_OPERATION,
            message,
            mgmt_handlers.peek_op)

    async def receive_deferred_messages(self, sequence_numbers, mode=ReceiveSettleMode.PeekLock):
        """Receive messages that have previously been deferred.

        When receiving deferred messages from a partitioned entity, all of the supplied
        sequence numbers must be messages from the same partition.

        :param sequence_numbers: A list of the sequence numbers of messages that have been
         deferred.
        :type sequence_numbers: list[int]
        :param mode: The receive mode, default value is PeekLock.
        :type mode: ~azure.servicebus.common.constants.ReceiveSettleMode
        :rtype: list[~azure.servicebus.aio.async_message.DeferredMessage]
        :raises: ValueError if no sequence numbers are supplied.

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START receiver_defer_messages]
                :end-before: [END receiver_defer_messages]
                :language: python
                :dedent: 8
                :caption: Defer messages, then retrieve them by sequence number.

        """
        if not sequence_numbers:
            raise ValueError("At least one sequence number must be specified.")
        await self._can_run()
        try:
            receive_mode = mode.value.value
        except AttributeError:
            # `mode` was not a nested enum value; treat it as a plain integer.
            receive_mode = int(mode)
        message = {
            'sequence-numbers': types.AMQPArray([types.AMQPLong(s) for s in sequence_numbers]),
            'receiver-settle-mode': types.AMQPuInt(receive_mode)
        }
        handler = functools.partial(mgmt_handlers.deferred_message_op, mode=receive_mode, message_type=DeferredMessage)
        messages = await self._mgmt_request_response(
            REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
            message,
            handler)
        for m in messages:
            m._receiver = self  # pylint: disable=protected-access
        return messages

    async def fetch_next(self, max_batch_size=None, timeout=None):
        """Receive a batch of messages at once.

        This approach is optimal if you wish to process multiple messages simultaneously.
        Note that the number of messages retrieved in a single batch will be dependent on
        whether `prefetch` was set for the receiver. This call will prioritize returning
        quickly over meeting a specified batch size, and so will return as soon as at least
        one message is received and there is a gap in incoming messages regardless
        of the specified batch size.

        :param max_batch_size: Maximum number of messages in the batch. Actual number
         returned will depend on prefetch size and incoming stream rate.
        :type max_batch_size: int
        :param timeout: The time to wait in seconds for the first message to arrive.
         If no messages arrive, and no timeout is specified, this call will not return
         until the connection is closed. If specified, and no messages arrive within the
         timeout period, an empty list will be returned.
        :rtype: list[~azure.servicebus.aio.async_message.Message]

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START receiver_fetch_batch]
                :end-before: [END receiver_fetch_batch]
                :language: python
                :dedent: 4
                :caption: Fetch a batch of messages.

        """
        await self._can_run()
        wrapped_batch = []
        # Fall back to the handler's prefetch setting when no size is given.
        max_batch_size = max_batch_size or self._handler._prefetch  # pylint: disable=protected-access
        try:
            # Seconds are converted to milliseconds; 0 is passed when no
            # timeout was given.
            timeout_ms = 1000 * timeout if timeout else 0
            batch = await self._handler.receive_message_batch_async(
                max_batch_size=max_batch_size,
                timeout=timeout_ms)
            for received in batch:
                message = self._build_message(received)
                wrapped_batch.append(message)
        except Exception as e:  # pylint: disable=broad-except
            await self._handle_exception(e)
        return wrapped_batch
class SessionReceiver(Receiver, mixins.SessionMixin):
    """A session message receiver.

    This receive handler acts as an iterable message stream for retrieving
    messages for a sessionful Service Bus entity. It operates a single connection that must be opened and
    closed on completion. The service connection will remain open for the entirety of the iterator.
    If you find yourself only partially iterating the message stream, you should run the receiver
    in an `async with` statement to ensure the connection is closed.
    The Receiver should not be instantiated directly, and should be accessed from a `QueueClient` or
    `SubscriptionClient` using the `get_receiver()` method.
    When receiving messages from a session, connection errors that would normally be automatically
    retried will instead raise an error due to the loss of the lock on a particular session.
    A specific session can be specified, or the receiver can retrieve any available session using
    the `NEXT_AVAILABLE` constant.

    .. note:: This object is not thread-safe.

    :param handler_id: The ID used as the connection name for the Receiver.
    :type handler_id: str
    :param source: The endpoint from which to receive messages.
    :type source: ~uamqp.Source
    :param auth_config: The SASL auth credentials.
    :type auth_config: dict[str, str]
    :param session: The ID of the session to receive from.
    :type session: str or ~azure.servicebus.common.constants.NEXT_AVAILABLE
    :param loop: An async event loop
    :type loop: ~asyncio.EventLoop
    :param connection: A shared connection [not yet supported].
    :type connection: ~uamqp.Connection
    :param mode: The receive connection mode. Value must be either PeekLock or ReceiveAndDelete.
    :type mode: ~azure.servicebus.common.constants.ReceiveSettleMode
    :param encoding: The encoding used for string properties. Default is 'UTF-8'.
    :type encoding: str
    :param debug: Whether to enable network trace debug logs.
    :type debug: bool

    Example:
        .. literalinclude:: ../examples/async_examples/test_examples_async.py
            :start-after: [START open_close_receiver_session_context]
            :end-before: [END open_close_receiver_session_context]
            :language: python
            :dedent: 4
            :caption: Running a session receiver within a context manager.

        .. literalinclude:: ../examples/async_examples/test_examples_async.py
            :start-after: [START open_close_receiver_session_nextavailable]
            :end-before: [END open_close_receiver_session_nextavailable]
            :language: python
            :dedent: 4
            :caption: Running a session receiver for the next available session.

    """

    def __init__(
            self, handler_id, source, auth_config, *, session=None, loop=None,
            connection=None, encoding='UTF-8', debug=False, **kwargs):
        # Session attributes are initialised before the base constructor runs.
        # NOTE(review): presumably because the base constructor builds the
        # handler, which reads them - confirm against BaseHandler.
        self.session_id = None
        self.session_filter = session
        self.locked_until = None
        self.session_start = None
        self.auto_reconnect = False
        self.auto_renew_error = None
        super(SessionReceiver, self).__init__(
            handler_id, source, auth_config, loop=loop,
            connection=connection, encoding=encoding, debug=debug, **kwargs)

    def _build_handler(self):
        """Create the underlying `ReceiveClientAsync` for a session source.

        Differs from the base implementation in using `self._get_source()` as the
        source and registering `self._on_attach` as the attach callback.
        """
        auth = None if self.connection else authentication.SASTokenAsync.from_shared_access_key(**self.auth_config)
        self._handler = ReceiveClientAsync(
            self._get_source(),
            auth=auth,
            debug=self.debug,
            properties=self.properties,
            error_policy=self.error_policy,
            client_name=self.name,
            on_attach=self._on_attach,
            auto_complete=False,
            encoding=self.encoding,
            loop=self.loop,
            **self.handler_kwargs)

    async def _can_run(self):
        """Run the base receiver checks, then verify the session lock has not expired.

        :raises: ~azure.servicebus.common.errors.SessionLockExpired if `self.expired` is true.
        """
        await super(SessionReceiver, self)._can_run()
        if self.expired:
            raise SessionLockExpired(inner_exception=self.auto_renew_error)

    async def _handle_exception(self, exception):
        """Translate session-specific link detach errors, deferring the rest to the base class.

        :raises: ~azure.servicebus.common.errors.SessionLockExpired if the link was detached
         with the session-lock-lost condition.
        :raises: ~azure.servicebus.common.errors.NoActiveSession if the link was detached
         with the session-lock-timeout condition.
        """
        if isinstance(exception, errors.LinkDetach):
            if exception.condition == SESSION_LOCK_LOST:
                error = SessionLockExpired("Connection detached - lock on Session {} lost.".format(self.session_id))
                await self.close(exception=error)
                raise error
            if exception.condition == SESSION_LOCK_TIMEOUT:
                error = NoActiveSession("Queue has no active session to receive from.")
                await self.close(exception=error)
                raise error
        return await super(SessionReceiver, self)._handle_exception(exception)

    async def _settle_deferred(self, settlement, lock_tokens, dead_letter_details=None):
        """Send an update-disposition management request scoped to the current session.

        :param settlement: The value sent as 'disposition-status'.
        :param lock_tokens: The lock tokens of the messages to settle.
        :param dead_letter_details: Optional extra entries merged into the request body.
        :type dead_letter_details: dict
        """
        message = {
            'disposition-status': settlement,
            'lock-tokens': types.AMQPArray(lock_tokens),
            'session-id': self.session_id}
        if dead_letter_details:
            message.update(dead_letter_details)
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
            message,
            mgmt_handlers.default)

    async def get_session_state(self):
        """Get the session state.

        Returns None if no state has been set. Binary state is decoded using the
        receiver's configured encoding.

        :rtype: str

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START set_session_state]
                :end-before: [END set_session_state]
                :language: python
                :dedent: 4
                :caption: Getting and setting the state of a session.

        """
        await self._can_run()
        response = await self._mgmt_request_response(
            REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION,
            {'session-id': self.session_id},
            mgmt_handlers.default)
        session_state = response.get(b'session-state')
        if isinstance(session_state, six.binary_type):
            # Decode with the same encoding set_session_state() encodes with,
            # so text state round-trips for any configured encoding.
            session_state = session_state.decode(self.encoding)
        return session_state

    async def set_session_state(self, state):
        """Set the session state.

        Text state is encoded using the receiver's configured encoding before sending.

        :param state: The state value.
        :type state: str or bytes or bytearray

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START set_session_state]
                :end-before: [END set_session_state]
                :language: python
                :dedent: 4
                :caption: Getting and setting the state of a session.

        """
        await self._can_run()
        state = state.encode(self.encoding) if isinstance(state, six.text_type) else state
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION,
            {'session-id': self.session_id, 'session-state': bytearray(state)},
            mgmt_handlers.default)

    async def renew_lock(self):
        """Renew the session lock.

        This operation must be performed periodically in order to retain a lock on the session
        to continue message processing. Once the lock is lost the connection will be closed.
        This operation can also be performed as an asynchronous background task by registering the session
        with an `azure.servicebus.aio.AutoLockRenew` instance.

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START receiver_renew_session_lock]
                :end-before: [END receiver_renew_session_lock]
                :language: python
                :dedent: 4
                :caption: Renew the session lock.

        """
        await self._can_run()
        expiry = await self._mgmt_request_response(
            REQUEST_RESPONSE_RENEW_SESSION_LOCK_OPERATION,
            {'session-id': self.session_id},
            mgmt_handlers.default)
        # The value is divided by 1000 before conversion - presumably the
        # service reports milliseconds since the epoch (TODO confirm).
        self.locked_until = datetime.datetime.fromtimestamp(expiry[b'expiration']/1000.0)

    async def peek(self, count=1, start_from=0):
        """Browse messages currently pending in the queue.

        Peeked messages are not removed from queue, nor are they locked.
        They cannot be completed, deferred or dead-lettered.
        This operation will only peek pending messages in the current session.

        :param count: The maximum number of messages to try and peek. The default
         value is 1.
        :type count: int
        :param start_from: A message sequence number from which to start browsing messages.
         If not supplied, browsing starts from the last received sequence number, or 1.
        :type start_from: int
        :rtype: list[~azure.servicebus.common.message.PeekMessage]
        :raises: ValueError if `count` or `start_from` is less than 1.

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START receiver_peek_session_messages]
                :end-before: [END receiver_peek_session_messages]
                :language: python
                :dedent: 8
                :caption: Peek messages in the queue.

        """
        if not start_from:
            start_from = self.last_received or 1
        if int(count) < 1:
            raise ValueError("count must be 1 or greater.")
        if int(start_from) < 1:
            raise ValueError("start_from must be 1 or greater.")
        await self._can_run()

        message = {
            'from-sequence-number': types.AMQPLong(start_from),
            'message-count': count,
            'session-id': self.session_id}
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_PEEK_OPERATION,
            message,
            mgmt_handlers.peek_op)

    async def receive_deferred_messages(self, sequence_numbers, mode=ReceiveSettleMode.PeekLock):
        """Receive messages that have previously been deferred.

        This operation can only receive deferred messages from the current session.
        When receiving deferred messages from a partitioned entity, all of the supplied
        sequence numbers must be messages from the same partition.

        :param sequence_numbers: A list of the sequence numbers of messages that have been
         deferred.
        :type sequence_numbers: list[int]
        :param mode: The receive mode, default value is PeekLock.
        :type mode: ~azure.servicebus.common.constants.ReceiveSettleMode
        :rtype: list[~azure.servicebus.aio.async_message.DeferredMessage]
        :raises: ValueError if no sequence numbers are supplied.

        Example:
            .. literalinclude:: ../examples/async_examples/test_examples_async.py
                :start-after: [START receiver_defer_session_messages]
                :end-before: [END receiver_defer_session_messages]
                :language: python
                :dedent: 8
                :caption: Defer messages, then retrieve them by sequence number.

        """
        if not sequence_numbers:
            raise ValueError("At least one sequence number must be specified.")
        await self._can_run()
        try:
            receive_mode = mode.value.value
        except AttributeError:
            # `mode` was not a nested enum value; treat it as a plain integer.
            receive_mode = int(mode)
        message = {
            'sequence-numbers': types.AMQPArray([types.AMQPLong(s) for s in sequence_numbers]),
            'receiver-settle-mode': types.AMQPuInt(receive_mode),
            'session-id': self.session_id
        }
        handler = functools.partial(mgmt_handlers.deferred_message_op, mode=receive_mode, message_type=DeferredMessage)
        messages = await self._mgmt_request_response(
            REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
            message,
            handler)
        for m in messages:
            m._receiver = self  # pylint: disable=protected-access
        return messages

    async def list_sessions(self, updated_since=None, max_results=100, skip=0):
        """List session IDs.

        List the IDs of sessions in the queue with pending messages and where the state of the session
        has been updated since the timestamp provided. If no timestamp is provided, all will be returned.
        If the state of a session has never been set, it will not be returned regardless of whether
        there are messages pending.

        :param updated_since: The UTC datetime from which to return updated pending session IDs.
        :type updated_since: ~datetime.datetime
        :param max_results: The maximum number of session IDs to return. Default value is 100.
        :type max_results: int
        :param skip: The page value to jump to. Default value is 0.
        :type skip: int
        :rtype: list[str]
        :raises: ValueError if `max_results` is less than 1.
        """
        if int(max_results) < 1:
            raise ValueError("max_results must be 1 or greater.")
        await self._can_run()
        message = {
            # With no timestamp supplied, the epoch is sent so that nothing is
            # filtered out by update time.
            'last-updated-time': updated_since or datetime.datetime.utcfromtimestamp(0),
            'skip': skip,
            'top': max_results,
        }
        return await self._mgmt_request_response(
            REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION,
            message,
            mgmt_handlers.list_sessions_op)