Interface ChangeFeedProcessor


  • public interface ChangeFeedProcessor
    Simple host for distributing change feed events across observers and thus allowing these observers scale. It distributes the load across its instances and allows dynamic scaling: - Partitions in partitioned collections are distributed across instances/observers. - New instance takes leases from existing instances to make distribution equal. - If an instance dies, the leases are distributed across remaining instances. It's useful for scenario when partition count is high so that one host/VM is not capable of processing that many change feed events. Client application needs to implement ChangeFeedObserver and register processor implementation with ChangeFeedProcessor.

    It uses auxiliary document collection for managing leases for a partition. Every EventProcessorHost instance is performing the following two tasks: 1) Renew Leases: It keeps track of leases currently owned by the host and continuously keeps on renewing the leases. 2) Acquire Leases: Each instance continuously polls all leases to check if there are any leases it should acquire for the system to get into balanced state.

    ChangeFeedProcessor changeFeedProcessor = ChangeFeedProcessor.Builder() .hostName(hostName) .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(docs -> { // Implementation for handling and processing CosmosItemProperties list goes here }) .build();

    • Method Detail

      • start

        Mono<Void> start()
        Start listening for changes asynchronously.
        Returns:
        a representation of the deferred computation of this call.
      • stop

        Mono<Void> stop()
        Stops listening for changes asynchronously.
        Returns:
        a representation of the deferred computation of this call.
      • isStarted

        boolean isStarted()
        Returns the state of the change feed processor.
        Returns:
        true if the change feed processor is currently active and running.
      • getEstimatedLag

        Mono<Map<String,​Integer>> getEstimatedLag()
        Returns the current owner (host) and an approximation of the difference between the last processed item (defined by the state of the feed container) and the latest change in the container for each partition (lease document).

        An empty map will be returned if the processor was not started or no lease documents matching the current ChangeFeedProcessor instance's lease prefix could be found.

        Returns:
        a map representing the current owner and lease token, the current LSN and latest LSN, and the estimated lag, asynchronously.
      • Builder

        static ChangeFeedProcessor.BuilderDefinition Builder()
        Helper static method to build ChangeFeedProcessor instances as logical representation of the Azure Cosmos DB database service.

        ChangeFeedProcessor.Builder() .hostName("SampleHost") .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(docs -> { // Implementation for handling and processing CosmosItemProperties list goes here }) .build();

        Returns:
        a builder definition instance.