public final class ServiceBusSenderAsyncClient extends Object implements AutoCloseable
Create an instance of the sender

    // The required parameter is a way to authenticate with Service Bus using credentials.
    // The connectionString provides a way to authenticate with Service Bus.
    ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
        .connectionString(
            "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}")
        .sender()
        .queueName("<< QUEUE NAME >>")
        .buildAsyncClient();
Create an instance of the sender using the default credential

    // The required parameter is a way to authenticate with Service Bus using credentials.
    // Here the credential is a DefaultAzureCredential rather than a connection string.
    ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
        .credential("<<fully-qualified-namespace>>", new DefaultAzureCredentialBuilder().build())
        .sender()
        .queueName("<< QUEUE NAME >>")
        .buildAsyncClient();
Send messages to a Service Bus resource

    // The required parameter is a way to authenticate with Service Bus using credentials.
    // The connectionString provides a way to authenticate with Service Bus.
    ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
        .connectionString(
            "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}")
        .sender()
        .queueName("<QUEUE OR TOPIC NAME>")
        .buildAsyncClient();

    // Creating a batch without options yields a batch that can fit as many messages
    // as the transport allows.
    // UTF_8 refers to java.nio.charset.StandardCharsets.UTF_8 (static import).
    sender.createBatch().flatMap(batch -> {
        batch.tryAdd(new ServiceBusMessage("test-1".getBytes(UTF_8)));
        batch.tryAdd(new ServiceBusMessage("test-2".getBytes(UTF_8)));
        return sender.send(batch);
    }).subscribe(unused -> { },
        error -> System.err.println("Error occurred while sending batch:" + error),
        () -> System.out.println("Send complete."));
Send messages using a size-limited ServiceBusMessageBatch to a Service Bus resource

    final Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);

    // Setting `setMaximumSizeInBytes` when creating a batch limits the size of that batch.
    // In this case, all the batches created with these options are limited to 256 bytes.
    final CreateBatchOptions options = new CreateBatchOptions()
        .setMaximumSizeInBytes(256);
    final AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>(
        sender.createBatch(options).block());

    // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
    telemetryMessages.flatMap(message -> {
        final ServiceBusMessageBatch batch = currentBatch.get();
        if (batch.tryAdd(message)) {
            return Mono.empty();
        }

        // The current batch is full: send it and create a new batch for the pending message.
        return Mono.when(
            sender.send(batch),
            sender.createBatch(options).map(newBatch -> {
                currentBatch.set(newBatch);

                // Add the message that did not fit in the previous batch.
                if (!newBatch.tryAdd(message)) {
                    throw Exceptions.propagate(new IllegalArgumentException(
                        "Message was too large to fit in an empty batch. Max size: "
                            + newBatch.getMaxSizeInBytes()));
                }

                return newBatch;
            }));
    }).then()
        .doFinally(signal -> {
            // Send whatever is left in the last, partially filled batch.
            final ServiceBusMessageBatch batch = currentBatch.getAndSet(null);
            if (batch != null && batch.getCount() > 0) {
                sender.send(batch).block();
            }
        });
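The control flow of the size-limited example may be easier to follow without the reactive operators. The following is a minimal, JDK-only sketch of the same "try to add, send when full, start a new batch" loop. `BatchingSketch`, `SizeLimitedBatch`, and `pack` are hypothetical names introduced for illustration and are not part of the Service Bus library; the stand-in batch counts payload bytes only, which is an assumption, since a real batch presumably also accounts for protocol overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchingSketch {

    /** Hypothetical stand-in for ServiceBusMessageBatch; counts payload bytes only. */
    public static final class SizeLimitedBatch {
        private final int maxSizeInBytes;
        private final List<byte[]> messages = new ArrayList<>();
        private int sizeInBytes;

        public SizeLimitedBatch(int maxSizeInBytes) {
            this.maxSizeInBytes = maxSizeInBytes;
        }

        /** Adds the message if it fits; otherwise returns false and leaves the batch unchanged. */
        public boolean tryAdd(byte[] message) {
            if (sizeInBytes + message.length > maxSizeInBytes) {
                return false;
            }
            messages.add(message);
            sizeInBytes += message.length;
            return true;
        }

        public int getCount() {
            return messages.size();
        }
    }

    /** Packs messages into batches in order; each returned batch stands for one send call. */
    public static List<SizeLimitedBatch> pack(List<byte[]> messages, int maxSizeInBytes) {
        List<SizeLimitedBatch> sent = new ArrayList<>();
        SizeLimitedBatch current = new SizeLimitedBatch(maxSizeInBytes);
        for (byte[] message : messages) {
            if (current.tryAdd(message)) {
                continue;
            }
            // The current batch is full: record it as sent and start a new one.
            if (current.getCount() > 0) {
                sent.add(current);
            }
            current = new SizeLimitedBatch(maxSizeInBytes);
            // Add the message that did not fit in the previous batch.
            if (!current.tryAdd(message)) {
                throw new IllegalArgumentException(
                    "Message was too large to fit in an empty batch. Max size: " + maxSizeInBytes);
            }
        }
        // Record the last, partially filled batch.
        if (current.getCount() > 0) {
            sent.add(current);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<byte[]> messages = Arrays.asList(new byte[100], new byte[100], new byte[100]);
        List<SizeLimitedBatch> sent = pack(messages, 256);
        System.out.println("Batches sent: " + sent.size());
    }
}
```

In this sketch an oversized message fails fast with an IllegalArgumentException, mirroring the check in the reactive example, and the final partially filled batch is recorded after the loop, which corresponds to the `doFinally` step.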
Modifier and Type | Method and Description |
---|---|
Mono<Void> | cancelScheduledMessage(long sequenceNumber): Cancels the enqueuing of an already scheduled message, if it was not already enqueued. |
void | close(): Disposes of the ServiceBusSenderAsyncClient. |
Mono<ServiceBusMessageBatch> | createBatch(): Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows. |
Mono<ServiceBusMessageBatch> | createBatch(CreateBatchOptions options): Creates a ServiceBusMessageBatch configured with the options specified. |
String | getEntityPath(): Gets the name of the Service Bus resource. |
String | getFullyQualifiedNamespace(): Gets the fully qualified namespace. |
Mono<Long> | scheduleMessage(ServiceBusMessage message, Instant scheduledEnqueueTime): Sends a scheduled message to the Azure Service Bus entity this sender is connected to. |
Mono<Void> | send(Iterable<ServiceBusMessage> messages): Sends a set of messages to a Service Bus queue or topic using a batched approach. |
Mono<Void> | send(ServiceBusMessage message): Sends a message to a Service Bus queue or topic. |
Mono<Void> | send(ServiceBusMessageBatch batch): Sends a message batch to the Azure Service Bus entity this sender is connected to. |
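The scheduleMessage row above takes an Instant for scheduledEnqueueTime. As a small sketch of how that argument might be computed, the JDK-only helper below derives an enqueue time from a reference time and a delay. `ScheduleTimeSketch` and `enqueueTimeAfter` are hypothetical names introduced here, and rejecting a negative delay is a choice made for this sketch rather than documented library behavior. The call into the sender appears only as a comment, because no real client is constructed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

public class ScheduleTimeSketch {

    /** Hypothetical helper: returns the Instant that lies {@code delay} after {@code now}. */
    public static Instant enqueueTimeAfter(Instant now, Duration delay) {
        Objects.requireNonNull(now, "now");
        Objects.requireNonNull(delay, "delay");
        if (delay.isNegative()) {
            // Choice made for this sketch: refuse to compute a time in the past.
            throw new IllegalArgumentException("delay must not be negative");
        }
        return now.plus(delay);
    }

    public static void main(String[] args) {
        Instant enqueueAt = enqueueTimeAfter(Instant.now(), Duration.ofMinutes(5));
        System.out.println("Scheduled enqueue time: " + enqueueAt);

        // With a real sender (not constructed here), the value would be passed as:
        //   sender.scheduleMessage(message, enqueueAt)
        // The Long that call emits appears to be the sequence number that
        // cancelScheduledMessage(long sequenceNumber) accepts.
    }
}
```

Passing the reference time in as a parameter, instead of calling Instant.now() inside the helper, keeps the computation deterministic and easy to check.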
public String getFullyQualifiedNamespace()
Gets the fully qualified namespace.

public String getEntityPath()
Gets the name of the Service Bus resource.

public Mono<Void> send(ServiceBusMessage message)
Sends a message to a Service Bus queue or topic.
Parameters:
message - Message to be sent to the Service Bus queue or topic.
Returns:
A Mono that completes when the operation finishes on the Service Bus resource.
Throws:
NullPointerException - if message is null.

public Mono<Void> send(Iterable<ServiceBusMessage> messages)
Sends a set of messages to a Service Bus queue or topic using a batched approach.
Parameters:
messages - Messages to be sent to the Service Bus queue or topic.
Returns:
A Mono that completes when all messages have been sent to the Service Bus resource.
Throws:
NullPointerException - if messages is null.
com.azure.core.amqp.exception.AmqpException - if messages is larger than the maximum allowed size of a single batch.

public Mono<ServiceBusMessageBatch> createBatch()
Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows.
Returns:
A ServiceBusMessageBatch that can fit as many messages as the transport allows.

public Mono<ServiceBusMessageBatch> createBatch(CreateBatchOptions options)
Creates a ServiceBusMessageBatch configured with the options specified.
Parameters:
options - A set of options used to configure the ServiceBusMessageBatch.
Returns:
A new ServiceBusMessageBatch configured with the given options.
Throws:
NullPointerException - if options is null.

public Mono<Void> send(ServiceBusMessageBatch batch)
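Sends a message batch to the Azure Service Bus entity this sender is connected to.
Parameters:
batch - The batch of messages to send. A batch lets the client send as many messages as fit within the maximum allowed batch size.
Returns:
A Mono that completes when the operation finishes on the Service Bus resource.
Throws:
NullPointerException - if batch is null.

public Mono<Long> scheduleMessage(ServiceBusMessage message, Instant scheduledEnqueueTime)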
Sends a scheduled message to the Azure Service Bus entity this sender is connected to.
Parameters:
message - Message to be sent to the Service Bus queue or topic.
scheduledEnqueueTime - Instant at which the message should appear in the Service Bus queue or topic.
Throws:
NullPointerException - if message or scheduledEnqueueTime is null.

public Mono<Void> cancelScheduledMessage(long sequenceNumber)
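Cancels the enqueuing of an already scheduled message, if it was not already enqueued.
Parameters:
sequenceNumber - Sequence number of the scheduled message to cancel.
Returns:
A Mono that completes when the operation finishes on the Service Bus resource.
Throws:
IllegalArgumentException - if sequenceNumber is negative.

public void close()
Disposes of the ServiceBusSenderAsyncClient. If the client had a dedicated connection, the underlying connection is also closed.
Specified by:
close in interface AutoCloseable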
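
Because the client implements AutoCloseable, it can in principle be managed with a try-with-resources statement. The JDK-only sketch below shows the pattern with a hypothetical stand-in class; `CloseSketch`, `FakeSender`, and `sendAndClose` are names introduced for illustration and do not exist in the Service Bus library.

```java
public class CloseSketch {

    /** Hypothetical stand-in for the sender; it only tracks whether close() was called. */
    public static final class FakeSender implements AutoCloseable {
        private boolean closed;

        public void send(String body) {
            System.out.println("sent: " + body);
        }

        @Override
        public void close() {
            closed = true;
        }

        public boolean isClosed() {
            return closed;
        }
    }

    /** Sends one message, then lets try-with-resources call close() when the block exits. */
    public static FakeSender sendAndClose(String body) {
        FakeSender sender = new FakeSender();
        try (FakeSender resource = sender) {
            resource.send(body);
        }
        return sender;
    }

    public static void main(String[] args) {
        FakeSender sender = sendAndClose("test-1");
        System.out.println("closed: " + sender.isClosed());
    }
}
```

With the real asynchronous client, send returns a Mono, so the try block would need to wait for pending operations to complete before exiting; otherwise close() could run while a send is still in flight.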
Copyright © 2020 Microsoft Corporation. All rights reserved.