Package com.azure.core.http.rest
Class PagedFlux<T>
- java.lang.Object
-
- reactor.core.publisher.Flux<T>
-
- com.azure.core.util.paging.ContinuablePagedFlux<C,T,P>
-
- com.azure.core.util.paging.ContinuablePagedFluxCore<String,T,P>
-
- com.azure.core.http.rest.PagedFluxBase<T,PagedResponse<T>>
-
- com.azure.core.http.rest.PagedFlux<T>
-
- Type Parameters:
T
- The type of items in a PagedResponse
- All Implemented Interfaces:
org.reactivestreams.Publisher<T>, CorePublisher<T>
public class PagedFlux<T> extends PagedFluxBase<T,PagedResponse<T>>
PagedFlux is a Flux that provides the ability to operate on paginated REST responses of type PagedResponse and on the individual items in such pages. When the response is processed page by page, each page contains its items as well as REST response details such as the status code and headers.
To process one item at a time, subscribe to this flux as shown below.
Code sample
// Subscribe to process one item at a time
pagedFlux
    .log()
    .subscribe(item -> System.out.println("Processing item with value: " + item),
        error -> System.err.println("An error occurred: " + error),
        () -> System.out.println("Processing complete."));
To process one page at a time, use the PagedFluxBase.byPage() method as shown below.
Code sample
// Subscribe to process one page at a time from the beginning
pagedFlux
    .byPage()
    .log()
    .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
        page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
        error -> System.err.println("An error occurred: " + error),
        () -> System.out.println("Processing complete."));
To process items one page at a time, starting from any page associated with a continuation token, use PagedFluxBase.byPage(String) as shown below.
Code sample
// Subscribe to process one page at a time starting from a page associated with
// a continuation token
String continuationToken = getContinuationToken();
pagedFlux
    .byPage(continuationToken)
    .log()
    .doOnSubscribe(ignored -> System.out.println(
        "Subscribed to paged flux processing pages starting from: " + continuationToken))
    .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
        page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
        error -> System.err.println("An error occurred: " + error),
        () -> System.out.println("Processing complete."));
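The role of the continuation token can be illustrated without the library. The following is a minimal, self-contained sketch and not azure-core code: Page, PAGES, and itemsFrom are hypothetical stand-ins, and the tokens "p1" to "p3" are invented for illustration. It assumes each token identifies exactly one page and that the last page carries a null token.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model of continuation-token paging; none of these types come from azure-core.
final class ContinuationTokenSketch {

    // Hypothetical page: its items plus the token of the next page (null on the last page).
    static final class Page {
        final List<Integer> items;
        final String nextToken;

        Page(List<Integer> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // Hypothetical in-memory "service": each token identifies one page.
    static final Map<String, Page> PAGES = new HashMap<>();

    static {
        PAGES.put("p1", new Page(Arrays.asList(1, 2), "p2"));
        PAGES.put("p2", new Page(Arrays.asList(3, 4), "p3"));
        PAGES.put("p3", new Page(Arrays.asList(5), null));
    }

    // Collects the items of every page, from the page identified by startToken to the last page.
    static List<Integer> itemsFrom(String startToken) {
        List<Integer> collected = new ArrayList<>();
        String token = startToken;
        while (token != null) {
            Page page = PAGES.get(token);
            if (page == null) {
                throw new IllegalArgumentException("Unknown continuation token: " + token);
            }
            collected.addAll(page.items);
            token = page.nextToken;
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(itemsFrom("p1")); // prints [1, 2, 3, 4, 5]
        System.out.println(itemsFrom("p2")); // prints [3, 4, 5]
    }
}
```

Starting from "p2" skips the first page entirely, which is the behavior a caller relies on when resuming a listing from a previously saved token.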
- See Also:
PagedResponse, Page, Flux
-
-
Constructor Summary
Constructors
Constructor and Description
PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever)
- Creates an instance of PagedFlux that consists of only a single page with a given element count.
PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever, BiFunction<String,Integer,Mono<PagedResponse<T>>> nextPageRetriever)
- Creates an instance of PagedFlux that is capable of retrieving multiple pages of a given page size.
PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever)
- Creates an instance of PagedFlux that consists of only a single page.
PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever, Function<String,Mono<PagedResponse<T>>> nextPageRetriever)
- Creates an instance of PagedFlux that retrieves the first page from the Supplier and subsequent pages from the Function.
-
Method Summary
Modifier and Type, Method, and Description
static <T> PagedFlux<T> create(Supplier<PageRetriever<String,PagedResponse<T>>> provider)
- Creates an instance of PagedFlux backed by a Page Retriever Supplier (provider).
<S> PagedFlux<S> mapPage(Function<T,S> mapper)
- Deprecated. Refer to the decoration samples for create(Supplier).
-
Methods inherited from class com.azure.core.http.rest.PagedFluxBase
byPage, byPage, subscribe
-
Methods inherited from class com.azure.core.util.paging.ContinuablePagedFluxCore
byPage, byPage, getPageSize
-
Methods inherited from class com.azure.core.util.paging.ContinuablePagedFlux
getContinuationPredicate
-
Methods inherited from class reactor.core.publisher.Flux
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, getPrefetch, 
groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, 
take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
-
-
-
-
Constructor Detail
-
PagedFlux
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever)
Creates an instance of PagedFlux that consists of only a single page. This constructor takes a Supplier that returns the single page of T.
Code sample
// A supplier that fetches the first page of data from source/service
Supplier<Mono<PagedResponse<Integer>>> firstPageRetrieverFunction = () -> getFirstPage();

// Only the first-page supplier is passed, so the resulting PagedFlux has a single page
PagedFlux<Integer> pagedFluxInstance = new PagedFlux<>(firstPageRetrieverFunction);
- Parameters:
firstPageRetriever
- Supplier that retrieves the first page.
-
PagedFlux
public PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever)
Creates an instance of PagedFlux that consists of only a single page with a given element count.
Code sample
// A function that fetches the single page of data from a source/service.
Function<Integer, Mono<PagedResponse<Integer>>> singlePageRetriever = pageSize ->
    getFirstPageWithSize(pageSize);

PagedFlux<Integer> singlePageFluxWithPageSize = new PagedFlux<Integer>(singlePageRetriever);
- Parameters:
firstPageRetriever
- Function that retrieves the first page.
-
PagedFlux
public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever, Function<String,Mono<PagedResponse<T>>> nextPageRetriever)
Creates an instance of PagedFlux. The constructor takes a Supplier and a Function. The Supplier returns the first page of T; the Function retrieves subsequent pages of T.
Code sample
// A supplier that fetches the first page of data from source/service
Supplier<Mono<PagedResponse<Integer>>> firstPageRetriever = () -> getFirstPage();

// A function that fetches subsequent pages of data from source/service given a continuation token
Function<String, Mono<PagedResponse<Integer>>> nextPageRetriever =
    continuationToken -> getNextPage(continuationToken);

PagedFlux<Integer> pagedFlux = new PagedFlux<>(firstPageRetriever, nextPageRetriever);
- Parameters:
firstPageRetriever
- Supplier that retrieves the first page.
nextPageRetriever
- Function that retrieves the next page given a continuation token.
-
PagedFlux
public PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever, BiFunction<String,Integer,Mono<PagedResponse<T>>> nextPageRetriever)
Creates an instance of PagedFlux that is capable of retrieving multiple pages of a given page size.
Code sample
// A function that fetches the first page of data from a source/service.
Function<Integer, Mono<PagedResponse<Integer>>> firstPageRetriever = pageSize -> getFirstPageWithSize(pageSize);

// A function that fetches subsequent pages of data from a source/service given a continuation token.
BiFunction<String, Integer, Mono<PagedResponse<Integer>>> nextPageRetriever = (continuationToken, pageSize) ->
    getNextPageWithSize(continuationToken, pageSize);

PagedFlux<Integer> pagedFluxWithPageSize = new PagedFlux<>(firstPageRetriever, nextPageRetriever);
- Parameters:
firstPageRetriever
- Function that retrieves the first page.
nextPageRetriever
- BiFunction that retrieves the next page given a continuation token and page size.
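How the two page-size-aware retrievers cooperate can be sketched in plain Java. This is an illustrative model under stated assumptions, not the library's implementation: Page, DATA, slice, and pages are hypothetical names, the Mono wrapper is left out so the example runs with the JDK alone, and the continuation token is assumed to be the next offset encoded as text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Stand-alone model of page-size-aware retrievers; Page and DATA are hypothetical, not azure-core types.
final class PageSizeRetrieverSketch {

    // Hypothetical page: its items plus the token of the next page (null marks the last page).
    static final class Page {
        final List<Integer> items;
        final String nextToken;

        Page(List<Integer> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // Hypothetical data source standing in for a remote service.
    static final List<Integer> DATA = Arrays.asList(10, 20, 30, 40, 50);

    // Slices DATA starting at offset; the continuation token is the next offset as text.
    static Page slice(int offset, int pageSize) {
        int end = Math.min(offset + pageSize, DATA.size());
        String next = end < DATA.size() ? Integer.toString(end) : null;
        return new Page(new ArrayList<>(DATA.subList(offset, end)), next);
    }

    // Same argument shapes as the constructor parameters, minus the Mono wrapper.
    static final Function<Integer, Page> FIRST_PAGE = pageSize -> slice(0, pageSize);
    static final BiFunction<String, Integer, Page> NEXT_PAGE =
        (token, pageSize) -> slice(Integer.parseInt(token), pageSize);

    // Walks all pages with the given page size and returns the items grouped by page.
    static List<List<Integer>> pages(int pageSize) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be at least 1");
        }
        List<List<Integer>> result = new ArrayList<>();
        Page page = FIRST_PAGE.apply(pageSize);
        result.add(page.items);
        while (page.nextToken != null) {
            page = NEXT_PAGE.apply(page.nextToken, pageSize);
            result.add(page.items);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pages(2)); // prints [[10, 20], [30, 40], [50]]
    }
}
```

The same page size is passed to every retriever call, so the first-page function and the next-page function stay consistent about how many items a page may hold.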
-
-
Method Detail
-
create
public static <T> PagedFlux<T> create(Supplier<PageRetriever<String,PagedResponse<T>>> provider)
Creates an instance of PagedFlux backed by a Page Retriever Supplier (provider). When invoked, the provider should return a PageRetriever. The provider is called for each Subscription to the PagedFlux instance. The Page Retriever can be called multiple times in serial fashion, each time after the completion of the Flux returned from the previous invocation. The final completion signal is sent to the Subscriber when the last Page emitted by the Flux returned by the Page Retriever has a null continuation token.
The provider is useful mainly in two scenarios:
- To manage state across multiple calls to the Page Retriever within the same Subscription.
- To decorate a PagedFlux to produce a new PagedFlux.
Decoration sample
// Transform a PagedFlux with Integer items to PagedFlux of String items.
final PagedFlux<Integer> intPagedFlux = createAnInstance();

// PagedResponse<Integer> to PagedResponse<String> mapper
final Function<PagedResponse<Integer>, PagedResponse<String>> responseMapper
    = intResponse -> new PagedResponseBase<Void, String>(intResponse.getRequest(),
        intResponse.getStatusCode(),
        intResponse.getHeaders(),
        intResponse.getValue()
            .stream()
            .map(intValue -> Integer.toString(intValue)).collect(Collectors.toList()),
        intResponse.getContinuationToken(),
        null);

final Supplier<PageRetriever<String, PagedResponse<String>>> provider = () ->
    (continuationToken, pageSize) -> {
        Flux<PagedResponse<Integer>> flux = (continuationToken == null)
            ? intPagedFlux.byPage()
            : intPagedFlux.byPage(continuationToken);
        return flux.map(responseMapper);
    };
PagedFlux<String> strPagedFlux = PagedFlux.create(provider);

// Create a PagedFlux from a PagedFlux with all exceptions mapped to a specific exception.
final PagedFlux<Integer> pagedFlux = createAnInstance();
final Supplier<PageRetriever<String, PagedResponse<Integer>>> eprovider = () ->
    (continuationToken, pageSize) -> {
        Flux<PagedResponse<Integer>> flux = (continuationToken == null)
            ? pagedFlux.byPage()
            : pagedFlux.byPage(continuationToken);
        return flux.onErrorMap(PaginationException::new);
    };
final PagedFlux<Integer> exceptionMappedPagedFlux = PagedFlux.create(eprovider);
- Type Parameters:
T
- The type of items in a PagedResponse
- Parameters:
provider
- the Page Retrieval Provider
- Returns:
- PagedFlux backed by the Page Retriever Function Supplier
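The first scenario, state that lives for exactly one Subscription, can be modeled with JDK types only. The sketch below is a simplified illustration and not the azure-core implementation: Page, PROVIDER, LOG, and subscribe are hypothetical names, the retriever is reduced to a Function from continuation token to a single page (the real PageRetriever also receives a page size and returns a Flux), and the token "t1" is invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Stand-alone model of "one retriever per subscription"; no azure-core or Reactor types are used.
final class ProviderPerSubscriptionSketch {

    // Hypothetical page: its items plus the token of the next page (null marks the last page).
    static final class Page {
        final List<String> items;
        final String nextToken;

        Page(List<String> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // Records every retriever call so the per-subscription counter can be observed.
    static final List<String> LOG = new ArrayList<>();

    // Provider: builds a fresh retriever, with its own call counter, each time it is invoked.
    static final Supplier<Function<String, Page>> PROVIDER = () -> {
        int[] calls = {0}; // state private to one modeled subscription
        return token -> {
            calls[0]++;
            LOG.add("call " + calls[0] + " token=" + token);
            return token == null
                ? new Page(Arrays.asList("a", "b"), "t1")
                : new Page(Arrays.asList("c"), null);
        };
    };

    // Models one subscription: invoke the provider once, then call the retriever
    // serially until a page with a null continuation token is seen.
    static List<String> subscribe() {
        Function<String, Page> retriever = PROVIDER.get();
        List<String> items = new ArrayList<>();
        String token = null;
        do {
            Page page = retriever.apply(token);
            items.addAll(page.items);
            token = page.nextToken;
        } while (token != null);
        return items;
    }

    public static void main(String[] args) {
        System.out.println(subscribe()); // prints [a, b, c]
        System.out.println(subscribe()); // prints [a, b, c]
        LOG.forEach(System.out::println);
    }
}
```

Because the counter is created inside the provider, the second modeled subscription starts counting from 1 again instead of continuing from the first one.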
-
mapPage
@Deprecated public <S> PagedFlux<S> mapPage(Function<T,S> mapper)
Deprecated. Refer to the decoration samples for create(Supplier).
Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper function.
- Type Parameters:
S
- The mapped type.
- Parameters:
mapper
- The mapper function to convert from type T to type S.
- Returns:
- A PagedFlux of type S.
-
-