Class ServiceBusSenderAsyncClient

java.lang.Object
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient
All Implemented Interfaces:
AutoCloseable

public final class ServiceBusSenderAsyncClient extends Object implements AutoCloseable
An asynchronous client to send messages to a Service Bus resource.

Create an instance of the sender

 // A connection string is the only required setting here; it is how this sample authenticates with Service Bus.
 // The application must supply both 'connectionString' and 'queueName'. A connection string has this format:
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusClientBuilder clientBuilder = new ServiceBusClientBuilder()
     .connectionString(connectionString);

 // Narrow the shared builder down to a sender for one queue, then build the asynchronous client.
 ServiceBusSenderAsyncClient sender = clientBuilder.sender()
     .queueName(queueName)
     .buildAsyncClient();
 

Create an instance of the sender using DefaultAzureCredential

 // This sample authenticates with the credential produced by DefaultAzureCredentialBuilder, paired with the
 // fully qualified namespace of the Service Bus resource; no connection string is involved.
 // Replace the "<<...>>" placeholders with the application's own values.
 ServiceBusClientBuilder clientBuilder = new ServiceBusClientBuilder()
     .credential("<<fully-qualified-namespace>>",
         new DefaultAzureCredentialBuilder().build());

 // Narrow the shared builder down to a sender for one queue, then build the asynchronous client.
 ServiceBusSenderAsyncClient sender = clientBuilder.sender()
     .queueName("<< QUEUE NAME >>")
     .buildAsyncClient();
 

Send messages to a Service Bus resource

 // The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
 // The connectionString and queueName must be set by the application. The 'connectionString' format is shown below.
 // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
 ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
     .connectionString(connectionString)
     .sender()
     .queueName(queueName)
     .buildAsyncClient();

 // The batch is created without CreateMessageBatchOptions, so no explicit size limit is set here.
 sender.createMessageBatch().flatMap(batch -> {
     ServiceBusMessage firstMessage = new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8)));
     ServiceBusMessage secondMessage = new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8)));

     // tryAddMessage returns false when the message does not fit. Fail the pipeline instead of silently
     // sending a batch that is missing a message; the exception surfaces in the error callback below.
     if (!batch.tryAddMessage(firstMessage) || !batch.tryAddMessage(secondMessage)) {
         throw new IllegalArgumentException(
             "Message was too large to fit in the batch. Max size: " + batch.getMaxSizeInBytes());
     }

     return sender.sendMessages(batch);
 }).subscribe(unused -> {
 },
     error -> System.err.println("Error occurred while sending batch:" + error),
     () -> System.out.println("Send complete."));
 

Send messages using a size-limited ServiceBusMessageBatch to a Service Bus resource

 Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);

 // Setting `setMaximumSizeInBytes` when creating a batch limits the size of that batch.
 // In this case, all the batches created with these options are limited to 256 bytes.
 CreateMessageBatchOptions options = new CreateMessageBatchOptions()
     .setMaximumSizeInBytes(256);

 // Holds the batch currently being filled; it is swapped for a new one whenever a message does not fit.
 AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>(
     sender.createMessageBatch(options).block());

 // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
 // concatMap handles one message at a time and waits for its inner publisher to complete, so a message is
 // never added to a batch that is still being sent or replaced (flatMap would subscribe eagerly and race).
 telemetryMessages.concatMap(message -> {
     ServiceBusMessageBatch batch = currentBatch.get();
     if (batch.tryAddMessage(message)) {
         return Mono.empty();
     }

     // The current batch is full: send it and, alongside, create the next batch seeded with this message.
     return Mono.when(
         sender.sendMessages(batch),
         sender.createMessageBatch(options).map(newBatch -> {
             currentBatch.set(newBatch);

             // Add the message that did not fit in the previous batch.
             if (!newBatch.tryAddMessage(message)) {
                 throw Exceptions.propagate(new IllegalArgumentException(
                     "Message was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
             }

             return newBatch;
         }));
 }).then()
     .doFinally(signal -> {
         // Flush whatever is left in the last, partially filled batch once the stream terminates.
         // NOTE(review): block() assumes the terminating signal arrives on a thread that permits blocking - confirm.
         ServiceBusMessageBatch batch = currentBatch.getAndSet(null);
         if (batch != null && batch.getCount() > 0) {
             sender.sendMessages(batch).block();
         }
     })
     // Nothing above runs until the pipeline is subscribed to; without this call no message is ever sent.
     .subscribe(unused -> {
     },
         error -> System.err.println("Error occurred while sending batches:" + error));