Class ChangeFeedProcessorBuilder

java.lang.Object
com.azure.cosmos.ChangeFeedProcessorBuilder

public class ChangeFeedProcessorBuilder extends Object
Helper class to build a ChangeFeedProcessor instance. Below is an example of building ChangeFeedProcessor for LatestVersion mode.
 ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
     .hostName(hostName)
     .feedContainer(feedContainer)
     .leaseContainer(leaseContainer)
     .handleChanges(docs -> {
         for (JsonNode item : docs) {
             // Implementation for handling and processing of each JsonNode item goes here
         }
     })
     .buildChangeFeedProcessor();
 
Below is an example of building ChangeFeedProcessor with throughput control for handleChanges.
 ThroughputControlGroupConfig throughputControlGroupConfig =
         new ThroughputControlGroupConfigBuilder()
                 .groupName("cfp")
                 .targetThroughput(300)
                 .priorityLevel(PriorityLevel.LOW)
                 .build();
 ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
         .hostName(hostName)
         .feedContainer(feedContainer)
         .leaseContainer(leaseContainer)
         .handleChanges(docs -> {
             for (JsonNode item : docs) {
                 // Implementation for handling and processing of each JsonNode item goes here
             }
         })
         .options(
                 new ChangeFeedProcessorOptions()
                         .setFeedPollThroughputControlConfig(throughputControlGroupConfig)
         )
         .buildChangeFeedProcessor();
 
Below is an example of building ChangeFeedProcessor with throughput control for LatestVersion mode.
 ThroughputControlGroupConfig throughputControlGroupConfig =
         new ThroughputControlGroupConfigBuilder()
                 .groupName("cfp")
                 .targetThroughput(300)
                 .priorityLevel(PriorityLevel.LOW)
                 .build();
 ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
         .hostName(hostName)
         .feedContainer(feedContainer)
         .leaseContainer(leaseContainer)
         .handleLatestVersionChanges(changeFeedProcessorItems -> {
             for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
                 // Implementation for handling and processing of each change feed item goes here
             }
         })
         .options(
                 new ChangeFeedProcessorOptions()
                         .setFeedPollThroughputControlConfig(throughputControlGroupConfig)
         )
         .buildChangeFeedProcessor();
 
Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode.
 ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
     .hostName(hostName)
     .feedContainer(feedContainer)
     .leaseContainer(leaseContainer)
     .handleAllVersionsAndDeletesChanges(docs -> {
         for (ChangeFeedProcessorItem item : docs) {
             // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
         }
     })
     .buildChangeFeedProcessor();
 
Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode when also wishing to process a ChangeFeedProcessorContext.
 ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
     .hostName(hostName)
     .feedContainer(feedContainer)
     .leaseContainer(leaseContainer)
     .handleAllVersionsAndDeletesChanges((docs, context) -> {
         for (ChangeFeedProcessorItem item : docs) {
             // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
         }
         String leaseToken = context.getLeaseToken();
         // Handling of the lease token corresponding to a batch of change feed processor item goes here
     })
     .buildChangeFeedProcessor();
 
  • Constructor Details

    • ChangeFeedProcessorBuilder

      public ChangeFeedProcessorBuilder()
      Instantiates a new Cosmos a new ChangeFeedProcessor builder.
  • Method Details

    • hostName

      public ChangeFeedProcessorBuilder hostName(String hostName)
      Sets the host name.
      Parameters:
      hostName - the name to be used for the host. When using multiple hosts, each host must have a unique name.
      Returns:
      current Builder.
    • feedContainer

      public ChangeFeedProcessorBuilder feedContainer(CosmosAsyncContainer feedContainer)
      Sets and existing CosmosAsyncContainer to be used to read from the monitored container.
      Parameters:
      feedContainer - the instance of CosmosAsyncContainer to be used.
      Returns:
      current Builder.
    • leaseContainer

      public ChangeFeedProcessorBuilder leaseContainer(CosmosAsyncContainer leaseContainer)
      Sets an existing CosmosAsyncContainer to be used to read from the leases container.
      Parameters:
      leaseContainer - the instance of CosmosAsyncContainer to use.
      Returns:
      current Builder.
    • handleChanges

      public ChangeFeedProcessorBuilder handleChanges(Consumer<List<JsonNode>> consumer)
      Sets a consumer function which will be called to process changes for LatestVersion change feed mode. Attention! This API is not merge proof, please use handleLatestVersionChanges(Consumer) instead.
       .handleChanges(docs -> {
           for (JsonNode item : docs) {
               // Implementation for handling and processing of each JsonNode item goes here
           }
       })
       
      Parameters:
      consumer - the Consumer to call for handling the feeds.
      Returns:
      current Builder.
    • handleLatestVersionChanges

      public ChangeFeedProcessorBuilder handleLatestVersionChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)
      Sets a consumer function which will be called to process changes for LatestVersion change feed mode.
       .handleLatestVersionChanges(changeFeedProcessorItems -> {
           for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
               // Implementation for handling and processing of each change feed item goes here
           }
       })
       
      Parameters:
      consumer - the Consumer to call for handling the feeds.
      Returns:
      current Builder.
    • handleAllVersionsAndDeletesChanges

      @Beta(value=V4_37_0, warningText="Preview API - subject to change in non-backwards compatible way") public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)
      Sets a consumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.
       .handleAllVersionsAndDeletesChanges(docs -> {
           for (ChangeFeedProcessorItem item : docs) {
               // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
           }
       })
       
      Parameters:
      consumer - the Consumer to call for handling the feeds.
      Returns:
      current Builder.
    • handleAllVersionsAndDeletesChanges

      @Beta(value=V4_51_0, warningText="Preview API - subject to change in non-backwards compatible way") public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(BiConsumer<List<ChangeFeedProcessorItem>,ChangeFeedProcessorContext> biConsumer)
      Sets a BiConsumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.
       .handleAllVersionsAndDeletesChanges((docs, context) -> {
           for (ChangeFeedProcessorItem item : docs) {
               // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
           }
           String leaseToken = context.getLeaseToken();
           // Handling of the lease token corresponding to a batch of change feed processor item goes here
       })
       
      Parameters:
      biConsumer - the BiConsumer to call for handling the feeds and the ChangeFeedProcessorContext instance.
      Returns:
      current Builder.
    • options

      public ChangeFeedProcessorBuilder options(ChangeFeedProcessorOptions changeFeedProcessorOptions)
      Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are:
      • maximum items per page or FeedResponse: 100
      • lease renew interval: 17 seconds
      • lease acquire interval: 13 seconds
      • lease expiration interval: 60 seconds
      • feed poll delay: 5 seconds
      • maximum scale count: unlimited
      Parameters:
      changeFeedProcessorOptions - the change feed processor options to use.
      Returns:
      current Builder.
    • buildChangeFeedProcessor

      public ChangeFeedProcessor buildChangeFeedProcessor()
      Builds a new instance of the ChangeFeedProcessor with the specified configuration.
      Returns:
      an instance of ChangeFeedProcessor.