Class InMemoryCheckpointManager
- java.lang.Object
-
- com.microsoft.azure.eventprocessorhost.InMemoryCheckpointManager
-
- All Implemented Interfaces:
ICheckpointManager
public class InMemoryCheckpointManager extends Object implements ICheckpointManager
An ICheckpointManager implementation based on an in-memory store. THIS CLASS IS PROVIDED AS A CONVENIENCE FOR TESTING ONLY. All data stored via this class is in memory only and not persisted in any way. In addition, it is only visible within the same process: multiple instances of EventProcessorHost in the same process will share the same in-memory store and checkpoints created by one will be visible to the others, but that is not true across processes. With an ordinary store, there is a clear and distinct line between the values that are persisted and the values that are live in memory. With an in-memory store, that line gets blurry. If we accidentally hand out a reference to the in-store object, then the calling code is operating on the "persisted" values without going through the manager and behavior will be very different. Hence, the implementation takes pains to distinguish between references to "live" and "persisted" checkpoints. To use this class, create a new instance and pass it to the EventProcessorHost constructor that takes ICheckpointManager as an argument. After the EventProcessorHost instance is constructed, be sure to call initialize() on this object before starting processing with EventProcessorHost.registerEventProcessor() or EventProcessorHost.registerEventProcessorFactory().
-
-
Constructor Summary
Constructors Constructor Description InMemoryCheckpointManager()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description CompletableFuture<Boolean>
checkpointStoreExists()
Does the checkpoint store exist? The returned CompletableFuture completes with true if the checkpoint store exists or false if it does not.CompletableFuture<Void>
createAllCheckpointsIfNotExists(List<String> partitionIds)
Creates the checkpoint HOLDERs for the given partitions.CompletableFuture<Void>
createCheckpointStoreIfNotExists()
Create the checkpoint store if it doesn't exist.CompletableFuture<Void>
deleteCheckpoint(String partitionId)
Delete the stored checkpoint data for the given partition.CompletableFuture<Void>
deleteCheckpointStore()
Deletes the checkpoint store.CompletableFuture<Checkpoint>
getCheckpoint(String partitionId)
Get the checkpoint data associated with the given partition.void
initialize(com.microsoft.azure.eventprocessorhost.HostContext hostContext)
CompletableFuture<Void>
updateCheckpoint(CompleteLease lease, Checkpoint checkpoint)
Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
-
-
-
Method Detail
-
initialize
public void initialize(com.microsoft.azure.eventprocessorhost.HostContext hostContext)
-
checkpointStoreExists
public CompletableFuture<Boolean> checkpointStoreExists()
Description copied from interface:ICheckpointManager
Does the checkpoint store exist? The returned CompletableFuture completes with true if the checkpoint store exists or false if it does not. It completes exceptionally on error.- Specified by:
checkpointStoreExists
in interfaceICheckpointManager
- Returns:
- CompletableFuture -> true if it exists, false if not
-
createCheckpointStoreIfNotExists
public CompletableFuture<Void> createCheckpointStoreIfNotExists()
Description copied from interface:ICheckpointManager
Create the checkpoint store if it doesn't exist. Do nothing if it does exist.- Specified by:
createCheckpointStoreIfNotExists
in interfaceICheckpointManager
- Returns:
- CompletableFuture -> null on success, completes exceptionally on error.
-
deleteCheckpointStore
public CompletableFuture<Void> deleteCheckpointStore()
Description copied from interface:ICheckpointManager
Deletes the checkpoint store.- Specified by:
deleteCheckpointStore
in interfaceICheckpointManager
- Returns:
- CompletableFuture -> null on success, completes exceptionally on error.
-
getCheckpoint
public CompletableFuture<Checkpoint> getCheckpoint(String partitionId)
Description copied from interface:ICheckpointManager
Get the checkpoint data associated with the given partition. Could return null if no checkpoint has been created for that partition.- Specified by:
getCheckpoint
in interfaceICheckpointManager
- Parameters:
partitionId
- Id of partition to get checkpoint info for.- Returns:
- CompletableFuture -> checkpoint info, or null. Completes exceptionally on error.
-
createAllCheckpointsIfNotExists
public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds)
Description copied from interface:ICheckpointManager
Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs that already exist. The semantics of this are complicated because it is possible to use the same store for both leases and checkpoints (the Azure Storage implementation does so) and it is required to have a lease for every partition but it is not required to have a checkpoint for a partition. It is a valid scenario to never use checkpoints at all, so it is important for the store to distinguish between creating the structure(s) that will hold a checkpoint and actually creating a checkpoint (storing an offset/sequence number pair in the structure).- Specified by:
createAllCheckpointsIfNotExists
in interfaceICheckpointManager
- Parameters:
partitionIds
- List of partitions to create checkpoint HOLDERs for.- Returns:
- CompletableFuture -> null on success, completes exceptionally on error.
-
updateCheckpoint
public CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint checkpoint)
Description copied from interface:ICheckpointManager
Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. The lease argument is necessary to make the Azure Storage implementation work correctly: the Azure Storage implementation stores the checkpoint as part of the lease and we cannot completely hide the connection between the two. If your implementation does not have this limitation, you are free to ignore the lease argument.- Specified by:
updateCheckpoint
in interfaceICheckpointManager
- Parameters:
lease
- lease for the partition to be checkpointed.checkpoint
- offset/sequenceNumber and partition id to update the store with.- Returns:
- CompletableFuture -> null on success. Completes exceptionally on error.
-
deleteCheckpoint
public CompletableFuture<Void> deleteCheckpoint(String partitionId)
Description copied from interface:ICheckpointManager
Delete the stored checkpoint data for the given partition. If there is no stored checkpoint for the given partition, that is treated as success. Deleting the checkpoint HOLDER is allowed but not required; your implementation is free to do whichever is more convenient.- Specified by:
deleteCheckpoint
in interfaceICheckpointManager
- Parameters:
partitionId
- id of partition to delete checkpoint from store- Returns:
- CompletableFuture -> null on success. Completes exceptionally on error.
-
-