Package com.azure.core.util.polling
Class PollerFlux<T,U>
- Type Parameters:
T
- The type of poll response value.U
- The type of the final result of long running operation.
- All Implemented Interfaces:
org.reactivestreams.Publisher<AsyncPollResponse<T,
,U>> CorePublisher<AsyncPollResponse<T,
U>>
A Flux that simplifies the task of executing long running operations against an Azure service.
A subscription to
PollerFlux
initiates a long running operation and polls the status
until it completes.
Code samples
Instantiating and subscribing to PollerFlux
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMillis(800)); // Create poller instance PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100), (context) -> Mono.empty(), // Define your custom poll operation (context) -> { if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) { System.out.println("Returning intermediate response."); return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS, "Operation in progress.")); } else { System.out.println("Returning final response."); return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "Operation completed.")); } }, (activationResponse, context) -> Mono.error(new RuntimeException("Cancellation is not supported")), (context) -> Mono.just("Final Output")); // Listen to poll responses poller.subscribe(response -> { // Process poll response System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue()); }); // Do something else
Asynchronously wait for polling to complete and then retrieve the final result
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5)); // Create poller instance PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100), (context) -> Mono.empty(), (context) -> { if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) { System.out.println("Returning intermediate response."); return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS, "Operation in progress.")); } else { System.out.println("Returning final response."); return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "Operation completed.")); } }, (activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"), (context) -> Mono.just("FromServer:FinalOutput")); poller.take(Duration.ofMinutes(30)) .last() .flatMap(asyncPollResponse -> { if (asyncPollResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) { // operation completed successfully, retrieving final result. return asyncPollResponse .getFinalResult(); } else { return Mono.error(new RuntimeException("polling completed unsuccessfully with status:" + asyncPollResponse.getStatus())); } }).block();
Block for polling to complete and then retrieve the final result
AsyncPollResponse<String, String> terminalResponse = pollerFlux.blockLast(); System.out.printf("Polling complete. Final Status: %s", terminalResponse.getStatus()); if (terminalResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) { String finalResult = terminalResponse.getFinalResult().block(); System.out.printf("Polling complete. Final Status: %s", finalResult); }
Asynchronously poll until poller receives matching status
final Predicate<AsyncPollResponse<String, String>> isComplete = response -> { return response.getStatus() != LongRunningOperationStatus.IN_PROGRESS && response.getStatus() != LongRunningOperationStatus.NOT_STARTED; }; pollerFlux .takeUntil(isComplete) .subscribe(completed -> { System.out.println("Completed poll response, status: " + completed.getStatus()); });
Asynchronously cancel the long running operation
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5)); // Create poller instance PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100), (context) -> Mono.empty(), (context) -> { if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) { System.out.println("Returning intermediate response."); return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS, "Operation in progress.")); } else { System.out.println("Returning final response."); return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, "Operation completed.")); } }, (activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"), (context) -> Mono.just("FromServer:FinalOutput")); // Asynchronously wait 30 minutes to complete the polling, if not completed // within in the time then cancel the server operation. poller.take(Duration.ofMinutes(30)) .last() .flatMap(asyncPollResponse -> { if (!asyncPollResponse.getStatus().isComplete()) { return asyncPollResponse .cancelOperation() .then(Mono.error(new RuntimeException("Operation is cancelled!"))); } else { return Mono.just(asyncPollResponse); } }).block();
Instantiating and subscribing to PollerFlux from a known polling strategy
// Create poller instance PollerFlux<BinaryData, String> poller = PollerFlux.create( Duration.ofMillis(100), // pass in your custom activation operation () -> Mono.just(new SimpleResponse<Void>(new HttpRequest( HttpMethod.POST, "http://httpbin.org"), 202, new HttpHeaders().set("Operation-Location", "http://httpbin.org"), null)), new OperationResourcePollingStrategy<>(new HttpPipelineBuilder().build()), TypeReference.createInstance(BinaryData.class), TypeReference.createInstance(String.class)); // Listen to poll responses poller.subscribe(response -> { // Process poll response System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue()); }); // Do something else
Instantiating and subscribing to PollerFlux from a custom polling strategy
// Create custom polling strategy based on OperationResourcePollingStrategy PollingStrategy<BinaryData, String> strategy = new OperationResourcePollingStrategy<BinaryData, String>( new HttpPipelineBuilder().build()) { // override any interface method to customize the polling behavior @Override public Mono<PollResponse<BinaryData>> poll(PollingContext<BinaryData> context, TypeReference<BinaryData> pollResponseType) { return Mono.just(new PollResponse<>( LongRunningOperationStatus.SUCCESSFULLY_COMPLETED, BinaryData.fromString(""))); } }; // Create poller instance PollerFlux<BinaryData, String> poller = PollerFlux.create( Duration.ofMillis(100), // pass in your custom activation operation () -> Mono.just(new SimpleResponse<Void>(new HttpRequest( HttpMethod.POST, "http://httpbin.org"), 202, new HttpHeaders().set("Operation-Location", "http://httpbin.org"), null)), strategy, TypeReference.createInstance(BinaryData.class), TypeReference.createInstance(String.class)); // Listen to poll responses poller.subscribe(response -> { // Process poll response System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue()); }); // Do something else
-
Constructor Summary
ConstructorDescriptionPollerFlux
(Duration pollInterval, Function<PollingContext<T>, Mono<T>> activationOperation, Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation, Function<PollingContext<T>, Mono<U>> fetchResultOperation) Creates PollerFlux. -
Method Summary
Modifier and TypeMethodDescriptionstatic <T,
U> PollerFlux<T, U> create
(Duration pollInterval, Function<PollingContext<T>, Mono<PollResponse<T>>> activationOperation, Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation, Function<PollingContext<T>, Mono<U>> fetchResultOperation) Creates PollerFlux.static <T,
U> PollerFlux<T, U> create
(Duration pollInterval, Supplier<Mono<? extends Response<?>>> initialOperation, PollingStrategy<T, U> strategy, TypeReference<T> pollResponseType, TypeReference<U> resultType) Creates PollerFlux.static <T,
U> PollerFlux<T, U> Creates a PollerFlux instance that returns an error on subscription.Returns the current polling duration for thisPollerFlux
instance.SyncPoller<T,
U> Gets a synchronous blocking poller.PollerFlux<T,
U> setPollInterval
(Duration pollInterval) Sets the poll interval for this poller.void
subscribe
(CoreSubscriber<? super AsyncPollResponse<T, U>> actual) Methods inherited from class reactor.core.publisher.Flux
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, getPrefetch, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergePriority, mergePriority, mergePriority, mergePriorityDelayError, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorComplete, onErrorComplete, onErrorComplete, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
-
Constructor Details
-
PollerFlux
public PollerFlux(Duration pollInterval, Function<PollingContext<T>, Mono<T>> activationOperation, Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation, Function<PollingContext<T>, Mono<U>> fetchResultOperation) Creates PollerFlux.- Parameters:
pollInterval
- the polling intervalactivationOperation
- the activation operation to activate (start) the long running operation. This operation will be invoked at most once across all subscriptions. This parameter is required. If there is no specific activation work to be done then invocation should return Mono.empty(), this operation will be called with a newPollingContext
.pollOperation
- the operation to poll the current state of long running operation. This parameter is required and the operation will be called with currentPollingContext
.cancelOperation
- aFunction
that represents the operation to cancel the long running operation if service supports cancellation. This parameter is required. If service does not support cancellation then the implementer should return Mono.error with an error message indicating absence of cancellation support. The operation will be called with currentPollingContext
.fetchResultOperation
- aFunction
that represents the operation to retrieve final result of the long running operation if service support it. This parameter is required and operation will be called with the currentPollingContext
. If service does not have an api to fetch final result and if final result is same as final poll response value then implementer can choose to simply return value from provided final poll response.
-
-
Method Details
-
create
public static <T,U> PollerFlux<T,U> create(Duration pollInterval, Function<PollingContext<T>, Mono<PollResponse<T>>> activationOperation, Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation, BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation, Function<PollingContext<T>, Mono<U>> fetchResultOperation) Creates PollerFlux. This create method differs from the PollerFlux constructor in that the constructor uses an activationOperation which returns a Mono that emits result, the create method uses an activationOperation which returns a Mono that emitsPollResponse
. ThePollResponse
holds the result. If thePollResponse
from the activationOperation indicate that long running operation is completed then the pollOperation will not be called.- Type Parameters:
T
- The type of poll response value.U
- The type of the final result of long running operation.- Parameters:
pollInterval
- the polling intervalactivationOperation
- the activation operation to activate (start) the long running operation. This operation will be invoked at most once across all subscriptions. This parameter is required. If there is no specific activation work to be done then invocation should return Mono.empty(), this operation will be called with a newPollingContext
.pollOperation
- the operation to poll the current state of long running operation. This parameter is required and the operation will be called with currentPollingContext
.cancelOperation
- aFunction
that represents the operation to cancel the long running operation if service supports cancellation. This parameter is required. If service does not support cancellation then the implementer should return Mono.error with an error message indicating absence of cancellation support. The operation will be called with currentPollingContext
.fetchResultOperation
- aFunction
that represents the operation to retrieve final result of the long running operation if service support it. This parameter is required and operation will be called currentPollingContext
. If service does not have an api to fetch final result and if final result is same as final poll response value then implementer can choose to simply return value from provided final poll response.- Returns:
- PollerFlux
-
create
public static <T,U> PollerFlux<T,U> create(Duration pollInterval, Supplier<Mono<? extends Response<?>>> initialOperation, PollingStrategy<T, U> strategy, TypeReference<T> pollResponseType, TypeReference<U> resultType) Creates PollerFlux. This create method uses aPollingStrategy
to poll the status of a long running operation after the activation operation is invoked. SeePollingStrategy
for more details of known polling strategies and how to create a custom strategy.- Type Parameters:
T
- The type of poll response value.U
- The type of the final result of long running operation.- Parameters:
pollInterval
- the polling intervalinitialOperation
- the activation operation to activate (start) the long running operation. This operation will be invoked at most once across all subscriptions. This parameter is required. If there is no specific activation work to be done then invocation should return Mono.empty(), this operation will be called with a newPollingContext
.strategy
- a known strategy for polling a long running operation in AzurepollResponseType
- theTypeReference
of the response type from a polling call, or BinaryData if raw response body should be kept. This should match the generic parameterPollerFlux
.resultType
- theTypeReference
of the final result object to deserialize into, or BinaryData if raw response body should be kept. This should match the generic parameterPollerFlux
.- Returns:
- PollerFlux
-
error
Creates a PollerFlux instance that returns an error on subscription.- Type Parameters:
T
- The type of poll response value.U
- The type of the final result of long running operation.- Parameters:
ex
- The exception to be returned on subscription of thisPollerFlux
.- Returns:
- A poller flux instance that returns an error without emitting any data.
- See Also:
-
setPollInterval
Sets the poll interval for this poller. The new interval will be used for all subsequent polling operations including the subscriptions that are already in progress.- Parameters:
pollInterval
- The new poll interval for this poller.- Returns:
- The updated instance of
PollerFlux
. - Throws:
NullPointerException
- if thepollInterval
is null.IllegalArgumentException
- if thepollInterval
is zero or negative.
-
getPollInterval
Returns the current polling duration for thisPollerFlux
instance.- Returns:
- The current polling duration.
-
subscribe
- Specified by:
subscribe
in interfaceCorePublisher<T>
- Specified by:
subscribe
in classFlux<AsyncPollResponse<T,
U>>
-
getSyncPoller
Gets a synchronous blocking poller.- Returns:
- a synchronous blocking poller.
-