Class ServiceBusReceiverAsyncClient
- All Implemented Interfaces:
AutoCloseable
messages
from a specific
queue or topic subscription.
Create an instance of receiver
// The required parameters is connectionString, a way to authenticate with Service Bus using credentials. // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below. // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}" ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder() .connectionString(connectionString) .receiver() .queueName(queueName) .buildAsyncClient();
Create an instance of receiver using default credential
// The required parameters is connectionString, a way to authenticate with Service Bus using credentials. ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .credential("<<fully-qualified-namespace>>", new DefaultAzureCredentialBuilder().build()) .receiver() .queueName("<< QUEUE NAME >>") .buildAsyncClient();
Receive all messages from Service Bus resource
This returns an infinite stream of messages from Service Bus. The stream ends when the subscription is disposed
or other terminal scenarios. See receiveMessages()
for more information.
Disposable subscription = receiver.receiveMessages() .subscribe(message -> { System.out.printf("Received Seq #: %s%n", message.getSequenceNumber()); System.out.printf("Contents of message as string: %s%n", message.getBody()); }, error -> System.out.println("Error occurred: " + error), () -> System.out.println("Receiving complete.")); // When program ends, or you're done receiving all messages. subscription.dispose(); receiver.close();
Receive messages in ServiceBusReceiveMode.RECEIVE_AND_DELETE
mode from a Service Bus
entity
// Keep a reference to `subscription`. When the program is finished receiving messages, call // subscription.dispose(). This will stop fetching messages from the Service Bus. Disposable subscription = receiver.receiveMessages() .subscribe(message -> { System.out.printf("Received Seq #: %s%n", message.getSequenceNumber()); System.out.printf("Contents of message as string: %s%n", message.getBody().toString()); }, error -> System.err.print(error));
Receive messages from a specific session
To fetch messages from a specific session, switch to ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder
and
build the session receiver client. Use ServiceBusSessionReceiverAsyncClient.acceptSession(String)
to create
a session-bound ServiceBusReceiverAsyncClient
.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below. // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}" ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() .queueName(queueName) .buildAsyncClient(); // acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is // successfully locked. // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes. // `Mono.usingWhen` can also be used if the resource closure only returns a single item. Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen( sessionReceiver.acceptSession("<< my-session-id >>"), receiver -> receiver.receiveMessages(), receiver -> Mono.fromRunnable(() -> receiver.close())); // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code // is non-blocking and kicks off the operation. Disposable subscription = sessionMessages.subscribe( message -> System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(), message.getBody()), error -> System.err.print(error));
Receive messages from the first available session
To process messages from the first available session, switch to ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder
and build the session receiver client. Use
acceptNextSession()
to find the first available
session to process messages from.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below. // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}" ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() .queueName(queueName) .buildAsyncClient(); // acceptNextSession() completes successfully with a receiver when it acquires the next available session. // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes. // `Mono.usingWhen` can also be used if the resource closure only returns a single item. Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen( sessionReceiver.acceptNextSession(), receiver -> receiver.receiveMessages(), receiver -> Mono.fromRunnable(() -> receiver.close())); // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code // is non-blocking and kicks off the operation. Disposable subscription = sessionMessages.subscribe( message -> System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(), message.getBody()), error -> System.err.print(error));
Rate limiting consumption of messages from a Service Bus entity
For message receivers that need to limit the number of messages they receive at a given time, they can use
BaseSubscriber.request(long)
.
receiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() { private static final int NUMBER_OF_MESSAGES = 5; private final AtomicInteger currentNumberOfMessages = new AtomicInteger(); @Override protected void hookOnSubscribe(Subscription subscription) { // Tell the Publisher we only want 5 message at a time. request(NUMBER_OF_MESSAGES); } @Override protected void hookOnNext(ServiceBusReceivedMessage message) { // Process the ServiceBusReceivedMessage // If the number of messages we have currently received is a multiple of 5, that means we have reached // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher // that the subscriber is ready to get more messages from upstream. if (currentNumberOfMessages.incrementAndGet() % 5 == 0) { request(NUMBER_OF_MESSAGES); } } });
-
Method Summary
Modifier and TypeMethodDescriptionabandon
(ServiceBusReceivedMessage message) Abandons amessage
.abandon
(ServiceBusReceivedMessage message, AbandonOptions options) Abandons amessage
updates the message's properties.void
close()
Disposes of the consumer by closing the underlying links to the service.commitTransaction
(ServiceBusTransactionContext transactionContext) Commits the transaction and all the operations associated with it.complete
(ServiceBusReceivedMessage message) Completes amessage
.complete
(ServiceBusReceivedMessage message, CompleteOptions options) Completes amessage
with the given options.Starts a new service side transaction.deadLetter
(ServiceBusReceivedMessage message) Moves amessage
to the dead-letter sub-queue.deadLetter
(ServiceBusReceivedMessage message, DeadLetterOptions options) Moves amessage
to the dead-letter sub-queue with the given options.defer
(ServiceBusReceivedMessage message) Defers amessage
.defer
(ServiceBusReceivedMessage message, DeferOptions options) Defers amessage
with the options set.Gets the Service Bus resource this client interacts with.Gets the fully qualified Service Bus namespace that the connection is associated with.Gets the identifier of the instance ofServiceBusReceiverAsyncClient
.Gets the SessionId of the session if this receiver is a session receiver.Mono<byte[]>
Gets the state of the session if this receiver is a session receiver.Reads the next active message without changing the state of the receiver or the message source.peekMessage
(long sequenceNumber) Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.peekMessages
(int maxMessages) Reads the next batch of active messages without changing the state of the receiver or the message source.peekMessages
(int maxMessages, long sequenceNumber) Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.receiveDeferredMessage
(long sequenceNumber) Receives a deferredmessage
.receiveDeferredMessages
(Iterable<Long> sequenceNumbers) Receives a batch of deferredmessages
.Receives an infinite stream ofmessages
from the Service Bus entity.Asynchronously renews the lock on the message.renewMessageLock
(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration) Starts the auto lock renewal for amessage
.Renews the session lock if this receiver is a session receiver.renewSessionLock
(Duration maxLockRenewalDuration) Starts the auto lock renewal for the session this receiver works for.rollbackTransaction
(ServiceBusTransactionContext transactionContext) Rollbacks the transaction given and all operations associated with it.setSessionState
(byte[] sessionState) Sets the state of the session this receiver works for.
-
Method Details
-
getFullyQualifiedNamespace
Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to{yournamespace}.servicebus.windows.net
.- Returns:
- The fully qualified Service Bus namespace that the connection is associated with.
-
getEntityPath
Gets the Service Bus resource this client interacts with.- Returns:
- The Service Bus resource this client interacts with.
-
getSessionId
Gets the SessionId of the session if this receiver is a session receiver.- Returns:
- The SessionId or null if this is not a session receiver.
-
getIdentifier
Gets the identifier of the instance ofServiceBusReceiverAsyncClient
.- Returns:
- The identifier that can identify the instance of
ServiceBusReceiverAsyncClient
.
-
abandon
Abandons amessage
. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.- Returns:
- A
Mono
that completes when the Service Bus abandon operation completes. - Throws:
NullPointerException
- ifmessage
is null.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be abandoned.IllegalArgumentException
- if the message has either been deleted or already settled.
-
abandon
Abandons amessage
updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.options
- The options to set while abandoning the message.- Returns:
- A
Mono
that completes when the Service Bus operation finishes. - Throws:
NullPointerException
- ifmessage
oroptions
is null. Also iftransactionContext.transactionId
is null whenoptions.transactionContext
is specified.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be abandoned.IllegalArgumentException
- if the message has either been deleted or already settled.
-
complete
Completes amessage
. This will delete the message from the service.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.- Returns:
- A
Mono
that finishes when the message is completed on Service Bus. - Throws:
NullPointerException
- ifmessage
is null.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be completed.IllegalArgumentException
- if the message has either been deleted or already settled.
-
complete
Completes amessage
with the given options. This will delete the message from the service.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.options
- Options used to complete the message.- Returns:
- A
Mono
that finishes when the message is completed on Service Bus. - Throws:
NullPointerException
- ifmessage
oroptions
is null. Also iftransactionContext.transactionId
is null whenoptions.transactionContext
is specified.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be completed.IllegalArgumentException
- if the message has either been deleted or already settled.
-
defer
Defers amessage
. This will move message into the deferred sub-queue.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.- Returns:
- A
Mono
that completes when the Service Bus defer operation finishes. - Throws:
NullPointerException
- ifmessage
is null.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be deferred.IllegalArgumentException
- if the message has either been deleted or already settled.- See Also:
-
defer
Defers amessage
with the options set. This will move message into the deferred sub-queue.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.options
- Options used to defer the message.- Returns:
- A
Mono
that completes when the defer operation finishes. - Throws:
NullPointerException
- ifmessage
oroptions
is null. Also iftransactionContext.transactionId
is null whenoptions.transactionContext
is specified.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be deferred.IllegalArgumentException
- if the message has either been deleted or already settled.- See Also:
-
deadLetter
Moves amessage
to the dead-letter sub-queue.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.- Returns:
- A
Mono
that completes when the dead letter operation finishes. - Throws:
NullPointerException
- ifmessage
is null.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be dead-lettered.IllegalArgumentException
- if the message has either been deleted or already settled.- See Also:
-
deadLetter
Moves amessage
to the dead-letter sub-queue with the given options.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.options
- Options used to dead-letter the message.- Returns:
- A
Mono
that completes when the dead letter operation finishes. - Throws:
NullPointerException
- ifmessage
oroptions
is null. Also iftransactionContext.transactionId
is null whenoptions.transactionContext
is specified.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received frompeekMessage
.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the message could not be dead-lettered.IllegalArgumentException
- if the message has either been deleted or already settled.- See Also:
-
getSessionState
Gets the state of the session if this receiver is a session receiver.- Returns:
- The session state or an empty Mono if there is no state set for the session.
- Throws:
IllegalStateException
- if the receiver is a non-session receiver or receiver is already closed.ServiceBusException
- if the session state could not be acquired.
-
peekMessage
Reads the next active message without changing the state of the receiver or the message source. The first call topeek()
fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.- Returns:
- A peeked
ServiceBusReceivedMessage
. - Throws:
IllegalStateException
- if receiver is already disposed.ServiceBusException
- if an error occurs while peeking at the message.- See Also:
-
peekMessage
Starting from the given sequence number, reads next the active message without changing the state of the receiver or the message source.- Parameters:
sequenceNumber
- The sequence number from where to read the message.- Returns:
- A peeked
ServiceBusReceivedMessage
. - Throws:
IllegalStateException
- if receiver is already disposed.ServiceBusException
- if an error occurs while peeking at the message.- See Also:
-
peekMessages
Reads the next batch of active messages without changing the state of the receiver or the message source.- Parameters:
maxMessages
- The number of messages.- Returns:
- A
Flux
ofmessages
that are peeked. - Throws:
IllegalArgumentException
- ifmaxMessages
is not a positive integer.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if an error occurs while peeking at messages.- See Also:
-
peekMessages
Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.- Parameters:
maxMessages
- The number of messages.sequenceNumber
- The sequence number from where to start reading messages.- Returns:
- A
Flux
ofmessages
peeked. - Throws:
IllegalArgumentException
- ifmaxMessages
is not a positive integer.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if an error occurs while peeking at messages.- See Also:
-
receiveMessages
Receives an infinite stream ofmessages
from the Service Bus entity. This Flux continuously receives messages from a Service Bus entity until either:- The receiver is closed.
- The subscription to the Flux is disposed.
- A terminal signal from a downstream subscriber is propagated upstream (ie.
Flux.take(long)
orFlux.take(Duration)
). - An
AmqpException
occurs that causes the receive link to stop.
- Returns:
- An infinite stream of messages from the Service Bus entity.
- Throws:
IllegalStateException
- if receiver is already disposed.ServiceBusException
- if an error occurs while receiving messages.
-
receiveDeferredMessage
Receives a deferredmessage
. Deferred messages can only be received by using sequence number.- Parameters:
sequenceNumber
- Thesequence number
of the message.- Returns:
- A deferred message with the matching
sequenceNumber
. - Throws:
IllegalStateException
- if receiver is already disposed.ServiceBusException
- if deferred message cannot be received.
-
receiveDeferredMessages
Receives a batch of deferredmessages
. Deferred messages can only be received by using sequence number.- Parameters:
sequenceNumbers
- The sequence numbers of the deferred messages.- Returns:
- A
Flux
of deferredmessages
. - Throws:
NullPointerException
- ifsequenceNumbers
is null.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if deferred messages cannot be received.
-
renewMessageLock
Asynchronously renews the lock on the message. The lock will be renewed based on the setting specified on the entity. When a message is received inServiceBusReceiveMode.PEEK_LOCK
mode, the message is locked on the server for this receiver instance for a duration as specified during the entity creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.- Parameters:
message
- TheServiceBusReceivedMessage
to perform auto-lock renewal.- Returns:
- The new expiration time for the message.
- Throws:
NullPointerException
- ifmessage
ormessage.getLockToken()
is null.UnsupportedOperationException
- if the receiver was opened inServiceBusReceiveMode.RECEIVE_AND_DELETE
mode or if the message was received from peekMessage.IllegalStateException
- if the receiver is a session receiver or receiver is already disposed.IllegalArgumentException
- ifmessage.getLockToken()
is an empty value.
-
renewMessageLock
public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration) Starts the auto lock renewal for amessage
.- Parameters:
message
- TheServiceBusReceivedMessage
to perform this operation.maxLockRenewalDuration
- Maximum duration to keep renewing the lock token.- Returns:
- A Mono that completes when the message renewal operation has completed up until
maxLockRenewalDuration
. - Throws:
NullPointerException
- ifmessage
,message.getLockToken()
, ormaxLockRenewalDuration
is null.IllegalStateException
- if the receiver is a session receiver or the receiver is disposed.IllegalArgumentException
- ifmessage.getLockToken()
is an empty value.ServiceBusException
- If the message lock cannot be renewed.
-
renewSessionLock
Renews the session lock if this receiver is a session receiver.- Returns:
- The next expiration time for the session lock.
- Throws:
IllegalStateException
- if the receiver is a non-session receiver or if receiver is already disposed.ServiceBusException
- if the session lock cannot be renewed.
-
renewSessionLock
Starts the auto lock renewal for the session this receiver works for.- Parameters:
maxLockRenewalDuration
- Maximum duration to keep renewing the session lock.- Returns:
- A lock renewal operation for the message.
- Throws:
NullPointerException
- ifsessionId
ormaxLockRenewalDuration
is null.IllegalStateException
- if the receiver is a non-session receiver or the receiver is disposed.ServiceBusException
- if the session lock renewal operation cannot be started.IllegalArgumentException
- ifsessionId
is an empty string ormaxLockRenewalDuration
is negative.
-
setSessionState
Sets the state of the session this receiver works for.- Parameters:
sessionState
- State to set on the session.- Returns:
- A Mono that completes when the session is set
- Throws:
IllegalStateException
- if the receiver is a non-session receiver or receiver is already disposed.ServiceBusException
- if the session state cannot be set.
-
createTransaction
Starts a new service side transaction. Thetransaction context
should be passed to all operations that needs to be in this transaction.Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying // the operation. Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction() .cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO); transactionContext.flatMap(transaction -> { // Process messages and associate operations with the transaction. Mono<Void> operations = Mono.when( receiver.receiveDeferredMessage(sequenceNumber).flatMap(message -> receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))), receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction))); // Finally, either commit or rollback the transaction once all the operations are associated with it. return operations.flatMap(transactionOperations -> receiver.commitTransaction(transaction)); });
- Returns:
- The
Mono
that finishes this operation on service bus resource. - Throws:
IllegalStateException
- if receiver is already disposed.ServiceBusException
- if a transaction cannot be created.
-
commitTransaction
Commits the transaction and all the operations associated with it.Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying // the operation. Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction() .cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO); transactionContext.flatMap(transaction -> { // Process messages and associate operations with the transaction. Mono<Void> operations = Mono.when( receiver.receiveDeferredMessage(sequenceNumber).flatMap(message -> receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))), receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction))); // Finally, either commit or rollback the transaction once all the operations are associated with it. return operations.flatMap(transactionOperations -> receiver.commitTransaction(transaction)); });
- Parameters:
transactionContext
- The transaction to be commit.- Returns:
- The
Mono
that finishes this operation on service bus resource. - Throws:
NullPointerException
- iftransactionContext
ortransactionContext.transactionId
is null.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the transaction could not be committed.
-
rollbackTransaction
Rollbacks the transaction given and all operations associated with it.Creating and using a transaction
// This mono creates a transaction and caches the output value, so we can associate operations with the // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying // the operation. Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction() .cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO); transactionContext.flatMap(transaction -> { // Process messages and associate operations with the transaction. Mono<Void> operations = Mono.when( receiver.receiveDeferredMessage(sequenceNumber).flatMap(message -> receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))), receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction))); // Finally, either commit or rollback the transaction once all the operations are associated with it. return operations.flatMap(transactionOperations -> receiver.commitTransaction(transaction)); });
- Parameters:
transactionContext
- The transaction to rollback.- Returns:
- The
Mono
that finishes this operation on service bus resource. - Throws:
NullPointerException
- iftransactionContext
ortransactionContext.transactionId
is null.IllegalStateException
- if receiver is already disposed.ServiceBusException
- if the transaction could not be rolled back.
-
close
public void close()Disposes of the consumer by closing the underlying links to the service.- Specified by:
close
in interfaceAutoCloseable
-