public final class ServiceBusReceiverAsyncClient extends Object implements AutoCloseable
An asynchronous receiver responsible for receiving messages from a specific queue or topic subscription.
Create an instance of receiver
// The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .receiver()
    .queueName(queueName)
    .buildAsyncClient();
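The connection string format in the comment above is a list of `Key=Value` pairs separated by semicolons. As a minimal sketch of how an application might validate such a string before handing it to the builder, the helper below splits it into its three components. `ConnectionStringParts` is a hypothetical name and is not part of the Azure SDK, which does its own parsing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative helper (hypothetical, not part of the Azure SDK) that splits a connection string. */
final class ConnectionStringParts {
    private ConnectionStringParts() { }

    /** Parses "Key=Value;Key=Value" pairs, splitting each pair at the first '=' only. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connectionString.split(";")) {
            if (pair.isEmpty()) {
                continue; // tolerate empty segments such as ";;"
            }
            int separator = pair.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + pair);
            }
            // Keys are often Base64 and may end with '=' padding, so only the first '=' is a separator.
            parts.put(pair.substring(0, separator), pair.substring(separator + 1));
        }
        for (String required : new String[] {"Endpoint", "SharedAccessKeyName", "SharedAccessKey"}) {
            if (!parts.containsKey(required)) {
                throw new IllegalArgumentException("Missing component: " + required);
            }
        }
        return parts;
    }
}
```

Calling `ConnectionStringParts.parse(connectionString)` either returns the three components or fails fast with an `IllegalArgumentException`, which can be easier to diagnose than an authentication failure at connect time.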
Create an instance of receiver using default credential
// The required parameters are the fully qualified namespace and a credential used to authenticate with Service Bus.
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .credential("<<fully-qualified-namespace>>", new DefaultAzureCredentialBuilder().build())
    .receiver()
    .queueName("<< QUEUE NAME >>")
    .buildAsyncClient();
Receive all messages from Service Bus resource
This returns an infinite stream of messages from Service Bus. The stream ends when the subscription is disposed or when another terminal scenario occurs. See ServiceBusReceiverAsyncClient.receiveMessages() for more information.
Disposable subscription = receiver.receiveMessages()
    .subscribe(message -> {
        System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
        System.out.printf("Contents of message as string: %s%n", message.getBody());
    },
        error -> System.out.println("Error occurred: " + error),
        () -> System.out.println("Receiving complete."));

// When program ends, or you're done receiving all messages.
subscription.dispose();
receiver.close();
Receive messages in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode from a Service Bus entity
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
Disposable subscription = receiver.receiveMessages()
    .subscribe(message -> {
        System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
        System.out.printf("Contents of message as string: %s%n", message.getBody().toString());
    }, error -> System.err.print(error));
Receive messages from a specific session
To fetch messages from a specific session, switch to ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use ServiceBusSessionReceiverAsyncClient.acceptSession(String) to create a session-bound ServiceBusReceiverAsyncClient.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sessionReceiver()
    .queueName(queueName)
    .buildAsyncClient();

// acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
// successfully locked.
// `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
// `Mono.usingWhen` can also be used if the resource closure only returns a single item.
Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
    sessionReceiver.acceptSession("<< my-session-id >>"),
    receiver -> receiver.receiveMessages(),
    receiver -> Mono.fromRunnable(() -> receiver.close()));

// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
    message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
        message.getSequenceNumber(), message.getBody()),
    error -> System.err.print(error));
Receive messages from the first available session
To process messages from the first available session, switch to ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptNextSession() to find the first available session to process messages from.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sessionReceiver()
    .queueName(queueName)
    .buildAsyncClient();

// acceptNextSession() completes successfully with a receiver when it acquires the next available session.
// `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
// `Mono.usingWhen` can also be used if the resource closure only returns a single item.
Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
    sessionReceiver.acceptNextSession(),
    receiver -> receiver.receiveMessages(),
    receiver -> Mono.fromRunnable(() -> receiver.close()));

// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
    message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
        message.getSequenceNumber(), message.getBody()),
    error -> System.err.print(error));
Rate limiting consumption of messages from a Service Bus entity
Message receivers that need to limit the number of messages they receive at a given time can use BaseSubscriber.request(long).
receiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
    private static final int NUMBER_OF_MESSAGES = 5;
    private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Tell the Publisher we only want 5 messages at a time.
        request(NUMBER_OF_MESSAGES);
    }

    @Override
    protected void hookOnNext(ServiceBusReceivedMessage message) {
        // Process the ServiceBusReceivedMessage
        // If the number of messages we have currently received is a multiple of 5, that means we have reached
        // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
        // that the subscriber is ready to get more messages from upstream.
        if (currentNumberOfMessages.incrementAndGet() % NUMBER_OF_MESSAGES == 0) {
            request(NUMBER_OF_MESSAGES);
        }
    }
});
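The request-in-batches idea can be tried without a Service Bus namespace. The sketch below uses the JDK's `java.util.concurrent.Flow` interfaces instead of Reactor's `BaseSubscriber`, so it is an analogy rather than the SDK's behaviour. `BatchedDemandDemo`, `ListPublisher`, and `BatchingSubscriber` are hypothetical names, and the publisher is a deliberately simple single-threaded stand-in for `receiveMessages()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Stdlib-only sketch of demand-driven consumption; all class names are hypothetical. */
final class BatchedDemandDemo {

    /** Publishes list items synchronously, but never more than the subscriber has requested. */
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;
        final List<Long> requests = new ArrayList<>(); // records every request(n) for inspection

        ListPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    requests.add(n);
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext; the outer loop sees the new demand
                    }
                    draining = true;
                    while (!done && demand > 0 && next < items.size()) {
                        demand--;
                        subscriber.onNext(items.get(next++));
                    }
                    draining = false;
                    if (!done && next == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests batchSize items up front and again after each full batch has been processed. */
    static final class BatchingSubscriber implements Flow.Subscriber<String> {
        private final int batchSize;
        private Flow.Subscription subscription;
        private int received;
        final List<String> processed = new ArrayList<>();
        boolean completed;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batchSize);
        }

        @Override
        public void onNext(String item) {
            processed.add(item);
            if (++received % batchSize == 0) {
                subscription.request(batchSize); // ready for the next batch
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }
}
```

Subscribing a `BatchingSubscriber` with a batch size of 5 to a twelve-item `ListPublisher` results in three `request(5)` calls: one on subscription and one after each completed batch of five.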
Modifier and Type | Method | Description |
---|---|---|
Mono<Void> | abandon(ServiceBusReceivedMessage message) | Abandons a message. |
Mono<Void> | abandon(ServiceBusReceivedMessage message, AbandonOptions options) | Abandons a message and updates the message's properties. |
void | close() | Disposes of the consumer by closing the underlying links to the service. |
Mono<Void> | commitTransaction(ServiceBusTransactionContext transactionContext) | Commits the transaction and all the operations associated with it. |
Mono<Void> | complete(ServiceBusReceivedMessage message) | Completes a message. |
Mono<Void> | complete(ServiceBusReceivedMessage message, CompleteOptions options) | Completes a message with the given options. |
Mono<ServiceBusTransactionContext> | createTransaction() | Starts a new service side transaction. |
Mono<Void> | deadLetter(ServiceBusReceivedMessage message) | Moves a message to the dead-letter sub-queue. |
Mono<Void> | deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) | Moves a message to the dead-letter sub-queue with the given options. |
Mono<Void> | defer(ServiceBusReceivedMessage message) | Defers a message. |
Mono<Void> | defer(ServiceBusReceivedMessage message, DeferOptions options) | Defers a message with the options set. |
String | getEntityPath() | Gets the Service Bus resource this client interacts with. |
String | getFullyQualifiedNamespace() | Gets the fully qualified Service Bus namespace that the connection is associated with. |
Mono<byte[]> | getSessionState() | Gets the state of the session if this receiver is a session receiver. |
Mono<ServiceBusReceivedMessage> | peekMessage() | Reads the next active message without changing the state of the receiver or the message source. |
Mono<ServiceBusReceivedMessage> | peekMessage(long sequenceNumber) | Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessage> | peekMessages(int maxMessages) | Reads the next batch of active messages without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessage> | peekMessages(int maxMessages, long sequenceNumber) | Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source. |
Mono<ServiceBusReceivedMessage> | receiveDeferredMessage(long sequenceNumber) | Receives a deferred message. |
Flux<ServiceBusReceivedMessage> | receiveDeferredMessages(Iterable<Long> sequenceNumbers) | Receives a batch of deferred messages. |
Flux<ServiceBusReceivedMessage> | receiveMessages() | Receives an infinite stream of messages from the Service Bus entity. |
Mono<OffsetDateTime> | renewMessageLock(ServiceBusReceivedMessage message) | Asynchronously renews the lock on the message. |
Mono<Void> | renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration) | Starts the auto lock renewal for a message. |
Mono<OffsetDateTime> | renewSessionLock() | Renews the session lock if this receiver is a session receiver. |
Mono<Void> | renewSessionLock(Duration maxLockRenewalDuration) | Starts the auto lock renewal for the session this receiver works for. |
Mono<Void> | rollbackTransaction(ServiceBusTransactionContext transactionContext) | Rolls back the given transaction and all operations associated with it. |
Mono<Void> | setSessionState(byte[] sessionState) | Sets the state of the session this receiver works for. |
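public String getFullyQualifiedNamespace()
Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
public String getEntityPath()
Gets the Service Bus resource this client interacts with.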
public Mono<Void> abandon(ServiceBusReceivedMessage message)
Abandons a message. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
Returns:
A Mono that completes when the Service Bus abandon operation completes.
Throws:
NullPointerException - if message is null.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be abandoned.
public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)
Abandons a message and updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
options - The options to set while abandoning the message.
Returns:
A Mono that completes when the Service Bus operation finishes.
Throws:
NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be abandoned.
public Mono<Void> complete(ServiceBusReceivedMessage message)
Completes a message. This will delete the message from the service.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
Returns:
A Mono that finishes when the message is completed on Service Bus.
Throws:
NullPointerException - if message is null.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be completed.
public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)
Completes a message with the given options. This will delete the message from the service.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to complete the message.
Returns:
A Mono that finishes when the message is completed on Service Bus.
Throws:
NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be completed.
public Mono<Void> defer(ServiceBusReceivedMessage message)
Defers a message. This will move the message into the deferred sub-queue.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
Returns:
A Mono that completes when the Service Bus defer operation finishes.
Throws:
NullPointerException - if message is null.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be deferred.
public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)
Defers a message with the options set. This will move the message into the deferred sub-queue.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to defer the message.
Returns:
A Mono that completes when the defer operation finishes.
Throws:
NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be deferred.
public Mono<Void> deadLetter(ServiceBusReceivedMessage message)
Moves a message to the dead-letter sub-queue.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
Returns:
A Mono that completes when the dead letter operation finishes.
Throws:
NullPointerException - if message is null.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be dead-lettered.
public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)
Moves a message to the dead-letter sub-queue with the given options.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
options - Options used to dead-letter the message.
Returns:
A Mono that completes when the dead letter operation finishes.
Throws:
NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the message could not be dead-lettered.
public Mono<byte[]> getSessionState()
Gets the state of the session if this receiver is a session receiver.
Throws:
IllegalStateException - if the receiver is a non-session receiver or receiver is already closed.
ServiceBusException - if the session state could not be acquired.
public Mono<ServiceBusReceivedMessage> peekMessage()
Reads the next active message without changing the state of the receiver or the message source. The first call to peekMessage() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.
Returns:
A peeked ServiceBusReceivedMessage.
Throws:
IllegalStateException - if receiver is already disposed.
ServiceBusException - if an error occurs while peeking at the message.
public Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)
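Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source.
Parameters:
sequenceNumber - The sequence number from where to read the message.
Returns:
A peeked ServiceBusReceivedMessage.
Throws:
IllegalStateException - if receiver is already disposed.
ServiceBusException - if an error occurs while peeking at the message.
public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)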
Reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
maxMessages - The number of messages.
Returns:
A Flux of messages that are peeked.
Throws:
IllegalArgumentException - if maxMessages is not a positive integer.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if an error occurs while peeking at messages.
public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)
Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
maxMessages - The number of messages.
sequenceNumber - The sequence number from where to start reading messages.
Returns:
A Flux of messages peeked.
Throws:
IllegalArgumentException - if maxMessages is not a positive integer.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if an error occurs while peeking at messages.
public Flux<ServiceBusReceivedMessage> receiveMessages()
Receives an infinite stream of messages from the Service Bus entity. This Flux continuously receives messages from a Service Bus entity until either the receiver is closed, the subscription to the Flux is disposed, a terminal signal from a downstream subscriber is propagated upstream (for example, Flux.take(long) or Flux.take(Duration)), or an AmqpException occurs that causes the receive link to stop.
Throws:
IllegalStateException - if receiver is already disposed.
ServiceBusException - if an error occurs while receiving messages.
public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)
Receives a deferred message. Deferred messages can only be received by using sequence number.
Parameters:
sequenceNumber - The sequence number of the message.
Returns:
A deferred message with the matching sequenceNumber.
Throws:
IllegalStateException - if receiver is already disposed.
ServiceBusException - if deferred message cannot be received.
public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)
Receives a batch of deferred messages. Deferred messages can only be received by using sequence number.
Parameters:
sequenceNumbers - The sequence numbers of the deferred messages.
Returns:
A Flux of deferred messages.
Throws:
NullPointerException - if sequenceNumbers is null.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if deferred messages cannot be received.
public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
Asynchronously renews the lock on the message. When a message is received in ServiceBusReceiveMode.PEEK_LOCK mode, the message is locked on the server for this receiver instance for a duration as specified during the entity creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
Parameters:
message - The ServiceBusReceivedMessage to perform lock renewal on.
Throws:
NullPointerException - if message or message.getLockToken() is null.
UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
IllegalStateException - if the receiver is a session receiver or receiver is already disposed.
IllegalArgumentException - if message.getLockToken() is an empty value.
public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)
Starts the auto lock renewal for a message.
Parameters:
message - The ServiceBusReceivedMessage to perform this operation.
maxLockRenewalDuration - Maximum duration to keep renewing the lock token.
Returns:
A Mono that completes when lock renewal has run for up to maxLockRenewalDuration.
Throws:
NullPointerException - if message, message.getLockToken(), or maxLockRenewalDuration is null.
IllegalStateException - if the receiver is a session receiver or the receiver is disposed.
IllegalArgumentException - if message.getLockToken() is an empty value.
ServiceBusException - if the message lock cannot be renewed.
public Mono<OffsetDateTime> renewSessionLock()
Renews the session lock if this receiver is a session receiver.
Throws:
IllegalStateException - if the receiver is a non-session receiver or if receiver is already disposed.
ServiceBusException - if the session lock cannot be renewed.
public Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)
Starts the auto lock renewal for the session this receiver works for.
Parameters:
maxLockRenewalDuration - Maximum duration to keep renewing the session lock.
Throws:
NullPointerException - if maxLockRenewalDuration is null.
IllegalStateException - if the receiver is a non-session receiver or the receiver is disposed.
ServiceBusException - if the session lock renewal operation cannot be started.
public Mono<Void> setSessionState(byte[] sessionState)
Sets the state of the session this receiver works for.
Parameters:
sessionState - State to set on the session.
Throws:
IllegalStateException - if the receiver is a non-session receiver or receiver is already disposed.
ServiceBusException - if the session state cannot be set.
public Mono<ServiceBusTransactionContext> createTransaction()
Starts a new service side transaction. The transaction context should be passed to all operations that need to be in this transaction.
Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction()
    .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
        error -> Duration.ZERO,
        () -> Duration.ZERO);

transactionContext.flatMap(transaction -> {
    // Process messages and associate operations with the transaction.
    Mono<Void> operations = Mono.when(
        receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
            receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
        receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

    // Finally, either commit or rollback the transaction once all the operations are associated with it.
    // `operations` is a Mono<Void> and never emits an item, so the commit is chained with then() rather
    // than flatMap(), whose mapper would never be invoked.
    return operations.then(receiver.commitTransaction(transaction));
});
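The sample above only shows the commit path. As a rough sketch of the "either commit or rollback" decision, the following uses the JDK's `CompletableFuture` instead of Reactor so that it runs without the SDK. `TransactionFlowSketch`, `RecordingClient`, and the string transaction id are hypothetical stand-ins, not Service Bus types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Stdlib-only sketch of "commit on success, roll back on failure"; all names are hypothetical. */
final class TransactionFlowSketch {
    private TransactionFlowSketch() { }

    /** Stand-in for a transactional client; it only records which calls were made. */
    static final class RecordingClient {
        final List<String> calls = new ArrayList<>();

        CompletableFuture<String> createTransaction() {
            calls.add("create");
            return CompletableFuture.completedFuture("tx-1");
        }

        CompletableFuture<Void> commit(String tx) {
            calls.add("commit " + tx);
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> rollback(String tx) {
            calls.add("rollback " + tx);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Runs the operations inside a new transaction, then commits or rolls back based on the outcome. */
    static CompletableFuture<Void> runInTransaction(
            RecordingClient client, Function<String, CompletableFuture<Void>> operations) {
        return client.createTransaction().thenCompose(tx ->
            operations.apply(tx)
                // Map the outcome to a flag: true when every operation succeeded.
                .handle((ignored, error) -> error == null)
                .thenCompose(succeeded -> succeeded ? client.commit(tx) : client.rollback(tx)));
    }
}
```

This sketch swallows the original failure once the rollback has been issued. Real code would likely log it or propagate it to the caller after rolling back.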
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
IllegalStateException - if receiver is already disposed.
ServiceBusException - if a transaction cannot be created.
public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)
Commits the transaction and all the operations associated with it.
Parameters:
transactionContext - The transaction to be committed.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if transactionContext or transactionContext.transactionId is null.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the transaction could not be committed.
public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)
Rolls back the given transaction and all operations associated with it.
Parameters:
transactionContext - The transaction to roll back.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if transactionContext or transactionContext.transactionId is null.
IllegalStateException - if receiver is already disposed.
ServiceBusException - if the transaction could not be rolled back.
public void close()
Disposes of the consumer by closing the underlying links to the service.
Specified by:
close in interface AutoCloseable
Copyright © 2021 Microsoft Corporation. All rights reserved.