Source code for azure.servicebus._common.auto_lock_renewer

# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import datetime
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
import queue
from typing import TYPE_CHECKING, Union, Optional, Any, Callable

from .._servicebus_receiver import ServiceBusReceiver
from .._servicebus_session import ServiceBusSession
from .message import ServiceBusReceivedMessage
from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from .utils import get_renewable_start_time, utc_now, get_renewable_lock_duration

if TYPE_CHECKING:

    Renewable = Union[ServiceBusSession, ServiceBusReceivedMessage]
    LockRenewFailureCallback = Callable[[Renewable, Optional[Exception]], None]

_log = logging.getLogger(__name__)

SHORT_RENEW_OFFSET = 0.5  # Seconds that if a renew period is longer than lock duration + offset, it's "too long"
SHORT_RENEW_SCALING_FACTOR = 0.75  # In this situation we need a "Short renew" and should scale by this factor.


[docs] class AutoLockRenewer(object): # pylint:disable=too-many-instance-attributes """Auto renew locks for messages and sessions using a background thread pool. :param max_lock_renewal_duration: A time in seconds that locks registered to this renewer should be maintained for. Default value is 300 (5 minutes). :type max_lock_renewal_duration: float :param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). :type on_lock_renew_failure: Optional[LockRenewFailureCallback] :param executor: A user-specified thread pool. This cannot be combined with setting `max_workers`. :type executor: Optional[~concurrent.futures.ThreadPoolExecutor] :param max_workers: Specify the maximum workers in the thread pool. If not specified the number used will be derived from the core count of the environment. This cannot be combined with `executor`. :type max_workers: Optional[int] .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START auto_lock_renew_message_sync] :end-before: [END auto_lock_renew_message_sync] :language: python :dedent: 4 :caption: Automatically renew a message lock .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START auto_lock_renew_session_sync] :end-before: [END auto_lock_renew_session_sync] :language: python :dedent: 4 :caption: Automatically renew a session lock """ def __init__( self, max_lock_renewal_duration: float = 300, on_lock_renew_failure: Optional["LockRenewFailureCallback"] = None, executor: Optional[ThreadPoolExecutor] = None, max_workers: Optional[int] = None, ) -> None: """Auto renew locks for messages and sessions using a background thread pool. It is recommended setting max_worker to a large number or passing ThreadPoolExecutor of large max_workers number when AutoLockRenewer is supposed to deal with multiple messages or sessions simultaneously. :param max_lock_renewal_duration: A time in seconds that locks registered to this renewer should be maintained for. Default value is 300 (5 minutes). :type max_lock_renewal_duration: float :param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). :type on_lock_renew_failure: Optional[LockRenewFailureCallback] :param executor: A user-specified thread pool. This cannot be combined with setting `max_workers`. :type executor: Optional[~concurrent.futures.ThreadPoolExecutor] :param max_workers: Specify the maximum workers in the thread pool. If not specified the number used will be derived from the core count of the environment. This cannot be combined with `executor`. :type max_workers: Optional[int] """ self._executor = executor or ThreadPoolExecutor(max_workers=max_workers) # None indicates it's unknown whether the provided executor has max workers > 1 self._is_max_workers_greater_than_one = None if executor else (max_workers is None or max_workers > 1) self._shutdown = threading.Event() self._sleep_time = 0.5 self._renew_period = 10 self._running_dispatcher = threading.Event() # indicate whether the dispatcher is running self._last_activity_timestamp = None # the last timestamp when the dispatcher is active dealing with tasks self._dispatcher_timeout = 5 # the idle time that dispatcher should exit if there's no activity self._max_lock_renewal_duration = max_lock_renewal_duration self._on_lock_renew_failure = on_lock_renew_failure self._renew_tasks = queue.Queue() # type: ignore self._infer_max_workers_time = 1 def __enter__(self) -> "AutoLockRenewer": if self._shutdown.is_set(): raise ServiceBusError( "The AutoLockRenewer has already been shutdown. Please create a new instance for auto lock renewing." ) self._init_workers() return self def __exit__(self, *args: Any) -> None: self.close() def _init_workers(self): if not self._running_dispatcher.is_set(): self._infer_max_workers_greater_than_one_if_needed() self._running_dispatcher.set() self._executor.submit(self._dispatch_worker) def _infer_max_workers_greater_than_one_if_needed(self): # infer max_workers value if executor is passed in if self._is_max_workers_greater_than_one is None: max_wokers_checker = self._executor.submit(self._infer_max_workers_value_worker) max_wokers_checker.result() def _infer_max_workers_value_worker(self): max_workers_checker = self._executor.submit(pow, 1, 1) # This will never complete because there is only one worker thread and # it is executing this function. try: max_workers_checker.result(timeout=self._infer_max_workers_time) self._is_max_workers_greater_than_one = True except FuturesTimeoutError: self._is_max_workers_greater_than_one = False def _renewable(self, renewable): # pylint: disable=protected-access if self._shutdown.is_set(): return False if hasattr(renewable, "_settled") and renewable._settled: return False if not renewable._receiver._running: return False if renewable._lock_expired: return False return True def _dispatch_worker(self): self._last_activity_timestamp = time.time() while not self._shutdown.is_set() and self._running_dispatcher.is_set(): while not self._renew_tasks.empty(): renew_task = self._renew_tasks.get() if self._is_max_workers_greater_than_one: self._executor.submit(self._auto_lock_renew_task, *renew_task) else: self._auto_lock_renew_task(*renew_task) self._renew_tasks.task_done() self._last_activity_timestamp = time.time() # If there's no activity in the past self._idle_timeout seconds, exit the method # This ensures the dispatching thread could exit, not blocking the main python thread # the main worker thread could be started again if new tasks get registered if time.time() - self._last_activity_timestamp >= self._dispatcher_timeout: self._running_dispatcher.clear() self._last_activity_timestamp = None return time.sleep(self._sleep_time) # save cpu cycles if there's currently no task in self._renew_tasks def _auto_lock_renew_task( self, receiver, renewable, starttime, max_lock_renewal_duration, on_lock_renew_failure=None, renew_period_override=None, ): # pylint: disable=protected-access error = None clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) renew_period = renew_period_override or self._renew_period try: if self._renewable(renewable): if (utc_now() - starttime) >= datetime.timedelta(seconds=max_lock_renewal_duration): _log.debug("Reached max auto lock renew duration - letting lock expire.") raise AutoLockRenewTimeout( "Auto-renew period ({} seconds) elapsed.".format(max_lock_renewal_duration) ) if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=renew_period): _log.debug( "%r seconds or less until lock expires - auto renewing.", renew_period, ) try: # Renewable is a session renewable.renew_lock() # type: ignore except AttributeError: # Renewable is a message receiver.renew_message_lock(renewable) # type: ignore time.sleep(self._sleep_time) # enqueue a new task, keeping renewing the renewable if self._renewable(renewable): self._renew_tasks.put( ( receiver, renewable, starttime, max_lock_renewal_duration, on_lock_renew_failure, renew_period_override, ) ) clean_shutdown = not renewable._lock_expired except AutoLockRenewTimeout as e: error = e renewable.auto_renew_error = e clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) error = AutoLockRenewFailed("Failed to auto-renew lock", error=e) renewable.auto_renew_error = error finally: if on_lock_renew_failure and not clean_shutdown: on_lock_renew_failure(renewable, error)
[docs] def register( self, receiver: ServiceBusReceiver, renewable: Union[ServiceBusReceivedMessage, ServiceBusSession], max_lock_renewal_duration: Optional[float] = None, on_lock_renew_failure: Optional["LockRenewFailureCallback"] = None, ) -> None: """Register a renewable entity for automatic lock renewal. :param receiver: The ServiceBusReceiver instance that is associated with the message or the session to be auto-lock-renewed. :type receiver: ~azure.servicebus.ServiceBusReceiver :param renewable: A locked entity that needs to be renewed. :type renewable: Union[~azure.servicebus.ServiceBusReceivedMessage, ~azure.servicebus.ServiceBusSession] :param max_lock_renewal_duration: A time in seconds that the lock should be maintained for. Default value is None. If specified, this value will override the default value specified at the constructor. :type max_lock_renewal_duration: Optional[float] :param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). :type on_lock_renew_failure: Optional[LockRenewFailureCallback] :rtype: None """ if not isinstance(renewable, (ServiceBusReceivedMessage, ServiceBusSession)): raise TypeError( "AutoLockRenewer only supports registration of types " "azure.servicebus.ServiceBusReceivedMessage (via a receiver's receive methods) and " "azure.servicebus.ServiceBusSession (via a session receiver's property receiver.session)." ) if self._shutdown.is_set(): raise ServiceBusError( "The AutoLockRenewer has already been shutdown. Please create a new instance for auto lock renewing." ) if renewable.locked_until_utc is None: raise ValueError( "Only azure.servicebus.ServiceBusReceivedMessage objects in PEEK_LOCK receive mode may" "be lock-renewed. (E.g. only messages received via receive() or the receiver iterator," "not using RECEIVE_AND_DELETE receive mode, and not returned from Peek)" ) starttime = get_renewable_start_time(renewable) # This is a heuristic to compensate if it appears the user has a lock duration less than our base renew period time_until_expiry = get_renewable_lock_duration(renewable) renew_period_override = None # Default is 10 seconds, but let's leave ourselves a small margin of error because clock skew is a real problem if time_until_expiry <= datetime.timedelta(seconds=self._renew_period + SHORT_RENEW_OFFSET): renew_period_override = time_until_expiry.seconds * SHORT_RENEW_SCALING_FACTOR _log.debug( "Running lock auto-renew for %r for %r seconds", renewable, max_lock_renewal_duration or self._max_lock_renewal_duration, ) self._init_workers() self._renew_tasks.put( ( receiver, renewable, starttime, max_lock_renewal_duration or self._max_lock_renewal_duration, on_lock_renew_failure or self._on_lock_renew_failure, renew_period_override, ) )
[docs] def close(self, wait: bool = True) -> None: """Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads. :param wait: Whether to block until thread pool has shutdown. Default is `True`. :type wait: bool :rtype: None """ self._running_dispatcher.clear() self._shutdown.set() self._executor.shutdown(wait=wait)