C
- the type of the continuation tokenT
- The type of elements in a ContinuablePage
P
- The ContinuablePage
holding items of type T
.public abstract class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>> extends ContinuablePagedFlux<C,T,P>
ContinuablePagedFlux
.
This type is a Flux that provides the ability to operate on pages of type ContinuablePage
and individual
items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a
previously-retrieved continuation token.
The type is backed by the Page Retriever provider provided in it's constructor. The provider is expected to return
PageRetriever
when called. The provider is invoked for each Subscription to this Flux. Given provider is
called per Subscription, the provider implementation can create one or more objects to store any state and Page
Retriever can capture and use those objects. This indirectly associate the state objects to the Subscription. The
Page Retriever can get called multiple times in serial fashion, each time after the completion of the Flux returned
by the previous invocation. The final completion signal will be send to the Subscriber when the last Page emitted by
the Flux returned by the Page Retriever has null
continuation token.
Extending PagedFluxCore for Custom Continuation Token support
class ContinuationState<C> { private C lastContinuationToken; private boolean isDone; ContinuationState(C token) { this.lastContinuationToken = token; } void setLastContinuationToken(C token) { this.isDone = token == null; this.lastContinuationToken = token; } C getLastContinuationToken() { return this.lastContinuationToken; } boolean isDone() { return this.isDone; } } class FileContinuationToken { private final int nextLinkId; FileContinuationToken(int nextLinkId) { this.nextLinkId = nextLinkId; } public int getNextLinkId() { return nextLinkId; } } class File { private final String guid; File(String guid) { this.guid = guid; } public String getGuid() { return guid; } } class FilePage implements ContinuablePage<FileContinuationToken, File> { private final IterableStream<File> elements; private final FileContinuationToken fileContinuationToken; FilePage(List<File> elements, FileContinuationToken fileContinuationToken) { this.elements = IterableStream.of(elements); this.fileContinuationToken = fileContinuationToken; } {@literal @}Override public IterableStream<File> getElements() { return elements; } {@literal @}Override public FileContinuationToken getContinuationToken() { return fileContinuationToken; } } class FileShareServiceClient { Flux<FilePage> getFilePages(FileContinuationToken token) { List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString())); if (token.getNextLinkId() < 10) { return Flux.just(new FilePage(files, null)); } else { return Flux.just(new FilePage(files, new FileContinuationToken((int) Math.floor(Math.random() * 20)))); } } } FileShareServiceClient client = new FileShareServiceClient(); Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider = () -> (continuationToken, pageSize) -> client.getFilePages(continuationToken); class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> { FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider) { super(pageRetrieverProvider); } } FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);
ContinuablePagedFlux
,
ContinuablePage
Modifier | Constructor and Description |
---|---|
protected |
ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)
Creates an instance of
ContinuablePagedFluxCore . |
protected |
ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider,
int pageSize)
Creates an instance of
ContinuablePagedFluxCore . |
protected |
ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider,
Integer pageSize,
Predicate<C> continuationPredicate)
Creates an instance of
ContinuablePagedFluxCore . |
Modifier and Type | Method and Description |
---|---|
Flux<P> |
byPage()
Gets a
Flux of ContinuablePage starting at the first page. |
Flux<P> |
byPage(C continuationToken)
Gets a
Flux of ContinuablePage beginning at the page identified by the given continuation token. |
Flux<P> |
byPage(C continuationToken,
int preferredPageSize)
Gets a
Flux of ContinuablePage beginning at the page identified by the given continuation token
requesting each page to contain the number of elements equal to the preferred page size. |
Flux<P> |
byPage(int preferredPageSize)
Gets a
Flux of ContinuablePage starting at the first page requesting each page to contain a
number of elements equal to the preferred page size. |
Integer |
getPageSize()
Get the page size configured this
ContinuablePagedFluxCore . |
void |
subscribe(CoreSubscriber<? super T> coreSubscriber)
Subscribe to consume all items of type
T in the sequence respectively. |
getContinuationPredicate
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, getPrefetch, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)
ContinuablePagedFluxCore
.pageRetrieverProvider
- a provider that returns PageRetriever
.NullPointerException
- If pageRetrieverProvider
is null.protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize)
ContinuablePagedFluxCore
.pageRetrieverProvider
- a provider that returns PageRetriever
.pageSize
- the preferred page sizeNullPointerException
- If pageRetrieverProvider
is null.IllegalArgumentException
- If pageSize
is less than or equal to zero.protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate)
ContinuablePagedFluxCore
.pageRetrieverProvider
- A provider that returns PageRetriever
.pageSize
- The preferred page size.continuationPredicate
- A predicate which determines if paging should continue.NullPointerException
- If pageRetrieverProvider
is null.IllegalArgumentException
- If pageSize
is not null and is less than or equal to zero.public Integer getPageSize()
ContinuablePagedFluxCore
.null
if unspecified.public Flux<P> byPage()
ContinuablePagedFlux
Flux
of ContinuablePage
starting at the first page.byPage
in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
Flux
of ContinuablePage
.public Flux<P> byPage(C continuationToken)
ContinuablePagedFlux
Flux
of ContinuablePage
beginning at the page identified by the given continuation token.byPage
in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
continuationToken
- A continuation token identifying the page to select.Flux
of ContinuablePage
.public Flux<P> byPage(int preferredPageSize)
ContinuablePagedFlux
Flux
of ContinuablePage
starting at the first page requesting each page to contain a
number of elements equal to the preferred page size.
The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.
byPage
in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
preferredPageSize
- The preferred page size.Flux
of ContinuablePage
.public Flux<P> byPage(C continuationToken, int preferredPageSize)
ContinuablePagedFlux
Flux
of ContinuablePage
beginning at the page identified by the given continuation token
requesting each page to contain the number of elements equal to the preferred page size.
The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.
byPage
in class ContinuablePagedFlux<C,T,P extends ContinuablePage<C,T>>
continuationToken
- A continuation token identifying the page to select.preferredPageSize
- The preferred page size.Flux
of ContinuablePage
.public void subscribe(CoreSubscriber<? super T> coreSubscriber)
T
in the sequence respectively. This is recommended for most
common scenarios. This will seamlessly fetch next page when required and provide with a Flux
of items.subscribe
in interface CorePublisher<T>
subscribe
in class Flux<T>
coreSubscriber
- The subscriber for this ContinuablePagedFluxCore
Copyright © 2021 Microsoft Corporation. All rights reserved.