public final class ServiceBusSessionReceiverAsyncClient extends Object implements AutoCloseable
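This asynchronous session receiver client acquires session locks from a queue or topic subscription and creates ServiceBusReceiverAsyncClient
instances that are tied to the locked sessions.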
Receive messages from a specific session
Use ServiceBusSessionReceiverAsyncClient.acceptSession(String)
to acquire the lock of a session if you know the session id.

    // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
    // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
    ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sessionReceiver()
        .queueName(queueName)
        .buildAsyncClient();

    // acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
    // successfully locked.
    // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
    // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
    Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
        sessionReceiver.acceptSession("<< my-session-id >>"),
        receiver -> receiver.receiveMessages(),
        receiver -> Mono.fromRunnable(() -> receiver.close()));

    // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
    // is non-blocking and kicks off the operation.
    Disposable subscription = sessionMessages.subscribe(
        message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
            message.getSequenceNumber(), message.getBody()),
        error -> System.err.print(error));

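The connection string format quoted in the comments above can be sanity-checked before it is handed to the builder. The following is a minimal sketch, assuming a hypothetical helper class ConnectionStringSketch (not part of the Azure SDK) and placeholder values. It only splits the string and checks that the three keys named in the format are present. It does not validate the values themselves.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the Azure SDK. */
final class ConnectionStringSketch {
    private ConnectionStringSketch() { }

    /** Splits "Key=Value;Key=Value" into a map and checks the three keys named in the format. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // tolerate empty segments such as a doubled ';'
            }
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            // Split on the first '=' only, because key values may themselves end with '='.
            parts.put(segment.substring(0, eq), segment.substring(eq + 1));
        }
        for (String required : new String[] {"Endpoint", "SharedAccessKeyName", "SharedAccessKey"}) {
            if (!parts.containsKey(required)) {
                throw new IllegalArgumentException("Missing key: " + required);
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values; a real connection string comes from the application's configuration.
        Map<String, String> parts = parse(
            "Endpoint=my-namespace.example;SharedAccessKeyName=my-policy;SharedAccessKey=placeholder=");
        // Print only the non-secret part.
        System.out.println(parts.get("Endpoint"));
    }
}
```

Failing early with an IllegalArgumentException keeps a malformed setting from surfacing later as a harder-to-read connection error. Whether that trade-off is worthwhile depends on the application.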
Receive messages from the first available session
Use ServiceBusSessionReceiverAsyncClient.acceptNextSession()
to acquire the lock of the next available session without specifying the session
id.

    // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
    // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
    ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sessionReceiver()
        .queueName(queueName)
        .buildAsyncClient();

    // acceptNextSession() completes successfully with a receiver when it acquires the next available session.
    // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
    // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
    Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
        sessionReceiver.acceptNextSession(),
        receiver -> receiver.receiveMessages(),
        receiver -> Mono.fromRunnable(() -> receiver.close()));

    // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
    // is non-blocking and kicks off the operation.
    Disposable subscription = sessionMessages.subscribe(
        message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
            message.getSequenceNumber(), message.getBody()),
        error -> System.err.print(error));

Modifier and Type | Method and Description |
---|---|
Mono<ServiceBusReceiverAsyncClient> | acceptNextSession(): Acquires a session lock for the next available session and creates a ServiceBusReceiverAsyncClient to receive messages from the session. |
Mono<ServiceBusReceiverAsyncClient> | acceptSession(String sessionId): Acquires a session lock for sessionId and creates a ServiceBusReceiverAsyncClient to receive messages from the session. |
void | close() |
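
The locking contract summarized in the table, together with the exceptions listed in the method details on this page, can be pictured with a toy in-memory model. This is a sketch under loud assumptions: SessionLockModel and addSession are hypothetical names, the model is synchronous rather than Mono-based, IllegalStateException stands in for AmqpException, and acceptNextSession returns an empty Optional where the real client would wait. It is not how the SDK or the service is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Toy, synchronous model of session locking; NOT the Azure SDK implementation. */
final class SessionLockModel {
    // Session id -> whether some receiver currently holds its lock.
    private final Map<String, Boolean> locked = new LinkedHashMap<>();

    /** Registers a session that has messages waiting (hypothetical helper). */
    void addSession(String sessionId) {
        locked.putIfAbsent(sessionId, false);
    }

    /** Mirrors the documented argument checks and the "already locked" failure. */
    String acceptSession(String sessionId) {
        Objects.requireNonNull(sessionId, "'sessionId' cannot be null.");
        if (sessionId.isEmpty()) {
            throw new IllegalArgumentException("'sessionId' cannot be empty.");
        }
        if (Boolean.TRUE.equals(locked.get(sessionId))) {
            // Stand-in for the AmqpException that the documentation describes.
            throw new IllegalStateException("Session is already locked: " + sessionId);
        }
        locked.put(sessionId, true);
        return sessionId;
    }

    /** Locks the first session nobody holds; empty if there is none (the real client waits instead). */
    Optional<String> acceptNextSession() {
        for (Map.Entry<String, Boolean> entry : locked.entrySet()) {
            if (!entry.getValue()) {
                entry.setValue(true);
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }
}
```

In this model, accepting session "a" twice fails the second time, while acceptNextSession skips "a" and hands out the next unlocked session.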
public Mono<ServiceBusReceiverAsyncClient> acceptNextSession()
Acquires a session lock for the next available session and creates a ServiceBusReceiverAsyncClient to receive messages from the session. It will wait until a session is available if none is immediately available.
Returns: A ServiceBusReceiverAsyncClient that is tied to the available session.
Throws:
UnsupportedOperationException - if the queue or topic subscription is not session-enabled.
AmqpException - if the operation times out. The timeout duration is the tryTimeout of the AmqpRetryOptions passed to ServiceBusClientBuilder.retryOptions(AmqpRetryOptions) when this client is built.

public Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId)
Acquires a session lock for sessionId and creates a ServiceBusReceiverAsyncClient to receive messages from the session. If the session is already locked by another client, an AmqpException is thrown.
Parameters:
sessionId - The session id.
Returns: A ServiceBusReceiverAsyncClient that is tied to the specified session.
Throws:
NullPointerException - if sessionId is null.
IllegalArgumentException - if sessionId is empty.
UnsupportedOperationException - if the queue or topic subscription is not session-enabled.
AmqpException - if the lock cannot be acquired.

public void close()
Specified by: close in interface AutoCloseable
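
Because the client implements AutoCloseable, it can in principle be managed with a try-with-resources statement. The sketch below shows the mechanism with a hypothetical stand-in class, CloseTrackingClient, so that it runs without the Azure SDK on the classpath. The names are assumptions for illustration only.

```java
/** Hypothetical stand-in for any AutoCloseable client; not an Azure SDK type. */
final class CloseTrackingClient implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    // Declared without 'throws', like the close() documented above, so callers need no catch block.
    @Override
    public void close() {
        closed = true;
    }
}

final class TryWithResourcesSketch {
    private TryWithResourcesSketch() { }

    /** Uses a client inside a try-with-resources block and returns it after the automatic close(). */
    static CloseTrackingClient useAndClose() {
        CloseTrackingClient client = new CloseTrackingClient();
        try (CloseTrackingClient resource = client) {
            // ... use 'resource' here; close() runs when this block exits, normally or exceptionally.
            if (resource.isClosed()) {
                throw new IllegalStateException("client should still be open inside the block");
            }
        }
        return client;
    }

    public static void main(String[] args) {
        System.out.println("closed after try block: " + useAndClose().isClosed());
    }
}
```

One caveat applies to asynchronous clients in general: because their operations are non-blocking, a try block that only calls subscribe() would exit, and close the client, before any message arrives. The Flux.usingWhen pattern shown in the class overview is the safer fit for that case.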
Copyright © 2021 Microsoft Corporation. All rights reserved.