Class FluxUtil
- java.lang.Object
-
- com.azure.core.util.FluxUtil
-
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static byte[]
byteBufferToArray(ByteBuffer byteBuffer)
Gets the content of the provided ByteBuffer as a byte array.static Mono<byte[]>
collectBytesFromNetworkResponse(Flux<ByteBuffer> stream, HttpHeaders headers)
Collects ByteBuffers returned in a network response into a byte array.static Mono<byte[]>
collectBytesInByteBufferStream(Flux<ByteBuffer> stream)
Collects ByteBuffers emitted by a Flux into a byte array.static Mono<byte[]>
collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint)
Collects ByteBuffers emitted by a Flux into a byte array.static Flux<ByteBuffer>
createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries)
Creates aFlux
that is capable of resuming a download by applying retry logic when an error occurs.static Flux<ByteBuffer>
createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries, long position)
Creates aFlux
that is capable of resuming a download by applying retry logic when an error occurs.static <T> Flux<T>
fluxContext(Function<Context,Flux<T>> serviceCall)
This method converts the incomingdeferContextual
fromReactor Context
toAzure Context
and calls the given lambda function with this context and returns a collection of typeT
static <T> Flux<T>
fluxError(ClientLogger logger, RuntimeException ex)
Propagates aRuntimeException
through the error channel ofFlux
.static boolean
isFluxByteBuffer(Type entityType)
Checks if a type is Flux<ByteBuffer>.static <T> Mono<T>
monoError(ClientLogger logger, RuntimeException ex)
Propagates aRuntimeException
through the error channel ofMono
.static <T> PagedFlux<T>
pagedFluxError(ClientLogger logger, RuntimeException ex)
Propagates aRuntimeException
through the error channel ofPagedFlux
.static Flux<ByteBuffer>
readFile(AsynchronousFileChannel fileChannel)
Creates aFlux
from anAsynchronousFileChannel
which reads the entire file.static Flux<ByteBuffer>
readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length)
Creates aFlux
from anAsynchronousFileChannel
which reads part of a file into chunks of the given size.static Flux<ByteBuffer>
readFile(AsynchronousFileChannel fileChannel, long offset, long length)
Creates aFlux
from anAsynchronousFileChannel
which reads part of a file.static Flux<ByteBuffer>
toFluxByteBuffer(InputStream inputStream)
static Flux<ByteBuffer>
toFluxByteBuffer(InputStream inputStream, int chunkSize)
static <T> Mono<T>
toMono(Response<T> response)
Converts the incoming content to Mono.static Context
toReactorContext(Context context)
Converts an Azure context to Reactor context.static <T> Mono<T>
withContext(Function<Context,Mono<T>> serviceCall)
This method converts the incomingdeferContextual
fromReactor Context
toAzure Context
and calls the given lambda function with this context and returns a single entity of typeT
static <T> Mono<T>
withContext(Function<Context,Mono<T>> serviceCall, Map<String,String> contextAttributes)
This method converts the incomingdeferContextual
fromReactor Context
toAzure Context
, adds the specified context attributes and calls the given lambda function with this context and returns a single entity of typeT
static Mono<Void>
writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile)
static Mono<Void>
writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position)
Writes theByteBuffers
emitted by aFlux
ofByteBuffer
to anAsynchronousFileChannel
starting at the givenposition
in the file.
-
-
-
Method Detail
-
isFluxByteBuffer
public static boolean isFluxByteBuffer(Type entityType)
Checks if a type is Flux<ByteBuffer>.- Parameters:
entityType
- the type to check- Returns:
- whether the type represents a Flux that emits ByteBuffer
-
collectBytesInByteBufferStream
public static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream)
Collects ByteBuffers emitted by a Flux into a byte array.- Parameters:
stream
- A stream which emits ByteBuffer instances.- Returns:
- A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.
- Throws:
IllegalStateException
- If the combined size of the emitted ByteBuffers is greater thanInteger.MAX_VALUE
.
-
collectBytesInByteBufferStream
public static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint)
Collects ByteBuffers emitted by a Flux into a byte array.Unlike
collectBytesInByteBufferStream(Flux)
, this method accepts a second parametersizeHint
. This size hint allows for optimizations when creating the initial buffer to reduce the number of times it needs to be resized while concatenating emitted ByteBuffers.- Parameters:
stream
- A stream which emits ByteBuffer instances.sizeHint
- A hint about the expected stream size.- Returns:
- A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.
- Throws:
IllegalArgumentException
- IfsizeHint
is equal to or less than0
.IllegalStateException
- If the combined size of the emitted ByteBuffers is greater thanInteger.MAX_VALUE
.
-
collectBytesFromNetworkResponse
public static Mono<byte[]> collectBytesFromNetworkResponse(Flux<ByteBuffer> stream, HttpHeaders headers)
Collects ByteBuffers returned in a network response into a byte array.The
headers
are inspected for containing anContent-Length
which determines if a size hinted collection,collectBytesInByteBufferStream(Flux, int)
, or default collection,collectBytesInByteBufferStream(Flux)
, will be used.- Parameters:
stream
- A network response ByteBuffer stream.headers
- The HTTP headers of the response.- Returns:
- A Mono which emits the collected network response ByteBuffers.
- Throws:
NullPointerException
- Ifheaders
is null.IllegalStateException
- If the size of the network response is greater thanInteger.MAX_VALUE
.
-
byteBufferToArray
public static byte[] byteBufferToArray(ByteBuffer byteBuffer)
Gets the content of the provided ByteBuffer as a byte array. This method will create a new byte array even if the ByteBuffer can have optionally backing array.- Parameters:
byteBuffer
- the byte buffer- Returns:
- the byte array
-
createRetriableDownloadFlux
public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries)
Creates aFlux
that is capable of resuming a download by applying retry logic when an error occurs.- Parameters:
downloadSupplier
- Supplier of the initial download.onDownloadErrorResume
-BiFunction
ofThrowable
andLong
which is used to resume downloading when an error occurs.maxRetries
- The maximum number of times a download can be resumed when an error occurs.- Returns:
- A
Flux
that downloads reliably.
-
createRetriableDownloadFlux
public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries, long position)
Creates aFlux
that is capable of resuming a download by applying retry logic when an error occurs.- Parameters:
downloadSupplier
- Supplier of the initial download.onDownloadErrorResume
-BiFunction
ofThrowable
andLong
which is used to resume downloading when an error occurs.maxRetries
- The maximum number of times a download can be resumed when an error occurs.position
- The initial offset for the download.- Returns:
- A
Flux
that downloads reliably.
-
toFluxByteBuffer
public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream)
Converts anInputStream
into aFlux
ofByteBuffer
using a chunk size of 4096.Given that
InputStream
is not guaranteed to be replayable the returnedFlux
should be considered non-replayable as well.If the passed
InputStream
isnull
Flux.empty()
will be returned.- Parameters:
inputStream
- TheInputStream
to convert into aFlux
.- Returns:
- A
Flux
ofByteBuffers
that contains the contents of the stream.
-
toFluxByteBuffer
public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, int chunkSize)
Converts anInputStream
into aFlux
ofByteBuffer
.Given that
InputStream
is not guaranteed to be replayable the returnedFlux
should be considered non-replayable as well.If the passed
InputStream
isnull
Flux.empty()
will be returned.- Parameters:
inputStream
- TheInputStream
to convert into aFlux
.chunkSize
- The requested size for eachByteBuffer
.- Returns:
- A
Flux
ofByteBuffers
that contains the contents of the stream. - Throws:
IllegalArgumentException
- IfchunkSize
is less than or equal to0
.
-
withContext
public static <T> Mono<T> withContext(Function<Context,Mono<T>> serviceCall)
This method converts the incomingdeferContextual
fromReactor Context
toAzure Context
and calls the given lambda function with this context and returns a single entity of typeT
If the reactor context is empty,
Context.NONE
will be used to call the lambda functionCode samples
String prefix = "Hello, "; Mono<String> response = FluxUtil .withContext(context -> serviceCallReturnsSingle(prefix, context));
- Type Parameters:
T
- The type of response returned from the service call- Parameters:
serviceCall
- The lambda function that makes the service call into which azure context will be passed- Returns:
- The response from service call
-
withContext
public static <T> Mono<T> withContext(Function<Context,Mono<T>> serviceCall, Map<String,String> contextAttributes)
This method converts the incomingdeferContextual
fromReactor Context
toAzure Context
, adds the specified context attributes and calls the given lambda function with this context and returns a single entity of typeT
If the reactor context is empty,
Context.NONE
will be used to call the lambda function- Type Parameters:
T
- The type of response returned from the service call- Parameters:
serviceCall
- serviceCall The lambda function that makes the service call into which azure context will be passedcontextAttributes
- The map of attributes sent by the calling method to be set onContext
.- Returns:
- The response from service call
-
toMono
public static <T> Mono<T> toMono(Response<T> response)
Converts the incoming content to Mono.
-
monoError
public static <T> Mono<T> monoError(ClientLogger logger, RuntimeException ex)
Propagates aRuntimeException
through the error channel ofMono
.- Type Parameters:
T
- The return type.- Parameters:
logger
- TheClientLogger
to log the exception.ex
- TheRuntimeException
.- Returns:
- A
Mono
that terminates with error wrapping theRuntimeException
.
-
fluxError
public static <T> Flux<T> fluxError(ClientLogger logger, RuntimeException ex)
Propagates aRuntimeException
through the error channel ofFlux
.- Type Parameters:
T
- The return type.- Parameters:
logger
- TheClientLogger
to log the exception.ex
- TheRuntimeException
.- Returns:
- A
Flux
that terminates with error wrapping theRuntimeException
.
-
pagedFluxError
public static <T> PagedFlux<T> pagedFluxError(ClientLogger logger, RuntimeException ex)
Propagates aRuntimeException
through the error channel ofPagedFlux
.- Type Parameters:
T
- The return type.- Parameters:
logger
- TheClientLogger
to log the exception.ex
- TheRuntimeException
.- Returns:
- A
PagedFlux
that terminates with error wrapping theRuntimeException
.
-
fluxContext
public static <T> Flux<T> fluxContext(Function<Context,Flux<T>> serviceCall)
This method converts the incomingdeferContextual
fromReactor Context
toAzure Context
and calls the given lambda function with this context and returns a collection of typeT
If the reactor context is empty,
Context.NONE
will be used to call the lambda functionCode samples
String prefix = "Hello, "; Flux<String> response = FluxUtil .fluxContext(context -> serviceCallReturnsCollection(prefix, context));
- Type Parameters:
T
- The type of response returned from the service call- Parameters:
serviceCall
- The lambda function that makes the service call into which the context will be passed- Returns:
- The response from service call
-
toReactorContext
public static Context toReactorContext(Context context)
Converts an Azure context to Reactor context. If the Azure context isnull
or empty,Context.empty()
will be returned.- Parameters:
context
- The Azure context.- Returns:
- The Reactor context.
-
writeFile
public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile)
Writes theByteBuffers
emitted by aFlux
ofByteBuffer
to anAsynchronousFileChannel
.The
outFile
is not closed by this call, closing of theoutFile
is managed by the caller.The response
Mono
will emit an error ifcontent
oroutFile
are null. Additionally, an error will be emitted if theoutFile
wasn't opened with the proper open options, such asStandardOpenOption.WRITE
.- Parameters:
content
- TheFlux
ofByteBuffer
content.outFile
- TheAsynchronousFileChannel
.- Returns:
- A
Mono
which emits a completion status once theFlux
has been written to theAsynchronousFileChannel
.
-
writeFile
public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position)
Writes theByteBuffers
emitted by aFlux
ofByteBuffer
to anAsynchronousFileChannel
starting at the givenposition
in the file.The
outFile
is not closed by this call, closing of theoutFile
is managed by the caller.The response
Mono
will emit an error ifcontent
oroutFile
are null orposition
is less than 0. Additionally, an error will be emitted if theoutFile
wasn't opened with the proper open options, such asStandardOpenOption.WRITE
.- Parameters:
content
- TheFlux
ofByteBuffer
content.outFile
- TheAsynchronousFileChannel
.position
- The position in the file to begin writing thecontent
.- Returns:
- A
Mono
which emits a completion status once theFlux
has been written to theAsynchronousFileChannel
.
-
readFile
public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length)
Creates aFlux
from anAsynchronousFileChannel
which reads part of a file into chunks of the given size.- Parameters:
fileChannel
- The file channel.chunkSize
- the size of file chunks to read.offset
- The offset in the file to begin reading.length
- The number of bytes to read from the file.- Returns:
- the Flux.
-
readFile
public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, long offset, long length)
Creates aFlux
from anAsynchronousFileChannel
which reads part of a file.- Parameters:
fileChannel
- The file channel.offset
- The offset in the file to begin reading.length
- The number of bytes to read from the file.- Returns:
- the Flux.
-
readFile
public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel)
Creates aFlux
from anAsynchronousFileChannel
which reads the entire file.- Parameters:
fileChannel
- The file channel.- Returns:
- The AsyncInputStream.
-
-