Source code for azure.servicebus.aio._async_auto_lock_renewer

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import asyncio
import logging
import datetime
from typing import Optional, Iterable, Any, Union, Callable, Awaitable, List

from .._common.message import ServiceBusReceivedMessage
from ._servicebus_session_async import ServiceBusSession
from ._servicebus_receiver_async import ServiceBusReceiver
from .._common.utils import (
    get_renewable_start_time,
    utc_now,
    get_renewable_lock_duration,
)
from .._common.auto_lock_renewer import SHORT_RENEW_OFFSET, SHORT_RENEW_SCALING_FACTOR
from ._async_utils import get_dict_with_loop_if_needed
from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError

Renewable = Union[ServiceBusSession, ServiceBusReceivedMessage]
AsyncLockRenewFailureCallback = Callable[[Renewable, Optional[Exception]], Awaitable[None]]

_log = logging.getLogger(__name__)


[docs] class AutoLockRenewer: """Auto lock renew. An asynchronous AutoLockRenewer handler for renewing the lock tokens of messages and/or sessions in the background. :param max_lock_renewal_duration: A time in seconds that locks registered to this renewer should be maintained for. Default value is 300 (5 minutes). :type max_lock_renewal_duration: float :param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). :type on_lock_renew_failure: Optional[LockRenewFailureCallback] .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py :start-after: [START auto_lock_renew_message_async] :end-before: [END auto_lock_renew_message_async] :language: python :dedent: 4 :caption: Automatically renew a message lock .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py :start-after: [START auto_lock_renew_session_async] :end-before: [END auto_lock_renew_session_async] :language: python :dedent: 4 :caption: Automatically renew a session lock """ def __init__( self, max_lock_renewal_duration: float = 300, on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self._internal_kwargs = get_dict_with_loop_if_needed(loop) self._shutdown = asyncio.Event() self._futures = [] # type: List[asyncio.Future] self._sleep_time = 1 self._renew_period = 10 self._on_lock_renew_failure = on_lock_renew_failure self._max_lock_renewal_duration = max_lock_renewal_duration async def __aenter__(self) -> "AutoLockRenewer": if self._shutdown.is_set(): raise ServiceBusError( "The AutoLockRenewer has already been shutdown. Please create a new instance for auto lock renewing." ) return self async def __aexit__(self, *args: Iterable[Any]) -> None: await self.close() def _renewable(self, renewable: Union[ServiceBusReceivedMessage, ServiceBusSession]) -> bool: # pylint: disable=protected-access if self._shutdown.is_set(): return False if hasattr(renewable, "_settled") and renewable._settled: # type: ignore return False if renewable._lock_expired: return False try: if not renewable._receiver._running: # type: ignore return False except AttributeError: # If for whatever reason the renewable isn't hooked up to a receiver raise ServiceBusError( "Cannot renew an entity without an associated receiver. " "ServiceBusReceivedMessage and active ServiceBusReceiver.Session objects are expected." ) from None return True async def _auto_lock_renew( self, receiver: ServiceBusReceiver, renewable: Renewable, starttime: datetime.datetime, max_lock_renewal_duration: float, on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None, renew_period_override: Optional[float] = None, ) -> None: # pylint: disable=protected-access _log.debug("Running async lock auto-renew for %r seconds", max_lock_renewal_duration) error = None # type: Optional[Exception] clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) renew_period = renew_period_override or self._renew_period try: while self._renewable(renewable): if (utc_now() - starttime) >= datetime.timedelta(seconds=max_lock_renewal_duration): _log.debug("Reached max auto lock renew duration - letting lock expire.") raise AutoLockRenewTimeout( "Auto-renew period ({} seconds) elapsed.".format(max_lock_renewal_duration) ) if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=renew_period): _log.debug( "%r seconds or less until lock expires - auto renewing.", renew_period, ) try: # Renewable is a session await renewable.renew_lock() # type: ignore except AttributeError: # Renewable is a message await receiver.renew_message_lock(renewable) # type: ignore await asyncio.sleep(self._sleep_time) clean_shutdown = not renewable._lock_expired except AutoLockRenewTimeout as e: error = e renewable.auto_renew_error = e clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) error = AutoLockRenewFailed("Failed to auto-renew lock", error=e) renewable.auto_renew_error = error finally: if on_lock_renew_failure and not clean_shutdown: await on_lock_renew_failure(renewable, error)
[docs] def register( self, receiver: ServiceBusReceiver, renewable: Union[ServiceBusReceivedMessage, ServiceBusSession], max_lock_renewal_duration: Optional[float] = None, on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None, ) -> None: """Register a renewable entity for automatic lock renewal. :param receiver: The ServiceBusReceiver instance that is associated with the message or the session to be auto-lock-renewed. :type receiver: ~azure.servicebus.aio.ServiceBusReceiver :param renewable: A locked entity that needs to be renewed. :type renewable: Union[~azure.servicebus.aio.ServiceBusReceivedMessage,~azure.servicebus.aio.ServiceBusSession] :param max_lock_renewal_duration: A time in seconds that the lock should be maintained for. Default value is None. If specified, this value will override the default value specified at the constructor. :type max_lock_renewal_duration: Optional[float] :param Optional[AsyncLockRenewFailureCallback] on_lock_renew_failure: An async callback may be specified to be called when the lock is lost on the renewable being registered. Default value is None (no callback). :rtype: None """ if not isinstance(renewable, (ServiceBusReceivedMessage, ServiceBusSession)): raise TypeError( "AutoLockRenewer only supports registration of types " "azure.servicebus.ServiceBusReceivedMessage (via a receiver's receive methods) and " "azure.servicebus.aio.ServiceBusSession " "(via a session receiver's property receiver.session)." ) if self._shutdown.is_set(): raise ServiceBusError( "The AutoLockRenewer has already been shutdown. Please create a new instance for auto lock renewing." ) if renewable.locked_until_utc is None: raise ValueError( "Only azure.servicebus.ServiceBusReceivedMessage objects in PEEK_LOCK receive mode may" "be lock-renewed. (E.g. only messages received via receive() or the receiver iterator," "not using RECEIVE_AND_DELETE receive mode, and not returned from Peek)" ) starttime = get_renewable_start_time(renewable) # This is a heuristic to compensate if it appears the user has a lock duration less than our base renew period time_until_expiry = get_renewable_lock_duration(renewable) renew_period_override = None # Default is 10 seconds, but let's leave ourselves a small margin of error because clock skew is a real problem if time_until_expiry <= datetime.timedelta(seconds=self._renew_period + SHORT_RENEW_OFFSET): renew_period_override = time_until_expiry.seconds * SHORT_RENEW_SCALING_FACTOR renew_future = asyncio.ensure_future( self._auto_lock_renew( receiver, renewable, starttime, max_lock_renewal_duration or self._max_lock_renewal_duration, on_lock_renew_failure or self._on_lock_renew_failure, renew_period_override, ), **self._internal_kwargs ) self._futures.append(renew_future)
[docs] async def close(self) -> None: """Cease autorenewal by cancelling any remaining open lock renewal futures.""" self._shutdown.set() if self._futures: await asyncio.wait(self._futures)