Package com.azure.cosmos
Class ChangeFeedProcessorBuilder
java.lang.Object
com.azure.cosmos.ChangeFeedProcessorBuilder
Helper class to build a
ChangeFeedProcessor
instance.
Below is an example of building ChangeFeedProcessor for LatestVersion mode.
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(docs -> { for (JsonNode item : docs) { // Implementation for handling and processing of each JsonNode item goes here } }) .buildChangeFeedProcessor();Below is an example of building ChangeFeedProcessor with throughput control for handleChanges.
ThroughputControlGroupConfig throughputControlGroupConfig = new ThroughputControlGroupConfigBuilder() .groupName("cfp") .targetThroughput(300) .priorityLevel(PriorityLevel.LOW) .build(); ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(docs -> { for (JsonNode item : docs) { // Implementation for handling and processing of each JsonNode item goes here } }) .options( new ChangeFeedProcessorOptions() .setFeedPollThroughputControlConfig(throughputControlGroupConfig) ) .buildChangeFeedProcessor();Below is an example of building ChangeFeedProcessor with throughput control for LatestVersion mode.
ThroughputControlGroupConfig throughputControlGroupConfig = new ThroughputControlGroupConfigBuilder() .groupName("cfp") .targetThroughput(300) .priorityLevel(PriorityLevel.LOW) .build(); ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleLatestVersionChanges(changeFeedProcessorItems -> { for (ChangeFeedProcessorItem item : changeFeedProcessorItems) { // Implementation for handling and processing of each change feed item goes here } }) .options( new ChangeFeedProcessorOptions() .setFeedPollThroughputControlConfig(throughputControlGroupConfig) ) .buildChangeFeedProcessor();Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode.
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleAllVersionsAndDeletesChanges(docs -> { for (ChangeFeedProcessorItem item : docs) { // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here } }) .buildChangeFeedProcessor();Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode when also wishing to process a
ChangeFeedProcessorContext
.
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleAllVersionsAndDeletesChanges((docs, context) -> { for (ChangeFeedProcessorItem item : docs) { // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here } String leaseToken = context.getLeaseToken(); // Handling of the lease token corresponding to a batch of change feed processor item goes here }) .buildChangeFeedProcessor();
-
Constructor Summary
ConstructorDescriptionInstantiates a new Cosmos a new ChangeFeedProcessor builder. -
Method Summary
Modifier and TypeMethodDescriptionBuilds a new instance of theChangeFeedProcessor
with the specified configuration.feedContainer
(CosmosAsyncContainer feedContainer) Sets and existingCosmosAsyncContainer
to be used to read from the monitored container.handleAllVersionsAndDeletesChanges
(BiConsumer<List<ChangeFeedProcessorItem>, ChangeFeedProcessorContext> biConsumer) Sets aBiConsumer
function which will be called to process changes for AllVersionsAndDeletes change feed mode.Sets a consumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.handleChanges
(Consumer<List<JsonNode>> consumer) Sets a consumer function which will be called to process changes for LatestVersion change feed mode.Sets a consumer function which will be called to process changes for LatestVersion change feed mode.Sets the host name.leaseContainer
(CosmosAsyncContainer leaseContainer) Sets an existingCosmosAsyncContainer
to be used to read from the leases container.options
(ChangeFeedProcessorOptions changeFeedProcessorOptions) Sets theChangeFeedProcessorOptions
to be used.
-
Constructor Details
-
ChangeFeedProcessorBuilder
public ChangeFeedProcessorBuilder()Instantiates a new Cosmos a new ChangeFeedProcessor builder.
-
-
Method Details
-
hostName
Sets the host name.- Parameters:
hostName
- the name to be used for the host. When using multiple hosts, each host must have a unique name.- Returns:
- current Builder.
-
feedContainer
Sets and existingCosmosAsyncContainer
to be used to read from the monitored container.- Parameters:
feedContainer
- the instance ofCosmosAsyncContainer
to be used.- Returns:
- current Builder.
-
leaseContainer
Sets an existingCosmosAsyncContainer
to be used to read from the leases container.- Parameters:
leaseContainer
- the instance ofCosmosAsyncContainer
to use.- Returns:
- current Builder.
-
handleChanges
Sets a consumer function which will be called to process changes for LatestVersion change feed mode. Attention! This API is not merge proof, please usehandleLatestVersionChanges(Consumer)
instead..handleChanges(docs -> { for (JsonNode item : docs) { // Implementation for handling and processing of each JsonNode item goes here } })
- Parameters:
consumer
- theConsumer
to call for handling the feeds.- Returns:
- current Builder.
-
handleLatestVersionChanges
public ChangeFeedProcessorBuilder handleLatestVersionChanges(Consumer<List<ChangeFeedProcessorItem>> consumer) Sets a consumer function which will be called to process changes for LatestVersion change feed mode..handleLatestVersionChanges(changeFeedProcessorItems -> { for (ChangeFeedProcessorItem item : changeFeedProcessorItems) { // Implementation for handling and processing of each change feed item goes here } })
- Parameters:
consumer
- theConsumer
to call for handling the feeds.- Returns:
- current Builder.
-
handleAllVersionsAndDeletesChanges
@Beta(value=V4_37_0, warningText="Preview API - subject to change in non-backwards compatible way") public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer) Sets a consumer function which will be called to process changes for AllVersionsAndDeletes change feed mode..handleAllVersionsAndDeletesChanges(docs -> { for (ChangeFeedProcessorItem item : docs) { // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here } })
- Parameters:
consumer
- theConsumer
to call for handling the feeds.- Returns:
- current Builder.
-
handleAllVersionsAndDeletesChanges
@Beta(value=V4_51_0, warningText="Preview API - subject to change in non-backwards compatible way") public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(BiConsumer<List<ChangeFeedProcessorItem>, ChangeFeedProcessorContext> biConsumer) Sets aBiConsumer
function which will be called to process changes for AllVersionsAndDeletes change feed mode..handleAllVersionsAndDeletesChanges((docs, context) -> { for (ChangeFeedProcessorItem item : docs) { // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here } String leaseToken = context.getLeaseToken(); // Handling of the lease token corresponding to a batch of change feed processor item goes here })
- Parameters:
biConsumer
- theBiConsumer
to call for handling the feeds and theChangeFeedProcessorContext
instance.- Returns:
- current Builder.
-
options
Sets theChangeFeedProcessorOptions
to be used. Unless specifically set the default values that will be used are:- maximum items per page or FeedResponse: 100
- lease renew interval: 17 seconds
- lease acquire interval: 13 seconds
- lease expiration interval: 60 seconds
- feed poll delay: 5 seconds
- maximum scale count: unlimited
- Parameters:
changeFeedProcessorOptions
- the change feed processor options to use.- Returns:
- current Builder.
-
buildChangeFeedProcessor
Builds a new instance of theChangeFeedProcessor
with the specified configuration.- Returns:
- an instance of
ChangeFeedProcessor
.
-