# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Dict, Optional, Any
import logging
import time
import calendar
from datetime import datetime
from collections import defaultdict

from azure.eventhub import CheckpointStore  # type: ignore  # pylint: disable=no-name-in-module
from azure.eventhub.exceptions import OwnershipLostError  # type: ignore
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError  # type: ignore
from ._vendor.storage.blob import BlobClient, ContainerClient
from ._vendor.storage.blob._shared.base_client import parse_connection_str

logger = logging.getLogger(__name__)
UPLOAD_DATA = ""  # Blob bodies stay empty; ownership/checkpoint state lives in blob metadata.


def _utc_to_local(utc_dt):
    timestamp = calendar.timegm(utc_dt.timetuple())
    local_dt = datetime.fromtimestamp(timestamp)
    return local_dt.replace(microsecond=utc_dt.microsecond)


def _to_timestamp(date):
    timestamp = None
    if not date:
        return timestamp
    try:
        timestamp = date.timestamp()
    except AttributeError:  # python2.7 compatible
        timestamp = time.mktime(_utc_to_local(date).timetuple())
        timestamp += date.microsecond / 1e6
    return timestamp
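
# For illustration, a timezone-aware datetime converts directly on Python 3:
#     _to_timestamp(datetime(2020, 1, 1, tzinfo=timezone.utc)) == 1577836800.0
# ("timezone" is not imported in this module; the line above is only a sketch.)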


class BlobCheckpointStore(CheckpointStore):
    """A CheckpointStore that uses Azure Blob Storage to store the partition ownership and checkpoint data.

    This class implements methods list_ownership, claim_ownership, update_checkpoint and list_checkpoints
    that are defined in class azure.eventhub.CheckpointStore of package azure-eventhub.

    :param str blob_account_url:
        The URI to the storage account.
    :param container_name:
        The name of the container for the blob.
    :type container_name: str
    :param credential:
        The credentials with which to authenticate. This is optional if the
        account URL already has a SAS token. The value can be a SAS token string, an account
        shared access key, or an instance of a TokenCredentials class from azure.identity.
        If the URL already has a SAS token, specifying an explicit credential will take priority.
    :keyword str api_version:
        The Storage API version to use for requests. Default value is '2019-07-07'.
    :keyword str secondary_hostname:
        The hostname of the secondary endpoint.
    """

    def __init__(self, blob_account_url, container_name, credential=None, **kwargs):
        # type: (str, str, Optional[Any], Any) -> None
        self._container_client = kwargs.pop("container_client", None)
        if not self._container_client:
            api_version = kwargs.pop("api_version", None)
            if api_version:
                # The vendored storage client pins its API version through the
                # x-ms-version request header.
                headers = kwargs.get("headers")
                if headers:
                    headers["x-ms-version"] = api_version
                else:
                    kwargs["headers"] = {"x-ms-version": api_version}
            self._container_client = ContainerClient(
                blob_account_url, container_name, credential=credential, **kwargs
            )
        self._cached_blob_clients = defaultdict()  # type: Dict[str, BlobClient]

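    # A minimal construction sketch, for illustration only. The account URL,
    # container name, and credential placeholders below are hypothetical, not
    # values defined in this module:
    #
    #     checkpoint_store = BlobCheckpointStore(
    #         blob_account_url="https://<storage-account>.blob.core.windows.net",
    #         container_name="<container-name>",
    #         credential="<sas-token-or-account-key>",
    #     )
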
    @classmethod
    def from_connection_string(
        cls, conn_str, container_name, credential=None, **kwargs
    ):
        # type: (str, str, Optional[Any], Any) -> BlobCheckpointStore
        """Create BlobCheckpointStore from a storage connection string.

        :param str conn_str:
            A connection string to an Azure Storage account.
        :param container_name:
            The container name for the blob.
        :type container_name: str
        :param credential:
            The credentials with which to authenticate. This is optional if the
            account URL already has a SAS token, or the connection string already has
            shared access key values. The value can be a SAS token string, an account
            shared access key, or an instance of a TokenCredentials class from azure.identity.
            Credentials provided here will take precedence over those in the connection string.
        :keyword str api_version:
            The Storage API version to use for requests. Default value is '2019-07-07'.
        :keyword str secondary_hostname:
            The hostname of the secondary endpoint.
        :returns: A blob checkpoint store.
        :rtype: ~azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore
        """
        account_url, secondary, credential = parse_connection_str(
            conn_str, credential, "blob"
        )
        if "secondary_hostname" not in kwargs:
            kwargs["secondary_hostname"] = secondary
        return cls(account_url, container_name, credential=credential, **kwargs)

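    # A usage sketch for the classmethod above, assuming a standard Azure Storage
    # connection string (all values hypothetical):
    #
    #     checkpoint_store = BlobCheckpointStore.from_connection_string(
    #         "DefaultEndpointsProtocol=https;AccountName=<name>;"
    #         "AccountKey=<key>;EndpointSuffix=core.windows.net",
    #         container_name="<container-name>",
    #     )
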
    def __enter__(self):
        self._container_client.__enter__()
        return self

    def __exit__(self, *args):
        self._container_client.__exit__(*args)

    def _get_blob_client(self, blob_name):
        # Blob clients are created lazily and cached per blob name.
        result = self._cached_blob_clients.get(blob_name)
        if not result:
            result = self._container_client.get_blob_client(blob_name)
            self._cached_blob_clients[blob_name] = result
        return result

    def _upload_ownership(self, ownership, metadata):
        # Optimistic concurrency: overwrite only if our etag is still current;
        # with no etag (first claim), create only if the blob does not exist yet.
        etag = ownership.get("etag")
        if etag:
            etag_match = {"if_match": etag}
        else:
            etag_match = {"if_none_match": "*"}
        # Ownership blobs are stored at
        # "<namespace>/<eventhub>/<consumer_group>/ownership/<partition_id>", lowercased.
        blob_name = "{}/{}/{}/ownership/{}".format(
            ownership["fully_qualified_namespace"],
            ownership["eventhub_name"],
            ownership["consumer_group"],
            ownership["partition_id"],
        )
        blob_name = blob_name.lower()
        uploaded_blob_properties = self._get_blob_client(blob_name).upload_blob(
            data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match
        )
        ownership["etag"] = uploaded_blob_properties["etag"]
        ownership["last_modified_time"] = _to_timestamp(
            uploaded_blob_properties["last_modified"]
        )
        ownership.update(metadata)

    def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group):
        """List the ownership records of all partitions under the given Event Hub and consumer group."""
        try:
            blob_prefix = "{}/{}/{}/ownership".format(
                fully_qualified_namespace, eventhub_name, consumer_group
            )
            blobs = self._container_client.list_blobs(
                name_starts_with=blob_prefix.lower(), include=["metadata"]
            )
            result = []
            for blob in blobs:
                ownership = {
                    "fully_qualified_namespace": fully_qualified_namespace,
                    "eventhub_name": eventhub_name,
                    "consumer_group": consumer_group,
                    "partition_id": blob.name.split("/")[-1],
                    "owner_id": blob.metadata["ownerid"],
                    "etag": blob.etag,
                    "last_modified_time": _to_timestamp(blob.last_modified),
                }
                result.append(ownership)
            return result
        except Exception as error:  # pylint:disable=broad-except
            logger.warning(
                "An exception occurred during list_ownership for "
                "namespace %r eventhub %r consumer group %r. "
                "Exception is %r",
                fully_qualified_namespace,
                eventhub_name,
                consumer_group,
                error,
            )
            raise

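    # For illustration, each dict returned by list_ownership has this shape
    # (all values hypothetical):
    #
    #     {
    #         "fully_qualified_namespace": "myns.servicebus.windows.net",
    #         "eventhub_name": "myeventhub",
    #         "consumer_group": "$default",
    #         "partition_id": "0",
    #         "owner_id": "<owner-uuid>",
    #         "etag": '"0x8D00000000000"',
    #         "last_modified_time": 1577836800.0,
    #     }
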
    def _claim_one_partition(self, ownership):
        partition_id = ownership["partition_id"]
        fully_qualified_namespace = ownership["fully_qualified_namespace"]
        eventhub_name = ownership["eventhub_name"]
        consumer_group = ownership["consumer_group"]
        owner_id = ownership["owner_id"]
        metadata = {"ownerid": owner_id}
        try:
            self._upload_ownership(ownership, metadata)
            return ownership
        except (ResourceModifiedError, ResourceExistsError):
            logger.info(
                "EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
                "lost ownership to partition %r",
                owner_id,
                fully_qualified_namespace,
                eventhub_name,
                consumer_group,
                partition_id,
            )
            raise OwnershipLostError()
        except Exception as error:  # pylint:disable=broad-except
            logger.warning(
                "An exception occurred when EventProcessor instance %r claim_ownership for "
                "namespace %r eventhub %r consumer group %r partition %r. "
                "The ownership is now lost. Exception is %r",
                owner_id,
                fully_qualified_namespace,
                eventhub_name,
                consumer_group,
                partition_id,
                error,
            )
            return ownership  # Keep the ownership if an unexpected error happens

    def claim_ownership(self, ownership_list):
        """Try to claim each ownership in the list, returning only the claims that succeeded."""
        gathered_results = []
        for ownership in ownership_list:
            try:
                gathered_results.append(self._claim_one_partition(ownership))
            except OwnershipLostError:
                pass
        return gathered_results

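    # Sketch of a claim as an event processor's load balancer would issue it
    # (values hypothetical; "etag" is omitted for a first-time claim, so the
    # upload uses the if_none_match="*" condition):
    #
    #     claimed = checkpoint_store.claim_ownership([{
    #         "fully_qualified_namespace": "myns.servicebus.windows.net",
    #         "eventhub_name": "myeventhub",
    #         "consumer_group": "$default",
    #         "partition_id": "0",
    #         "owner_id": "<owner-uuid>",
    #     }])
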
    def update_checkpoint(self, checkpoint):
        """Persist the checkpoint of a partition as metadata on its checkpoint blob."""
        metadata = {
            "offset": str(checkpoint["offset"]),
            "sequencenumber": str(checkpoint["sequence_number"]),
        }
        blob_name = "{}/{}/{}/checkpoint/{}".format(
            checkpoint["fully_qualified_namespace"],
            checkpoint["eventhub_name"],
            checkpoint["consumer_group"],
            checkpoint["partition_id"],
        )
        blob_name = blob_name.lower()
        self._get_blob_client(blob_name).upload_blob(
            data=UPLOAD_DATA, overwrite=True, metadata=metadata
        )

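    # Sketch of a checkpoint update (values hypothetical; offset and
    # sequence_number would come from the last event processed):
    #
    #     checkpoint_store.update_checkpoint({
    #         "fully_qualified_namespace": "myns.servicebus.windows.net",
    #         "eventhub_name": "myeventhub",
    #         "consumer_group": "$default",
    #         "partition_id": "0",
    #         "offset": "1024",
    #         "sequence_number": 42,
    #     })
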
    def list_checkpoints(
        self, fully_qualified_namespace, eventhub_name, consumer_group
    ):
        """List the checkpoints of all partitions under the given Event Hub and consumer group."""
        blob_prefix = "{}/{}/{}/checkpoint".format(
            fully_qualified_namespace, eventhub_name, consumer_group
        )
        blobs = self._container_client.list_blobs(
            name_starts_with=blob_prefix.lower(), include=["metadata"]
        )
        result = []
        for blob in blobs:
            metadata = blob.metadata
            checkpoint = {
                "fully_qualified_namespace": fully_qualified_namespace,
                "eventhub_name": eventhub_name,
                "consumer_group": consumer_group,
                "partition_id": blob.name.split("/")[-1],
                "offset": str(metadata["offset"]),
                "sequence_number": int(metadata["sequencenumber"]),
            }
            result.append(checkpoint)
        return result

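    # Note the round trip: update_checkpoint stores both values as strings in
    # blob metadata, and list_checkpoints parses sequence_number back to int
    # while offset stays a string. E.g. the checkpoint sketched above comes back
    # as {..., "offset": "1024", "sequence_number": 42}.
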
    def close(self):
        """Close the underlying container client."""
        self._container_client.__exit__()
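
# End-to-end wiring sketch: the store plugs into an EventHubConsumerClient via
# its checkpoint_store keyword argument (connection string and consumer group
# below are hypothetical):
#
#     from azure.eventhub import EventHubConsumerClient
#
#     client = EventHubConsumerClient.from_connection_string(
#         "<eventhub-connection-string>",
#         consumer_group="$default",
#         checkpoint_store=checkpoint_store,
#     )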