Class ServiceBusReceiverAsyncClient

java.lang.Object
com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient
All Implemented Interfaces:
AutoCloseable

public final class ServiceBusReceiverAsyncClient extends Object implements AutoCloseable
An asynchronous receiver responsible for receiving messages from a specific queue or topic subscription.

Create an instance of receiver

 // The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
 // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"

 ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .receiver()
     .queueName(queueName)
     .buildAsyncClient();
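
The connection string format shown in the comment above is a semicolon-separated list of key=value pairs. As a minimal sketch, the snippet below splits such a string into its parts, which may help when validating configuration before handing it to the builder. ConnectionStringSketch and parse are hypothetical names and are not part of the Service Bus SDK, and the sample values are made up. Each pair is split on the first '=' only, on the assumption that a value (such as a key) may itself contain '='.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConnectionStringSketch {
    /** Splits "Key1=Value1;Key2=Value2" into an ordered map. Hypothetical helper, not an SDK API. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connectionString.split(";")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate empty segments, for example from a doubled ';'
            }
            int separator = pair.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + pair);
            }
            // Split on the first '=' only, so values that contain '=' stay intact.
            parts.put(pair.substring(0, separator).trim(), pair.substring(separator + 1).trim());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, String> parts = parse(
            "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=policy;SharedAccessKey=abc123=");
        System.out.println(parts.get("Endpoint"));
        System.out.println(parts.get("SharedAccessKeyName"));
    }
}
```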
 

Create an instance of receiver using default credential

 // The required parameters are the fully qualified namespace and a credential used to authenticate with Service Bus.
 ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
     .credential("<<fully-qualified-namespace>>",
         new DefaultAzureCredentialBuilder().build())
     .receiver()
     .queueName("<< QUEUE NAME >>")
     .buildAsyncClient();
 

Receive all messages from Service Bus resource

This returns an infinite stream of messages from Service Bus. The stream ends when the subscription is disposed or when another terminal condition occurs, such as the receiver being closed. See receiveMessages() for more information.

 Disposable subscription = receiver.receiveMessages()
     .subscribe(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages.
 subscription.dispose();
 receiver.close();
 

Receive messages in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode from a Service Bus entity

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = receiver.receiveMessages()
     .subscribe(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody().toString());
     }, error -> System.err.print(error));
 

Receive messages from a specific session

To fetch messages from a specific session, switch to ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use ServiceBusSessionReceiverAsyncClient.acceptSession(String) to create a session-bound ServiceBusReceiverAsyncClient.

 // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sessionReceiver()
     .queueName(queueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
 // successfully locked.
 // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
 // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
 Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<< my-session-id >>"),
     receiver -> receiver.receiveMessages(),
     receiver -> Mono.fromRunnable(() -> receiver.close()));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
         message.getSequenceNumber(), message.getBody()),
     error -> System.err.print(error));
 

Receive messages from the first available session

To process messages from the first available session, switch to ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder and build the session receiver client. Use acceptNextSession() to find the first available session to process messages from.

 // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sessionReceiver()
     .queueName(queueName)
     .buildAsyncClient();

 // acceptNextSession() completes successfully with a receiver when it acquires the next available session.
 // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
 // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
 Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptNextSession(),
     receiver -> receiver.receiveMessages(),
     receiver -> Mono.fromRunnable(() -> receiver.close()));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
         message.getSequenceNumber(), message.getBody()),
     error -> System.err.print(error));
 

Rate limiting consumption of messages from a Service Bus entity

Receivers that need to limit the number of messages they receive at a given time can use BaseSubscriber.request(long).

 receiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 messages at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % NUMBER_OF_MESSAGES == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });
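
The request-in-batches pattern above can also be tried without a Service Bus connection. The following is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for Reactor's BaseSubscriber. RangePublisher and BatchSubscriber are hypothetical classes written for this illustration; the publisher is synchronous and single-threaded, which a real receiver is not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchedRequestSketch {
    /** Hypothetical synchronous publisher that emits 0..count-1 and never exceeds the requested demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        final List<Long> requests = new ArrayList<>(); // every request(n) observed, for inspection

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;
                private int next;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    requests.add(n);
                    demand += n;
                    if (draining || done) {
                        return; // re-entrant call from onNext: the outer loop picks up the new demand
                    }
                    draining = true;
                    while (demand > 0 && next < count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests BATCH items up front, then BATCH more each time a full batch has been received. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private static final int BATCH = 5;
        private Flow.Subscription subscription;
        private int received;
        final List<Integer> items = new ArrayList<>();
        boolean completed;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(BATCH);
        }

        @Override
        public void onNext(Integer item) {
            items.add(item);
            if (++received % BATCH == 0) {
                subscription.request(BATCH);
            }
        }

        @Override
        public void onError(Throwable error) {
            throw new IllegalStateException(error);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        RangePublisher publisher = new RangePublisher(12);
        BatchSubscriber subscriber = new BatchSubscriber();
        publisher.subscribe(subscriber);
        System.out.println("items received: " + subscriber.items.size());
        System.out.println("request calls: " + publisher.requests);
    }
}
```

In this model the publisher never has more than five undelivered items of demand, which is the property the BaseSubscriber example relies on.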
 
  • Method Details

    • getFullyQualifiedNamespace

      public String getFullyQualifiedNamespace()
      Gets the fully qualified Service Bus namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
      Returns:
      The fully qualified Service Bus namespace that the connection is associated with.
    • getEntityPath

      public String getEntityPath()
      Gets the Service Bus resource this client interacts with.
      Returns:
      The Service Bus resource this client interacts with.
    • getSessionId

      public String getSessionId()
      Gets the SessionId of the session if this receiver is a session receiver.
      Returns:
      The SessionId or null if this is not a session receiver.
    • getIdentifier

      public String getIdentifier()
      Gets the identifier of the instance of ServiceBusReceiverAsyncClient.
      Returns:
      The identifier that can identify the instance of ServiceBusReceiverAsyncClient.
    • abandon

      public Mono<Void> abandon(ServiceBusReceivedMessage message)
      Abandons a message. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      Returns:
      A Mono that completes when the Service Bus abandon operation completes.
      Throws:
      NullPointerException - if message is null.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be abandoned.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • abandon

      public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)
      Abandons a message and updates the message's properties. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      options - The options to set while abandoning the message.
      Returns:
      A Mono that completes when the Service Bus operation finishes.
      Throws:
      NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be abandoned.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • complete

      public Mono<Void> complete(ServiceBusReceivedMessage message)
      Completes a message. This will delete the message from the service.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      Returns:
      A Mono that finishes when the message is completed on Service Bus.
      Throws:
      NullPointerException - if message is null.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be completed.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • complete

      public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)
      Completes a message with the given options. This will delete the message from the service.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      options - Options used to complete the message.
      Returns:
      A Mono that finishes when the message is completed on Service Bus.
      Throws:
      NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be completed.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • defer

      public Mono<Void> defer(ServiceBusReceivedMessage message)
      Defers a message. This will move the message into the deferred sub-queue.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      Returns:
      A Mono that completes when the Service Bus defer operation finishes.
      Throws:
      NullPointerException - if message is null.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be deferred.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • defer

      public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)
      Defers a message with the options set. This will move the message into the deferred sub-queue.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      options - Options used to defer the message.
      Returns:
      A Mono that completes when the defer operation finishes.
      Throws:
      NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be deferred.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • deadLetter

      public Mono<Void> deadLetter(ServiceBusReceivedMessage message)
      Moves a message to the dead-letter sub-queue.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      Returns:
      A Mono that completes when the dead letter operation finishes.
      Throws:
      NullPointerException - if message is null.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be dead-lettered.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • deadLetter

      public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)
      Moves a message to the dead-letter sub-queue with the given options.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      options - Options used to dead-letter the message.
      Returns:
      A Mono that completes when the dead letter operation finishes.
      Throws:
      NullPointerException - if message or options is null. Also if transactionContext.transactionId is null when options.transactionContext is specified.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the message could not be dead-lettered.
      IllegalArgumentException - if the message has either been deleted or already settled.
    • getSessionState

      public Mono<byte[]> getSessionState()
      Gets the state of the session if this receiver is a session receiver.
      Returns:
      The session state or an empty Mono if there is no state set for the session.
      Throws:
      IllegalStateException - if the receiver is a non-session receiver or receiver is already closed.
      ServiceBusException - if the session state could not be acquired.
    • peekMessage

      public Mono<ServiceBusReceivedMessage> peekMessage()
      Reads the next active message without changing the state of the receiver or the message source. The first call to peekMessage() fetches the first active message for this receiver. Each subsequent call fetches the next message in the entity.
      Returns:
      A peeked ServiceBusReceivedMessage.
      Throws:
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if an error occurs while peeking at the message.
    • peekMessage

      public Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)
      Starting from the given sequence number, reads the next active message without changing the state of the receiver or the message source.
      Parameters:
      sequenceNumber - The sequence number from where to read the message.
      Returns:
      A peeked ServiceBusReceivedMessage.
      Throws:
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if an error occurs while peeking at the message.
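
The two peekMessage overloads describe a cursor over the entity: the parameterless form walks forward one message per call, and the sequence-number form starts from a given position. Neither removes anything. The following is a minimal in-memory sketch of that behaviour. PeekCursorSketch is a hypothetical class, not the SDK implementation, and it assumes that a sequence-number peek leaves the cursor untouched, which this page does not state.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

class PeekCursorSketch {
    // In-memory stand-in for an entity: sequence number -> message body.
    private final TreeMap<Long, String> messages = new TreeMap<>();
    // Sequence number the next parameterless peek starts from.
    private long cursor = Long.MIN_VALUE;

    void add(long sequenceNumber, String body) {
        messages.put(sequenceNumber, body);
    }

    int size() {
        return messages.size();
    }

    /** Returns the first message at or after the cursor, then moves the cursor past it. */
    Optional<String> peek() {
        Map.Entry<Long, String> entry = messages.ceilingEntry(cursor);
        if (entry == null) {
            return Optional.empty();
        }
        cursor = entry.getKey() + 1;
        return Optional.of(entry.getValue());
    }

    /** Returns the first message at or after the given sequence number (assumption: cursor unchanged). */
    Optional<String> peek(long fromSequenceNumber) {
        Map.Entry<Long, String> entry = messages.ceilingEntry(fromSequenceNumber);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        PeekCursorSketch entity = new PeekCursorSketch();
        entity.add(10, "a");
        entity.add(20, "b");
        entity.add(30, "c");
        System.out.println(entity.peek());   // first message
        System.out.println(entity.peek());   // the one after it
        System.out.println(entity.peek(25)); // first message at or after sequence number 25
        System.out.println(entity.size());   // peeking removed nothing
    }
}
```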
    • peekMessages

      public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)
      Reads the next batch of active messages without changing the state of the receiver or the message source.
      Parameters:
      maxMessages - The number of messages.
      Returns:
      A Flux of messages that are peeked.
      Throws:
      IllegalArgumentException - if maxMessages is not a positive integer.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if an error occurs while peeking at messages.
    • peekMessages

      public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)
      Starting from the given sequence number, reads the next batch of active messages without changing the state of the receiver or the message source.
      Parameters:
      maxMessages - The number of messages.
      sequenceNumber - The sequence number from where to start reading messages.
      Returns:
      A Flux of messages peeked.
      Throws:
      IllegalArgumentException - if maxMessages is not a positive integer.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if an error occurs while peeking at messages.
    • receiveMessages

      public Flux<ServiceBusReceivedMessage> receiveMessages()
      Receives an infinite stream of messages from the Service Bus entity. This Flux continuously receives messages from a Service Bus entity until either:
      • The receiver is closed.
      • The subscription to the Flux is disposed.
      • A terminal signal from a downstream subscriber is propagated upstream (for example, Flux.take(long) or Flux.take(Duration)).
      • An AmqpException occurs that causes the receive link to stop.
      Returns:
      An infinite stream of messages from the Service Bus entity.
      Throws:
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if an error occurs while receiving messages.
    • receiveDeferredMessage

      public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)
      Receives a deferred message. Deferred messages can only be received by using sequence number.
      Parameters:
      sequenceNumber - The sequence number of the message.
      Returns:
      A deferred message with the matching sequenceNumber.
      Throws:
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if deferred message cannot be received.
    • receiveDeferredMessages

      public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)
      Receives a batch of deferred messages. Deferred messages can only be received by using sequence number.
      Parameters:
      sequenceNumbers - The sequence numbers of the deferred messages.
      Returns:
      A Flux of deferred messages.
      Throws:
      NullPointerException - if sequenceNumbers is null.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if deferred messages cannot be received.
    • renewMessageLock

      public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
      Asynchronously renews the lock on the message. The lock will be renewed based on the setting specified on the entity. When a message is received in ServiceBusReceiveMode.PEEK_LOCK mode, the message is locked on the server for this receiver instance for a duration as specified during the entity creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
      Parameters:
      message - The ServiceBusReceivedMessage whose lock is to be renewed.
      Returns:
      The new expiration time for the message.
      Throws:
      NullPointerException - if message or message.getLockToken() is null.
      UnsupportedOperationException - if the receiver was opened in ServiceBusReceiveMode.RECEIVE_AND_DELETE mode or if the message was received from peekMessage.
      IllegalStateException - if the receiver is a session receiver or receiver is already disposed.
      IllegalArgumentException - if message.getLockToken() is an empty value.
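
Since each renewal resets the lock to the entity's LockDuration, the arithmetic for a long-running handler is straightforward. The sketch below works it through with java.time. LockRenewalSketch and its methods are hypothetical helpers, and renewalsNeeded assumes the idealised case where each renewal happens exactly when the current lock would expire; a real application would likely renew with some margin.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

class LockRenewalSketch {
    /** Expiry after a renewal: a full lock duration counted from the renewal time. */
    static OffsetDateTime renewedExpiry(OffsetDateTime renewedAt, Duration lockDuration) {
        return renewedAt.plus(lockDuration);
    }

    /** Renewals needed to cover processingTime, assuming each renewal happens exactly at expiry. */
    static long renewalsNeeded(Duration processingTime, Duration lockDuration) {
        long lockMillis = lockDuration.toMillis();
        if (lockMillis <= 0) {
            throw new IllegalArgumentException("lockDuration must be at least one millisecond");
        }
        long processingMillis = processingTime.toMillis();
        if (processingMillis <= lockMillis) {
            return 0; // the lock obtained on receive is enough
        }
        // ceil(processing / lock) lock periods are needed; the first one comes with the receive.
        return (processingMillis + lockMillis - 1) / lockMillis - 1;
    }

    public static void main(String[] args) {
        OffsetDateTime renewedAt = OffsetDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
        System.out.println(renewedExpiry(renewedAt, Duration.ofSeconds(30)));
        System.out.println(renewalsNeeded(Duration.ofSeconds(150), Duration.ofSeconds(60)));
    }
}
```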
    • renewMessageLock

      public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)
      Starts the auto lock renewal for a message.
      Parameters:
      message - The ServiceBusReceivedMessage to perform this operation.
      maxLockRenewalDuration - Maximum duration to keep renewing the lock token.
      Returns:
      A Mono that completes when the message renewal operation has completed up until maxLockRenewalDuration.
      Throws:
      NullPointerException - if message, message.getLockToken(), or maxLockRenewalDuration is null.
      IllegalStateException - if the receiver is a session receiver or the receiver is disposed.
      IllegalArgumentException - if message.getLockToken() is an empty value.
      ServiceBusException - if the message lock cannot be renewed.
    • renewSessionLock

      public Mono<OffsetDateTime> renewSessionLock()
      Renews the session lock if this receiver is a session receiver.
      Returns:
      The next expiration time for the session lock.
      Throws:
      IllegalStateException - if the receiver is a non-session receiver or if receiver is already disposed.
      ServiceBusException - if the session lock cannot be renewed.
    • renewSessionLock

      public Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)
      Starts the auto lock renewal for the session this receiver works for.
      Parameters:
      maxLockRenewalDuration - Maximum duration to keep renewing the session lock.
      Returns:
      A Mono that represents the lock renewal operation for the session.
      Throws:
      NullPointerException - if sessionId or maxLockRenewalDuration is null.
      IllegalStateException - if the receiver is a non-session receiver or the receiver is disposed.
      ServiceBusException - if the session lock renewal operation cannot be started.
      IllegalArgumentException - if sessionId is an empty string or maxLockRenewalDuration is negative.
    • setSessionState

      public Mono<Void> setSessionState(byte[] sessionState)
      Sets the state of the session this receiver works for.
      Parameters:
      sessionState - State to set on the session.
      Returns:
      A Mono that completes when the session state is set.
      Throws:
      IllegalStateException - if the receiver is a non-session receiver or receiver is already disposed.
      ServiceBusException - if the session state cannot be set.
    • createTransaction

      public Mono<ServiceBusTransactionContext> createTransaction()
      Starts a new service-side transaction. The transaction context should be passed to all operations that need to be in this transaction.

      Creating and using a transaction

       // This mono creates a transaction and caches the output value, so we can associate operations with the
       // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
       // the operation.
       Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction()
           .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
               error -> Duration.ZERO,
               () -> Duration.ZERO);
      
       transactionContext.flatMap(transaction -> {
           // Process messages and associate operations with the transaction.
           Mono<Void> operations = Mono.when(
               receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
                   receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
               receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
      
           // Finally, either commit or rollback the transaction once all the operations are associated with it.
           // Mono<Void> completes without emitting a value, so chain with then(...) rather than flatMap(...).
           return operations.then(receiver.commitTransaction(transaction));
       });
       
      Returns:
      A Mono that emits the ServiceBusTransactionContext for the newly created transaction.
      Throws:
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if a transaction cannot be created.
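
The shape of the example above is: associate several operations with one transaction, wait for all of them, then commit. The following is a minimal sketch of that shape using the JDK's CompletableFuture as a stand-in for Reactor's Mono, with a toy in-memory transaction. TransactionSketch, enlist, and the operation strings are hypothetical and do not reflect how the service implements transactions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class TransactionSketch {
    // State visible outside the transaction.
    final List<String> applied = new ArrayList<>();
    // Operations associated with the open transaction, not yet visible.
    private final List<String> pending = new ArrayList<>();

    /** Associates an operation with the open transaction. */
    CompletableFuture<Void> enlist(String operation) {
        synchronized (pending) {
            pending.add(operation);
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Makes every enlisted operation visible. */
    CompletableFuture<Void> commit() {
        synchronized (pending) {
            applied.addAll(pending);
            pending.clear();
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Discards every enlisted operation. */
    CompletableFuture<Void> rollback() {
        synchronized (pending) {
            pending.clear();
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Enlists two operations, then commits once both have finished. */
    CompletableFuture<Void> completeAndAbandon() {
        return CompletableFuture.allOf(enlist("complete:1"), enlist("abandon:2"))
            .thenCompose(ignored -> commit());
    }

    public static void main(String[] args) {
        TransactionSketch transaction = new TransactionSketch();
        transaction.completeAndAbandon().join();
        System.out.println(transaction.applied);
    }
}
```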
    • commitTransaction

      public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)
      Commits the transaction and all the operations associated with it.

      Creating and using a transaction

       // This mono creates a transaction and caches the output value, so we can associate operations with the
       // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
       // the operation.
       Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction()
           .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
               error -> Duration.ZERO,
               () -> Duration.ZERO);
      
       transactionContext.flatMap(transaction -> {
           // Process messages and associate operations with the transaction.
           Mono<Void> operations = Mono.when(
               receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
                   receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
               receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
      
           // Finally, either commit or rollback the transaction once all the operations are associated with it.
           // Mono<Void> completes without emitting a value, so chain with then(...) rather than flatMap(...).
           return operations.then(receiver.commitTransaction(transaction));
       });
       
      Parameters:
      transactionContext - The transaction to be committed.
      Returns:
      A Mono that completes when the transaction is committed on the Service Bus resource.
      Throws:
      NullPointerException - if transactionContext or transactionContext.transactionId is null.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the transaction could not be committed.
    • rollbackTransaction

      public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)
      Rolls back the given transaction and all operations associated with it.

      Creating and using a transaction

       // This mono creates a transaction and caches the output value, so we can associate operations with the
       // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
       // the operation.
       Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction()
           .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
               error -> Duration.ZERO,
               () -> Duration.ZERO);
      
       transactionContext.flatMap(transaction -> {
           // Process messages and associate operations with the transaction.
           Mono<Void> operations = Mono.when(
               receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
                   receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
               receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
      
           // Finally, either commit or rollback the transaction once all the operations are associated with it.
           // Mono<Void> completes without emitting a value, so chain with then(...) rather than flatMap(...).
           return operations.then(receiver.commitTransaction(transaction));
       });
       
      Parameters:
      transactionContext - The transaction to roll back.
      Returns:
      A Mono that completes when the transaction is rolled back on the Service Bus resource.
      Throws:
      NullPointerException - if transactionContext or transactionContext.transactionId is null.
      IllegalStateException - if receiver is already disposed.
      ServiceBusException - if the transaction could not be rolled back.
    • close

      public void close()
      Disposes of the consumer by closing the underlying links to the service.
      Specified by:
      close in interface AutoCloseable