Package com.azure.core.util.paging
Class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>>
- java.lang.Object
  - reactor.core.publisher.Flux<T>
    - com.azure.core.util.paging.ContinuablePagedFlux<C,T,P>
      - com.azure.core.util.paging.ContinuablePagedFluxCore<C,T,P>
- Type Parameters:
C - The type of the continuation token.
T - The type of elements in a ContinuablePage.
P - The ContinuablePage holding items of type T.
- All Implemented Interfaces:
org.reactivestreams.Publisher<T>, CorePublisher<T>
- Direct Known Subclasses:
PagedFluxBase
public abstract class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>> extends ContinuablePagedFlux<C,T,P>
The default implementation of ContinuablePagedFlux.
This type is a Flux that provides the ability to operate on pages of type ContinuablePage and on the individual items in such pages. It supports user-provided continuation tokens, allowing paging to restart from a previously retrieved continuation token.
The type is backed by the Page Retriever provider supplied to its constructor. The provider is expected to return a PageRetriever when called, and it is invoked once for each Subscription to this Flux. Because the provider is called per Subscription, its implementation can create one or more objects to hold state, and the Page Retriever can capture and use those objects. This indirectly associates the state objects with the Subscription.
The Page Retriever can be called multiple times, serially, each time after the Flux returned by the previous invocation completes. The final completion signal is sent to the Subscriber when the last Page emitted by the Flux returned by the Page Retriever has a null continuation token.
Extending PagedFluxCore for Custom Continuation Token support
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePage;
import com.azure.core.util.paging.ContinuablePagedFluxCore;
import com.azure.core.util.paging.PageRetriever;
import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

// Tracks the most recent continuation token and whether paging has finished.
class ContinuationState<C> {
    private C lastContinuationToken;
    private boolean isDone;

    ContinuationState(C token) {
        this.lastContinuationToken = token;
    }

    void setLastContinuationToken(C token) {
        this.isDone = token == null;
        this.lastContinuationToken = token;
    }

    C getLastContinuationToken() {
        return this.lastContinuationToken;
    }

    boolean isDone() {
        return this.isDone;
    }
}

// Custom continuation token type.
class FileContinuationToken {
    private final int nextLinkId;

    FileContinuationToken(int nextLinkId) {
        this.nextLinkId = nextLinkId;
    }

    public int getNextLinkId() {
        return nextLinkId;
    }
}

// Element type carried by each page.
class File {
    private final String guid;

    File(String guid) {
        this.guid = guid;
    }

    public String getGuid() {
        return guid;
    }
}

// Page type pairing the elements with the token of the next page.
class FilePage implements ContinuablePage<FileContinuationToken, File> {
    private final IterableStream<File> elements;
    private final FileContinuationToken fileContinuationToken;

    FilePage(List<File> elements, FileContinuationToken fileContinuationToken) {
        this.elements = IterableStream.of(elements);
        this.fileContinuationToken = fileContinuationToken;
    }

    @Override
    public IterableStream<File> getElements() {
        return elements;
    }

    @Override
    public FileContinuationToken getContinuationToken() {
        return fileContinuationToken;
    }
}

class FileShareServiceClient {
    Flux<FilePage> getFilePages(FileContinuationToken token) {
        List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString()));
        // The request for the first page carries a null token, so check for null before reading it.
        if (token != null && token.getNextLinkId() < 10) {
            // A null continuation token marks the last page.
            return Flux.just(new FilePage(files, null));
        } else {
            return Flux.just(new FilePage(files,
                new FileContinuationToken((int) Math.floor(Math.random() * 20))));
        }
    }
}

FileShareServiceClient client = new FileShareServiceClient();

// The provider is invoked once per Subscription and returns the PageRetriever to use.
Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider
    = () -> (continuationToken, pageSize) -> client.getFilePages(continuationToken);

class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> {
    FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider) {
        super(pageRetrieverProvider);
    }
}

FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);
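The retrieval contract described in the class description (a provider invoked once per Subscription, a retriever called serially, completion on a null continuation token) can be modeled without Reactor. The following is a simplified, synchronous sketch using only JDK types. It is not the azure-core implementation, and the names PagingLoopSketch, Page, CallCounter, provider and drain are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified, synchronous stand-in for the paging loop; not the azure-core implementation.
final class PagingLoopSketch {

    // Hypothetical page type: items plus the token of the next page (null when done).
    static final class Page {
        final List<String> items;
        final Integer nextToken;

        Page(List<String> items, Integer nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // Hypothetical per-subscription state captured by the retriever.
    static final class CallCounter {
        int calls;
    }

    // The provider creates fresh state on every get(), i.e. once per simulated subscription.
    static Supplier<Function<Integer, Page>> provider() {
        return () -> {
            CallCounter state = new CallCounter();
            return token -> {
                state.calls++;
                int pageNumber = (token == null) ? 0 : token; // null token means "first page"
                List<String> items = Arrays.asList("item-" + pageNumber + " (call " + state.calls + ")");
                Integer next = (pageNumber < 2) ? Integer.valueOf(pageNumber + 1) : null;
                return new Page(items, next);
            };
        };
    }

    // Simulates one subscription: obtain a retriever, then call it serially
    // until a page reports a null continuation token.
    static List<String> drain(Supplier<Function<Integer, Page>> provider) {
        Function<Integer, Page> retriever = provider.get();
        List<String> collected = new ArrayList<>();
        Integer token = null;
        do {
            Page page = retriever.apply(token);
            collected.addAll(page.items);
            token = page.nextToken;
        } while (token != null);
        return collected;
    }
}
```

Calling drain twice with the same provider yields the same three items each time, because each call obtains a new retriever with its own CallCounter.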
- See Also:
ContinuablePagedFlux, ContinuablePage
Constructor Summary
Constructors
Modifier, Constructor, Description
- protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)
  Creates an instance of ContinuablePagedFluxCore.
- protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize)
  Creates an instance of ContinuablePagedFluxCore.
- protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate)
  Creates an instance of ContinuablePagedFluxCore.
-
Method Summary
Modifier and Type, Method, Description
- Flux<P> byPage()
  Gets a Flux of ContinuablePage starting at the first page.
- Flux<P> byPage(int preferredPageSize)
  Gets a Flux of ContinuablePage starting at the first page, requesting each page to contain a number of elements equal to the preferred page size.
- Flux<P> byPage(C continuationToken)
  Gets a Flux of ContinuablePage beginning at the page identified by the given continuation token.
- Flux<P> byPage(C continuationToken, int preferredPageSize)
  Gets a Flux of ContinuablePage beginning at the page identified by the given continuation token, requesting each page to contain a number of elements equal to the preferred page size.
- Integer getPageSize()
  Gets the page size configured for this ContinuablePagedFluxCore.
- void subscribe(CoreSubscriber<? super T> coreSubscriber)
  Subscribes to consume all items of type T in sequence.
Methods inherited from class com.azure.core.util.paging.ContinuablePagedFlux
getContinuationPredicate
-
Methods inherited from class reactor.core.publisher.Flux
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, getPrefetch, 
groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, 
take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
Constructor Detail
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)
Creates an instance of ContinuablePagedFluxCore.
- Parameters:
pageRetrieverProvider - A provider that returns a PageRetriever.
- Throws:
NullPointerException - If pageRetrieverProvider is null.
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize)
Creates an instance of ContinuablePagedFluxCore.
- Parameters:
pageRetrieverProvider - A provider that returns a PageRetriever.
pageSize - The preferred page size.
- Throws:
NullPointerException - If pageRetrieverProvider is null.
IllegalArgumentException - If pageSize is less than or equal to zero.
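The documented argument checks can be illustrated with a small stand-in class. This is a sketch of the contract stated above, not the azure-core source. PagedArgumentsSketch and its exception messages are hypothetical, and the nullable Integer page size is used so the same check also covers the constructor overload that accepts a null page size.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Illustrative only: mirrors the documented exceptions, not the azure-core source.
final class PagedArgumentsSketch {
    final Supplier<?> pageRetrieverProvider;
    final Integer pageSize; // null means "no preferred page size"

    PagedArgumentsSketch(Supplier<?> pageRetrieverProvider, Integer pageSize) {
        // A null provider is rejected with NullPointerException.
        this.pageRetrieverProvider =
            Objects.requireNonNull(pageRetrieverProvider, "'pageRetrieverProvider' cannot be null.");
        // A page size, when given, must be strictly positive.
        if (pageSize != null && pageSize <= 0) {
            throw new IllegalArgumentException("'pageSize' must be greater than 0 when specified.");
        }
        this.pageSize = pageSize;
    }
}
```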
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate)
Creates an instance of ContinuablePagedFluxCore.
- Parameters:
pageRetrieverProvider - A provider that returns a PageRetriever.
pageSize - The preferred page size.
continuationPredicate - A predicate which determines if paging should continue.
- Throws:
NullPointerException - If pageRetrieverProvider is null.
IllegalArgumentException - If pageSize is not null and is less than or equal to zero.
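A continuation predicate is useful when a service signals the end of paging with something other than a null token. The sketch below assumes the predicate is tested against each page's continuation token, and that the behavior without a custom predicate corresponds to a non-null check, as the class description suggests. Both points are assumptions, and ContinuationPredicateSketch, fetchNextToken and countPages are hypothetical names.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative model of a continuation predicate; not the azure-core implementation.
final class ContinuationPredicateSketch {

    // Hypothetical service: returns "a", then "b", then an empty string forever
    // (it never returns null to signal the end).
    static String fetchNextToken(String token) {
        if (token == null) {
            return "a";
        }
        return token.equals("a") ? "b" : "";
    }

    // Fetches pages until the predicate rejects the returned token.
    // maxPages is a safety cap so a never-ending sequence still terminates in this demo.
    static int countPages(Function<String, String> fetch,
                          Predicate<String> continuationPredicate,
                          int maxPages) {
        int pages = 0;
        String token = null; // the first page is requested without a token
        do {
            token = fetch.apply(token);
            pages++;
        } while (continuationPredicate.test(token) && pages < maxPages);
        return pages;
    }
}
```

With the predicate `t -> t != null && !t.isEmpty()`, the loop stops after three pages. With a plain non-null check it runs until the safety cap, because this hypothetical service never returns null.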
Method Detail
-
getPageSize
public Integer getPageSize()
Gets the page size configured for this ContinuablePagedFluxCore.
- Returns:
The configured page size, or null if unspecified.
-
byPage
public Flux<P> byPage()
Description copied from class: ContinuablePagedFlux
Gets a Flux of ContinuablePage starting at the first page.
- Specified by:
byPage in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
- Returns:
A Flux of ContinuablePage.
-
byPage
public Flux<P> byPage(C continuationToken)
Description copied from class: ContinuablePagedFlux
Gets a Flux of ContinuablePage beginning at the page identified by the given continuation token.
- Specified by:
byPage in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
- Parameters:
continuationToken - A continuation token identifying the page to select.
- Returns:
A Flux of ContinuablePage.
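Resuming from a continuation token can be pictured as starting the same retrieval loop at a later page. The sketch below is a JDK-only model under the assumption that a token simply identifies the page to fetch. ResumeFromTokenSketch and its fixed PAGES data are hypothetical and stand in for a real service.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of resuming from a saved continuation token.
final class ResumeFromTokenSketch {

    // Hypothetical data source: three fixed pages; a token is the index of the page to fetch.
    static final List<List<String>> PAGES = Arrays.asList(
        Arrays.asList("a", "b"),
        Arrays.asList("c", "d"),
        Arrays.asList("e"));

    // Returns the token of the page after the one identified by `token`, or null at the last page.
    static Integer nextToken(Integer token) {
        int index = (token == null) ? 0 : token;
        return (index + 1 < PAGES.size()) ? Integer.valueOf(index + 1) : null;
    }

    // Collects all items starting at the page identified by `startToken` (null means the first page).
    static List<String> itemsFrom(Integer startToken) {
        List<String> items = new ArrayList<>();
        Integer token = startToken;
        do {
            int index = (token == null) ? 0 : token;
            items.addAll(PAGES.get(index));
            token = nextToken(token);
        } while (token != null);
        return items;
    }
}
```

A caller that saved the token returned with the first page can later call itemsFrom with that token and receive only the remaining items.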
-
byPage
public Flux<P> byPage(int preferredPageSize)
Description copied from class: ContinuablePagedFlux
Gets a Flux of ContinuablePage starting at the first page, requesting each page to contain a number of elements equal to the preferred page size.
The service may or may not honor the preferred page size, so the client MUST be prepared to handle pages of different sizes.
- Specified by:
byPage in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
- Parameters:
preferredPageSize - The preferred page size.
- Returns:
A Flux of ContinuablePage.
-
byPage
public Flux<P> byPage(C continuationToken, int preferredPageSize)
Description copied from class: ContinuablePagedFlux
Gets a Flux of ContinuablePage beginning at the page identified by the given continuation token, requesting each page to contain a number of elements equal to the preferred page size.
The service may or may not honor the preferred page size, so the client MUST be prepared to handle pages of different sizes.
- Specified by:
byPage in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
- Parameters:
continuationToken - A continuation token identifying the page to select.
preferredPageSize - The preferred page size.
- Returns:
A Flux of ContinuablePage.
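Because the preferred page size is only a hint, client code should read each page's actual size instead of assuming it. The sketch below models a service with a server-side cap. PreferredPageSizeSketch and SERVICE_MAX_PAGE_SIZE are hypothetical, and the cap value is chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of a service that does not honor the preferred page size.
final class PreferredPageSizeSketch {

    static final int SERVICE_MAX_PAGE_SIZE = 3; // hypothetical server-side cap

    // Hypothetical service: splits the items into pages no larger than the cap,
    // whatever size the caller prefers.
    static List<List<Integer>> fetchPages(int totalItems, int preferredPageSize) {
        if (preferredPageSize <= 0) {
            throw new IllegalArgumentException("preferredPageSize must be positive");
        }
        int effective = Math.min(preferredPageSize, SERVICE_MAX_PAGE_SIZE);
        List<List<Integer>> pages = new ArrayList<>();
        for (int start = 0; start < totalItems; start += effective) {
            List<Integer> page = new ArrayList<>();
            for (int i = start; i < Math.min(start + effective, totalItems); i++) {
                page.add(i);
            }
            pages.add(page);
        }
        return pages;
    }

    // Client code: counts items from each page's real size rather than the preferred size.
    static int countItems(List<List<Integer>> pages) {
        int count = 0;
        for (List<Integer> page : pages) {
            count += page.size();
        }
        return count;
    }
}
```

Requesting 7 items with a preferred page size of 5 produces pages of 3, 3 and 1 elements in this model, so multiplying the page count by the preferred size would overcount.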
-
subscribe
public void subscribe(CoreSubscriber<? super T> coreSubscriber)
Subscribes to consume all items of type T in sequence. This is recommended for most common scenarios: it seamlessly fetches the next page when required and exposes the results as a Flux of items.
- Specified by:
subscribe in interface CorePublisher<C>
- Specified by:
subscribe in class Flux<T>
- Parameters:
coreSubscriber - The subscriber for this ContinuablePagedFluxCore.
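Conceptually, consuming items rather than pages amounts to flattening the page sequence in order. The sketch below shows that relationship with JDK streams only. It is an analogy, not how the Flux is implemented, and FlattenPagesSketch is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative analogy: item-level consumption as an in-order flattening of pages.
final class FlattenPagesSketch {

    // Concatenates the elements of every page, preserving page order and element order.
    static <T> List<T> flatten(List<List<T>> pages) {
        return pages.stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }
}
```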