Package com.azure.messaging.servicebus
Class ServiceBusSessionReceiverAsyncClient
java.lang.Object
com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient
- All Implemented Interfaces:
AutoCloseable
This asynchronous session receiver client is used to acquire session locks from a queue or topic and create ServiceBusReceiverAsyncClient instances that are tied to the locked sessions.
Receive messages from a specific session
Use acceptSession(String) to acquire the lock of a session if you know the session id.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sessionReceiver()
    .queueName(queueName)
    .buildAsyncClient();

// acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
// successfully locked.
// `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
// `Mono.usingWhen` can also be used if the resource closure only returns a single item.
Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
    sessionReceiver.acceptSession("<< my-session-id >>"),
    receiver -> receiver.receiveMessages(),
    receiver -> Mono.fromRunnable(() -> receiver.close()));

// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
    message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
        message.getSequenceNumber(), message.getBody()),
    error -> System.err.print(error));
Receive messages from the first available session
Use acceptNextSession() to acquire the lock of the next available session without specifying the session id.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sessionReceiver()
    .queueName(queueName)
    .buildAsyncClient();

// acceptNextSession() completes successfully with a receiver when it acquires the next available session.
// `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
// `Mono.usingWhen` can also be used if the resource closure only returns a single item.
Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
    sessionReceiver.acceptNextSession(),
    receiver -> receiver.receiveMessages(),
    receiver -> Mono.fromRunnable(() -> receiver.close()));

// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
    message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
        message.getSequenceNumber(), message.getBody()),
    error -> System.err.print(error));
-
Method Summary
Modifier and Type | Method | Description
Mono<ServiceBusReceiverAsyncClient> | acceptNextSession() | Acquires a session lock for the next available session and creates a ServiceBusReceiverAsyncClient to receive messages from the session.
Mono<ServiceBusReceiverAsyncClient> | acceptSession(String sessionId) | Acquires a session lock for sessionId and creates a ServiceBusReceiverAsyncClient to receive messages from the session.
void | close() |
-
Method Details
-
acceptNextSession
Acquires a session lock for the next available session and creates a ServiceBusReceiverAsyncClient to receive messages from the session. It will wait until a session is available if none is immediately available.
- Returns:
- A ServiceBusReceiverAsyncClient that is tied to the available session.
- Throws:
UnsupportedOperationException - if the queue or topic subscription is not session-enabled.
com.azure.core.amqp.exception.AmqpException - if the operation times out. The timeout duration is the tryTimeout set when you build this client with ServiceBusClientBuilder.retryOptions(AmqpRetryOptions).
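The "wait until a session is available, fail once tryTimeout elapses" behaviour can be pictured with a small standard-library analogy. This is only a sketch under stated assumptions: the class NextSessionTimeoutSketch and the method withTryTimeout are hypothetical names, CompletableFuture stands in for the pending session acquisition, and java.util.concurrent.TimeoutException stands in for the AmqpException that the client documents. It is not how the SDK is implemented.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in model only: no Azure SDK types are used here (hypothetical class name).
final class NextSessionTimeoutSketch {
    private NextSessionTimeoutSketch() { }

    // 'pendingSession' stands in for a session lock that has not been acquired yet.
    // If it does not complete within 'tryTimeout', the future fails with TimeoutException.
    static <T> CompletableFuture<T> withTryTimeout(CompletableFuture<T> pendingSession,
                                                   Duration tryTimeout) {
        return pendingSession.orTimeout(tryTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // A future that is never completed models "no session becomes available".
        CompletableFuture<String> neverAvailable = new CompletableFuture<>();
        try {
            withTryTimeout(neverAvailable, Duration.ofMillis(100)).join();
        } catch (CompletionException e) {
            // join() wraps the failure; the cause carries the timeout.
            System.out.println("Timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

In application code the same idea means a subscriber to acceptNextSession() should be prepared for an error signal when no session shows up within the configured tryTimeout.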
-
acceptSession
Acquires a session lock for sessionId and creates a ServiceBusReceiverAsyncClient to receive messages from the session. If the session is already locked by another client, an AmqpException is thrown.
- Parameters:
sessionId - The session id.
- Returns:
- A ServiceBusReceiverAsyncClient that is tied to the specified session.
- Throws:
NullPointerException - if sessionId is null.
IllegalArgumentException - if sessionId is empty.
UnsupportedOperationException - if the queue or topic subscription is not session-enabled.
com.azure.core.amqp.exception.AmqpException - if the lock cannot be acquired.
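Callers that receive session ids from user input or configuration may want to check them before calling acceptSession(String). The sketch below mirrors the documented argument contract (null gives NullPointerException, empty gives IllegalArgumentException) using only the standard library. SessionIds and requireSessionId are hypothetical names introduced for illustration and are not part of the Azure SDK; this page also does not say whether the client raises these errors synchronously or through the returned publisher, so the helper simply throws.

```java
import java.util.Objects;

// Hypothetical helper, not part of the Azure SDK.
final class SessionIds {
    private SessionIds() { }

    // Returns the id unchanged when it satisfies the documented contract of
    // acceptSession(String): non-null and non-empty.
    static String requireSessionId(String sessionId) {
        Objects.requireNonNull(sessionId, "'sessionId' cannot be null.");
        if (sessionId.isEmpty()) {
            throw new IllegalArgumentException("'sessionId' cannot be empty.");
        }
        return sessionId;
    }

    public static void main(String[] args) {
        // A valid id passes through untouched.
        System.out.println(requireSessionId("my-session-id"));

        // An empty id is rejected before any network call would be attempted.
        try {
            requireSessionId("");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Validating up front keeps argument mistakes separate from the AmqpException case, which concerns the service being unable to grant the lock.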
-
close
public void close()
- Specified by:
close in interface AutoCloseable
-