public final class ServiceBusReceiverAsyncClient extends Object implements AutoCloseable
An asynchronous receiver responsible for receiving ServiceBusReceivedMessage from a specific queue or topic on Azure Service Bus.
Create an instance of receiver
    // The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
    ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder()
        .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
            + "SharedAccessKey={key};EntityPath={eh-name}")
        .receiver()
        .queueName("<< QUEUE NAME >>")
        .buildAsyncClient();
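The connection string in the sample above is a semicolon-separated list of key=value pairs. As a rough illustration of that layout, the sketch below splits such a string into its parts using only the JDK. `ConnectionStringParts` is a hypothetical helper and not part of the Azure SDK, and the sample values (namespace, policy, key, entity) are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the Azure SDK. */
final class ConnectionStringParts {
    private ConnectionStringParts() { }

    /** Splits "Key=Value;Key=Value" into a map that preserves the original order. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // skip empty segments, e.g. from a doubled semicolon
            }
            // Split on the first '=' only, because a key value may itself end with '='.
            int separator = segment.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            parts.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
        return parts;
    }

    public static void main(String[] args) {
        // Made-up sample values.
        Map<String, String> parts = parse(
            "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=policy;"
                + "SharedAccessKey=abc123=;EntityPath=orders");
        parts.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Splitting on the first `=` rather than on every `=` is the one design choice worth noting, since the value part can contain that character.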
Create an instance of receiver using default credential
    // The required parameters are the fully qualified namespace and a credential used to authenticate with Service Bus.
    ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
        .credential("<<fully-qualified-namespace>>", new DefaultAzureCredentialBuilder().build())
        .receiver()
        .queueName("<< QUEUE NAME >>")
        .buildAsyncClient();
Receive all messages from Service Bus resource
    Disposable subscription = receiver.receive().subscribe(context -> {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Received message id: %s%n", message.getMessageId());
        System.out.printf("Contents of message as string: %s%n", new String(message.getBody(), UTF_8));
    });

    // When program ends, or you're done receiving all messages.
    receiver.close();
    subscription.dispose();
Receive messages in ReceiveMode.RECEIVE_AND_DELETE mode from Service Bus resource
    // Keep a reference to `subscription`. When the program is finished receiving messages, call
    // subscription.dispose(). This will stop fetching messages from the Service Bus.
    Disposable subscription = receiver.receive()
        .subscribe(context -> {
            ServiceBusReceivedMessage message = context.getMessage();
            System.out.printf("Received message id: %s%n", message.getMessageId());
            System.out.printf("Contents of message as string: %s%n", new String(message.getBody(), UTF_8));
        }, error -> System.err.print(error));
Rate limiting consumption of messages from Service Bus resource
Message receivers that need to limit the number of messages they receive at a given time can use BaseSubscriber.request(long).
    receiver.receive().subscribe(new BaseSubscriber<ServiceBusReceivedMessageContext>() {
        private static final int NUMBER_OF_MESSAGES = 5;
        private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // Tell the Publisher we only want 5 messages at a time.
            request(NUMBER_OF_MESSAGES);
        }

        @Override
        protected void hookOnNext(ServiceBusReceivedMessageContext value) {
            // Process the ServiceBusReceivedMessage
            ServiceBusReceivedMessage message = value.getMessage();

            // If the number of messages we have currently received is a multiple of 5, that means we have reached
            // the last message the Subscriber will provide to us. Invoking request(long) here tells the Publisher
            // that the subscriber is ready to get more messages from upstream.
            if (currentNumberOfMessages.incrementAndGet() % NUMBER_OF_MESSAGES == 0) {
                request(NUMBER_OF_MESSAGES);
            }
        }
    });
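The request-in-batches pattern above is not specific to Service Bus. As a minimal sketch, the same demand accounting can be reproduced with the JDK's `java.util.concurrent.Flow` interfaces, which follow the same request(n) contract. `RangePublisher` and `BatchingSubscriber` are hypothetical stand-ins for the receiver and the message handler, integers stand in for messages, and everything runs synchronously on one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BatchedDemandSketch {
    static final int BATCH_SIZE = 5;

    /** Hypothetical stand-in for the receiver: emits 0..count-1, never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext; the running loop sees the new demand
                    }
                    draining = true;
                    while (!done && demand > 0 && next < count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (!done && next == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests BATCH_SIZE items up front, then again after every BATCH_SIZE items received. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final List<Long> requests = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            ask();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() % BATCH_SIZE == 0) {
                ask(); // the previous batch is fully consumed, so signal readiness for more
            }
        }

        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }

        private void ask() {
            requests.add((long) BATCH_SIZE);
            subscription.request(BATCH_SIZE);
        }
    }

    static BatchingSubscriber run(int messageCount) {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        new RangePublisher(messageCount).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber result = run(12);
        System.out.println("received=" + result.received.size() + " requests=" + result.requests.size());
    }
}
```

The publisher never emits beyond the outstanding demand, so the subscriber holds at most one batch of unprocessed items at a time. That is the property the BaseSubscriber sample relies on.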
Modifier and Type | Method and Description |
---|---|
Mono<Void> | abandon(MessageLockToken lockToken): Abandons a message with its lock token. |
Mono<Void> | abandon(MessageLockToken lockToken, Map<String,Object> propertiesToModify): Abandons a message with its lock token and updates the message's properties. |
Mono<Void> | abandon(MessageLockToken lockToken, Map<String,Object> propertiesToModify, String sessionId): Abandons a message with its lock token and updates the message's properties. |
Mono<Void> | abandon(MessageLockToken lockToken, String sessionId): Abandons a message with its lock token. |
void | close(): Disposes of the consumer by closing the underlying connection to the service. |
Mono<Void> | complete(MessageLockToken lockToken): Completes a message using its lock token. |
Mono<Void> | complete(MessageLockToken lockToken, String sessionId): Completes a message using its lock token. |
Mono<Void> | deadLetter(MessageLockToken lockToken): Moves a message to the deadletter sub-queue. |
Mono<Void> | deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions): Moves a message to the deadletter sub-queue with deadletter reason, error description, and/or modified properties. |
Mono<Void> | deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions, String sessionId): Moves a message to the deadletter sub-queue with deadletter reason, error description, and/or modified properties. |
Mono<Void> | deadLetter(MessageLockToken lockToken, String sessionId): Moves a message to the deadletter sub-queue. |
Mono<Void> | defer(MessageLockToken lockToken): Defers a message using its lock token. |
Mono<Void> | defer(MessageLockToken lockToken, Map<String,Object> propertiesToModify): Defers a message using its lock token with modified message properties. |
Mono<Void> | defer(MessageLockToken lockToken, Map<String,Object> propertiesToModify, String sessionId): Defers a message using its lock token with modified message properties. |
Mono<Void> | defer(MessageLockToken lockToken, String sessionId): Defers a message using its lock token. |
String | getEntityPath(): Gets the Service Bus resource this client interacts with. |
String | getFullyQualifiedNamespace(): Gets the fully qualified Service Bus namespace that the connection is associated with. |
Mono<byte[]> | getSessionState(String sessionId): Gets the state of a session given its identifier. |
Mono<ServiceBusReceivedMessage> | peek(): Reads the next active message without changing the state of the receiver or the message source. |
Mono<ServiceBusReceivedMessage> | peek(String sessionId): Reads the next active message without changing the state of the receiver or the message source. |
Mono<ServiceBusReceivedMessage> | peekAt(long sequenceNumber): Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source. |
Mono<ServiceBusReceivedMessage> | peekAt(long sequenceNumber, String sessionId): Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessage> | peekBatch(int maxMessages): Reads the next batch of active messages without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessage> | peekBatch(int maxMessages, String sessionId): Reads the next batch of active messages without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessage> | peekBatchAt(int maxMessages, long sequenceNumber): Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessage> | peekBatchAt(int maxMessages, long sequenceNumber, String sessionId): Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source. |
Flux<ServiceBusReceivedMessageContext> | receive(): Receives a stream of messages from the Service Bus entity and completes them when they are finished processing. |
Flux<ServiceBusReceivedMessageContext> | receive(ReceiveAsyncOptions options): Receives a stream of messages from the Service Bus entity with a set of options. |
Mono<ServiceBusReceivedMessage> | receiveDeferredMessage(long sequenceNumber): Receives a deferred message. |
Mono<ServiceBusReceivedMessage> | receiveDeferredMessage(long sequenceNumber, String sessionId): Receives a deferred message. |
Flux<ServiceBusReceivedMessage> | receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers): Receives a batch of deferred messages. |
Flux<ServiceBusReceivedMessage> | receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers, String sessionId): Receives a batch of deferred messages. |
Mono<Instant> | renewMessageLock(MessageLockToken lockToken): Asynchronously renews the lock on the specified message. |
Mono<Instant> | renewSessionLock(String sessionId): Renews the lock on the session with the given identifier. |
Mono<Void> | setSessionState(String sessionId, byte[] sessionState): Sets the state of a session given its identifier. |
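The settlement methods in the table (abandon, complete, defer, deadLetter) each move a received message to a different state. The toy in-memory model below sketches those transitions as this page describes them: abandon makes the message available again and increases its delivery count, complete removes it, defer parks it so that it can only be fetched by sequence number, and deadLetter moves it to a separate sub-queue. `SettlementModel` is hypothetical. It uses only JDK types, does not call Service Bus, and ignores locks, sessions, and timeouts.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of message settlement; a hypothetical illustration, not the Service Bus implementation. */
final class SettlementModel {
    static final class Message {
        final long sequenceNumber;
        final String body;
        int deliveryCount; // starts at 0 here; the starting value is an arbitrary choice of this model

        Message(long sequenceNumber, String body) {
            this.sequenceNumber = sequenceNumber;
            this.body = body;
        }
    }

    private final Deque<Message> active = new ArrayDeque<>();
    private final Deque<Message> deadLettered = new ArrayDeque<>();
    private final Map<Long, Message> deferred = new HashMap<>();
    private long nextSequenceNumber = 1;

    void send(String body) {
        active.addLast(new Message(nextSequenceNumber++, body));
    }

    /** Hands out the next active message, or null when none is available. */
    Message receive() {
        return active.pollFirst();
    }

    /** Makes the message available again and increases its delivery count. */
    void abandon(Message message) {
        message.deliveryCount++;
        active.addFirst(message);
    }

    /** Removes the message for good, including from the deferred set if it was parked there. */
    void complete(Message message) {
        deferred.remove(message.sequenceNumber);
    }

    /** Parks the message; from now on it is reachable only by sequence number. */
    void defer(Message message) {
        deferred.put(message.sequenceNumber, message);
    }

    Message receiveDeferred(long sequenceNumber) {
        return deferred.get(sequenceNumber);
    }

    void deadLetter(Message message) {
        deadLettered.addLast(message);
    }

    int deadLetterCount() {
        return deadLettered.size();
    }

    public static void main(String[] args) {
        SettlementModel queue = new SettlementModel();
        queue.send("first");
        Message message = queue.receive();
        queue.abandon(message);
        System.out.println("delivery count after abandon: " + queue.receive().deliveryCount);
    }
}
```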
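public String getFullyQualifiedNamespace()
Gets the fully qualified Service Bus namespace that the connection is associated with, in the form {yournamespace}.servicebus.windows.net.

public String getEntityPath()
Gets the Service Bus resource this client interacts with.
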
public Mono<Void> abandon(MessageLockToken lockToken)
Abandons a message with its lock token. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
lockToken - Lock token of the message.
Returns:
A Mono that completes when the Service Bus abandon operation completes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> abandon(MessageLockToken lockToken, String sessionId)
Abandons a message with its lock token. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
lockToken - Lock token of the message.
sessionId - Session id of the message to abandon. null if there is no session.
Returns:
A Mono that completes when the Service Bus abandon operation completes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> abandon(MessageLockToken lockToken, Map<String,Object> propertiesToModify)
Abandons a message with its lock token and updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
lockToken - Lock token of the message.
propertiesToModify - Properties to modify on the message.
Returns:
A Mono that completes when the Service Bus operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> abandon(MessageLockToken lockToken, Map<String,Object> propertiesToModify, String sessionId)
Abandons a message with its lock token and updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Parameters:
lockToken - Lock token of the message.
propertiesToModify - Properties to modify on the message.
sessionId - Session id of the message to abandon. null if there is no session.
Returns:
A Mono that completes when the Service Bus abandon operation completes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> complete(MessageLockToken lockToken)
Completes a message using its lock token. This will delete the message from the service.
Parameters:
lockToken - Lock token of the message.
Returns:
A Mono that finishes when the message is completed on Service Bus.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> complete(MessageLockToken lockToken, String sessionId)
Completes a message using its lock token. This will delete the message from the service.
Parameters:
lockToken - Lock token of the message.
sessionId - Session id of the message to complete. null if there is no session.
Returns:
A Mono that finishes when the message is completed on Service Bus.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> defer(MessageLockToken lockToken)
Defers a message using its lock token. This will move the message into the deferred sub-queue.
Parameters:
lockToken - Lock token of the message.
Returns:
A Mono that completes when the Service Bus defer operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> defer(MessageLockToken lockToken, String sessionId)
Defers a message using its lock token. This will move the message into the deferred sub-queue.
Parameters:
lockToken - Lock token of the message.
sessionId - Session id of the message to defer. null if there is no session.
Returns:
A Mono that completes when the defer operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> defer(MessageLockToken lockToken, Map<String,Object> propertiesToModify)
Defers a message using its lock token with modified message properties. This will move the message into the deferred sub-queue.
Parameters:
lockToken - Lock token of the message.
propertiesToModify - Message properties to modify.
Returns:
A Mono that completes when the defer operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> defer(MessageLockToken lockToken, Map<String,Object> propertiesToModify, String sessionId)
Defers a message using its lock token with modified message properties. This will move the message into the deferred sub-queue.
Parameters:
lockToken - Lock token of the message.
propertiesToModify - Message properties to modify.
sessionId - Session id of the message to defer. null if there is no session.
Returns:
A Mono that completes when the Service Bus defer operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> deadLetter(MessageLockToken lockToken)
Moves a message to the deadletter sub-queue.
Parameters:
lockToken - Lock token of the message.
Returns:
A Mono that completes when the dead letter operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> deadLetter(MessageLockToken lockToken, String sessionId)
Moves a message to the deadletter sub-queue.
Parameters:
lockToken - Lock token of the message.
sessionId - Session id of the message to deadletter. null if there is no session.
Returns:
A Mono that completes when the dead letter operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions)
Moves a message to the deadletter sub-queue with deadletter reason, error description, and/or modified properties.
Parameters:
lockToken - Lock token of the message.
deadLetterOptions - The options to specify when moving the message to the deadletter sub-queue.
Returns:
A Mono that completes when the dead letter operation finishes.
Throws:
NullPointerException - if lockToken or deadLetterOptions is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<Void> deadLetter(MessageLockToken lockToken, DeadLetterOptions deadLetterOptions, String sessionId)
Moves a message to the deadletter sub-queue with deadletter reason, error description, and/or modified properties.
Parameters:
lockToken - Lock token of the message.
deadLetterOptions - The options to specify when moving the message to the deadletter sub-queue.
sessionId - Session id of the message to deadletter. null if there is no session.
Returns:
A Mono that completes when the dead letter operation finishes.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalArgumentException - if MessageLockToken.getLockToken() returns a null lock token.

public Mono<byte[]> getSessionState(String sessionId)
Gets the state of a session given its identifier.
Parameters:
sessionId - Identifier of session to get.
Throws:
IllegalStateException - if the receiver is a non-session receiver.

public Mono<ServiceBusReceivedMessage> peek()
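Reads the next active message without changing the state of the receiver or the message source. The first call to peek() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.
Returns:
A peeked ServiceBusReceivedMessage.

public Mono<ServiceBusReceivedMessage> peek(String sessionId)
Reads the next active message without changing the state of the receiver or the message source. The first call to peek() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity.
Parameters:
sessionId - Session id of the message to peek from. null if there is no session.
Returns:
A peeked ServiceBusReceivedMessage.

public Mono<ServiceBusReceivedMessage> peekAt(long sequenceNumber)
Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source.
Parameters:
sequenceNumber - The sequence number from where to read the message.
Returns:
A peeked ServiceBusReceivedMessage.

public Mono<ServiceBusReceivedMessage> peekAt(long sequenceNumber, String sessionId)
Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source.
Parameters:
sequenceNumber - The sequence number from where to read the message.
sessionId - Session id of the message to peek from. null if there is no session.
Returns:
A peeked ServiceBusReceivedMessage.

public Flux<ServiceBusReceivedMessage> peekBatch(int maxMessages)
Reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
maxMessages - The number of messages.
Returns:
A Flux of messages that are peeked.
Throws:
IllegalArgumentException - if maxMessages is not a positive integer.

public Flux<ServiceBusReceivedMessage> peekBatch(int maxMessages, String sessionId)
Reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
maxMessages - The number of messages.
sessionId - Session id of the messages to peek from. null if there is no session.
Returns:
A Flux of messages that are peeked.
Throws:
IllegalArgumentException - if maxMessages is not a positive integer.

public Flux<ServiceBusReceivedMessage> peekBatchAt(int maxMessages, long sequenceNumber)
Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
maxMessages - The number of messages.
sequenceNumber - The sequence number from where to start reading messages.
Returns:
A Flux of ServiceBusReceivedMessage peeked.
Throws:
IllegalArgumentException - if maxMessages is not a positive integer.

public Flux<ServiceBusReceivedMessage> peekBatchAt(int maxMessages, long sequenceNumber, String sessionId)
Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.
Parameters:
maxMessages - The number of messages.
sequenceNumber - The sequence number from where to start reading messages.
sessionId - Session id of the messages to peek from. null if there is no session.
Returns:
A Flux of ServiceBusReceivedMessage peeked.
Throws:
IllegalArgumentException - if maxMessages is not a positive integer.

public Flux<ServiceBusReceivedMessageContext> receive()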
Receives a stream of messages from the Service Bus entity and completes them when they are finished processing.
By default, each successfully consumed message is auto-completed and auto-renewed. When downstream consumers throw an exception, the auto-completion feature will abandon the message. Auto-renewal occurs until the operation timeout has elapsed.
Throws:
com.azure.core.amqp.exception.AmqpException - if the operation timeout has elapsed and downstream consumers are still processing the message.

public Flux<ServiceBusReceivedMessageContext> receive(ReceiveAsyncOptions options)
Receives a stream of messages from the Service Bus entity with a set of options. To disable lock auto-renewal, set setMaxAutoRenewDuration to Duration.ZERO or null.
Parameters:
options - Set of options to set when receiving messages.
Throws:
NullPointerException - if options is null.
IllegalArgumentException - if max auto-renew duration is negative.

public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)
Receives a deferred message. Deferred messages can only be received by using their sequence number.
Parameters:
sequenceNumber - The sequence number of the message.
Returns:
A deferred message with the matching sequenceNumber.

public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber, String sessionId)
Receives a deferred message. Deferred messages can only be received by using their sequence number.
Parameters:
sequenceNumber - The sequence number of the message.
sessionId - Session id of the deferred message. null if there is no session.
Returns:
A deferred message with the matching sequenceNumber.

public Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers)
Receives a batch of deferred messages. Deferred messages can only be received by using their sequence numbers.
Parameters:
sequenceNumbers - The sequence numbers of the deferred messages.
Returns:
A Flux of deferred messages.

public Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers, String sessionId)
Receives a batch of deferred messages. Deferred messages can only be received by using their sequence numbers.
Parameters:
sequenceNumbers - The sequence numbers of the deferred messages.
sessionId - Session id of the deferred messages. null if there is no session.
Returns:
A Flux of deferred messages.

public Mono<Instant> renewMessageLock(MessageLockToken lockToken)
Asynchronously renews the lock on the specified message. When a message is received in ReceiveMode.PEEK_LOCK mode, the message is locked on the server for this receiver instance for a duration as specified during the Queue creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
Parameters:
lockToken - Lock token of the message to renew.
Throws:
NullPointerException - if lockToken is null.
UnsupportedOperationException - if the receiver was opened in ReceiveMode.RECEIVE_AND_DELETE mode.
IllegalStateException - if the receiver is a session receiver.
IllegalArgumentException - if MessageLockToken.getLockToken() returns an empty value.

public Mono<Instant> renewSessionLock(String sessionId)
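Renews the lock on the session with the given identifier.
Parameters:
sessionId - Identifier of the session to renew.
Throws:
IllegalStateException - if the receiver is a non-session receiver.

public Mono<Void> setSessionState(String sessionId, byte[] sessionState)
Sets the state of a session given its identifier.
Parameters:
sessionId - Identifier of the session.
sessionState - State to set on the session.
Throws:
IllegalStateException - if the receiver is a non-session receiver.

public void close()
Disposes of the consumer by closing the underlying connection to the service.
Specified by:
close in interface AutoCloseable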
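
Because the client implements AutoCloseable, it can be managed with try-with-resources when its lifetime is confined to a single block. The sketch below demonstrates the mechanism with a hypothetical `FakeReceiver`, so it runs without a Service Bus namespace. With the real client, the resource would be the result of `buildAsyncClient()`, and since that client is asynchronous, the block would need to stay open until the subscriptions have finished.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseSketch {
    /** Hypothetical stand-in for a receiver client; it only records lifecycle events. */
    static final class FakeReceiver implements AutoCloseable {
        private final List<String> events;

        FakeReceiver(List<String> events) {
            this.events = events;
            events.add("opened");
        }

        void receiveOne() {
            events.add("received");
        }

        @Override
        public void close() {
            events.add("closed");
        }
    }

    /** Runs one receive inside try-with-resources and returns the recorded event order. */
    static List<String> run(boolean failDuringProcessing) {
        List<String> events = new ArrayList<>();
        try (FakeReceiver receiver = new FakeReceiver(events)) {
            receiver.receiveOne();
            if (failDuringProcessing) {
                throw new IllegalStateException("processing failed");
            }
        } catch (IllegalStateException e) {
            // close() has already run by the time control reaches this handler.
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

The point of the pattern is that close() runs on both the normal and the exceptional path, so the underlying connection is not left open when message processing throws.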
Copyright © 2020 Microsoft Corporation. All rights reserved.