Source code for azure.communication.chat._shared.user_credential

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from threading import Lock, Condition
from datetime import timedelta
from typing import ( # pylint: disable=unused-import
    cast,
    Tuple,
)

from .utils import get_current_utc_as_int
from .user_token_refresh_options import CommunicationTokenRefreshOptions


class CommunicationTokenCredential(object):
    """Credential type used for authenticating to an Azure Communication service.

    :param str token: The token used to authenticate to an Azure Communication service
    :keyword token_refresher: The token refresher to provide capacity to fetch fresh token
    :raises: TypeError
    """

    # A token whose remaining lifetime is below this many minutes is treated
    # as "expiring" and triggers an on-demand refresh in get_token().
    _ON_DEMAND_REFRESHING_INTERVAL_MINUTES = 2

    def __init__(self,
                 token,  # type: str
                 **kwargs
                 ):
        token_refresher = kwargs.pop('token_refresher', None)
        communication_token_refresh_options = CommunicationTokenRefreshOptions(
            token=token,
            token_refresher=token_refresher)
        self._token = communication_token_refresh_options.get_token()
        self._token_refresher = communication_token_refresh_options.get_token_refresher()
        # Condition guarding _token / _some_thread_refreshing; waiters block on
        # it until the refreshing thread calls notify_all().
        self._lock = Condition(Lock())
        self._some_thread_refreshing = False

    def get_token(self, *scopes, **kwargs):  # pylint: disable=unused-argument
        # type (*str, **Any) -> AccessToken
        """The value of the configured token.

        If a token refresher was supplied and the current token is expiring,
        exactly one calling thread invokes the refresher; other callers either
        return the current token (while it has not yet expired) or block until
        the refresh finishes.

        :rtype: ~azure.core.credentials.AccessToken
        :raises: whatever the token refresher raises; the exception is
            propagated after waiting threads have been woken up.
        """
        # Fast path: nothing to refresh with, or the token is still fresh.
        if not self._token_refresher or not self._token_expiring():
            return self._token

        should_this_thread_refresh = False

        with self._lock:
            while self._token_expiring():
                if self._some_thread_refreshing:
                    # Another thread is already refreshing: an expiring but
                    # not yet expired token can still be handed out.
                    if self._is_currenttoken_valid():
                        return self._token

                    self._wait_till_inprogress_thread_finish_refreshing()
                else:
                    # Claim the refresh for this thread.
                    should_this_thread_refresh = True
                    self._some_thread_refreshing = True
                    break

        if should_this_thread_refresh:
            refreshed = False
            try:
                # The refresher runs outside the lock so that other callers
                # are not blocked on the lock while it executes.
                newtoken = self._token_refresher()  # pylint:disable=not-callable
                refreshed = True
            finally:
                # Runs on success and on any exception: always clear the
                # in-progress flag and wake the waiters, otherwise they would
                # block forever after a failed refresh.
                with self._lock:
                    if refreshed:
                        self._token = newtoken
                    self._some_thread_refreshing = False
                    self._lock.notify_all()

        return self._token

    def _wait_till_inprogress_thread_finish_refreshing(self):
        """Block until the refreshing thread signals completion.

        Must be called with ``self._lock`` held. ``Condition.wait`` releases
        the lock atomically, sleeps until ``notify_all`` is called, and
        re-acquires the lock before returning. A plain release/acquire pair
        would not block and would turn the caller's loop into a busy-wait.
        """
        self._lock.wait()

    def _token_expiring(self):
        """Return True when the token's remaining lifetime is below the refresh interval."""
        # NOTE(review): assumes expires_on and get_current_utc_as_int() are
        # both expressed in seconds -- confirm against their definitions.
        remaining = self._token.expires_on - get_current_utc_as_int()
        threshold = timedelta(
            minutes=self._ON_DEMAND_REFRESHING_INTERVAL_MINUTES).total_seconds()
        return remaining < threshold

    def _is_currenttoken_valid(self):
        """Return True while the current time is still before the token's ``expires_on``."""
        return get_current_utc_as_int() < self._token.expires_on