Package com.azure.messaging.servicebus
Class ServiceBusSenderAsyncClient
java.lang.Object
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient
All Implemented Interfaces:
AutoCloseable
An asynchronous client to send messages to a Service Bus resource.
Create an instance of sender
// The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sender()
    .queueName(queueName)
    .buildAsyncClient();
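The connection string format in the comment above can be checked before it is handed to the builder. The following is a minimal sketch that uses only the JDK: `ConnectionStringParts` is a hypothetical helper, not part of the Azure SDK, and the sample values in `main` are made up. It checks only the three keys named in the format shown above, so a connection string that carries other keys would need a looser check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of the Azure SDK) that splits a connection string into its parts. */
final class ConnectionStringParts {
    private ConnectionStringParts() { }

    /** Parses "key=value" segments separated by ';' and checks the three keys from the documented format. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // tolerate empty segments such as a doubled ';'
            }
            // Split on the first '=' only, because a key value may itself end with '=' padding.
            int separator = segment.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            parts.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
        for (String required : new String[] {"Endpoint", "SharedAccessKeyName", "SharedAccessKey"}) {
            if (!parts.containsKey(required)) {
                throw new IllegalArgumentException("Missing '" + required + "' in connection string.");
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        // Made-up sample values; never log the SharedAccessKey itself.
        Map<String, String> parts = parse(
            "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=send-policy;SharedAccessKey=abc123==");
        System.out.println(parts.get("Endpoint"));
        System.out.println(parts.get("SharedAccessKeyName"));
    }
}
```

Failing fast on a malformed string gives a clearer error at startup than a rejected connection later on.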
Create an instance of sender using default credential
// The required parameter is a way to authenticate with Service Bus using credentials.
// Here, the fully qualified namespace and a DefaultAzureCredential provide a way to authenticate with Service Bus.
ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
    .credential("<<fully-qualified-namespace>>", new DefaultAzureCredentialBuilder().build())
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildAsyncClient();
Send messages to a Service Bus resource
// The required parameter is connectionString, a way to authenticate with Service Bus using credentials.
// The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
// "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .sender()
    .queueName(queueName)
    .buildAsyncClient();

// Creating a batch without options set allows for automatic routing of messages to any partition.
sender.createMessageBatch().flatMap(batch -> {
    batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8))));
    batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8))));
    return sender.sendMessages(batch);
}).subscribe(unused -> {
},
    error -> System.err.println("Error occurred while sending batch:" + error),
    () -> System.out.println("Send complete."));
Send messages using a size-limited ServiceBusMessageBatch to a Service Bus resource
Flux<ServiceBusMessage> telemetryMessages = Flux.just(firstMessage, secondMessage);

// Setting `setMaximumSizeInBytes` when creating a batch limits the size of that batch.
// In this case, all the batches created with these options are limited to 256 bytes.
CreateMessageBatchOptions options = new CreateMessageBatchOptions()
    .setMaximumSizeInBytes(256);
AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>(
    sender.createMessageBatch(options).block());

// The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
telemetryMessages.flatMap(message -> {
    ServiceBusMessageBatch batch = currentBatch.get();
    if (batch.tryAddMessage(message)) {
        return Mono.empty();
    }

    return Mono.when(
        sender.sendMessages(batch),
        sender.createMessageBatch(options).map(newBatch -> {
            currentBatch.set(newBatch);

            // Add the message that did not fit in the previous batch.
            if (!newBatch.tryAddMessage(message)) {
                throw Exceptions.propagate(new IllegalArgumentException(
                    "Message was too large to fit in an empty batch. Max size: "
                        + newBatch.getMaxSizeInBytes()));
            }

            return newBatch;
        }));
}).then()
    .doFinally(signal -> {
        ServiceBusMessageBatch batch = currentBatch.getAndSet(null);
        if (batch != null && batch.getCount() > 0) {
            sender.sendMessages(batch).block();
        }
    });
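The control flow of the size-limited sample (fill the current batch, send it when the next message does not fit, start a new batch, flush whatever remains at the end) may be easier to follow without the reactive plumbing. The sketch below models only that logic with JDK types. `GreedyBatcher` is a hypothetical stand-in that never calls Service Bus, and it assumes a message's size is just the UTF-8 length of its payload, whereas a real ServiceBusMessageBatch also accounts for protocol overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the batching logic only; it does not call Service Bus. */
final class GreedyBatcher {
    private GreedyBatcher() { }

    /** Packs messages, in order, into batches whose payload size never exceeds maxBatchBytes. */
    static List<List<String>> pack(List<String> messages, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;

        for (String message : messages) {
            // Assumption: size is the UTF-8 payload length only.
            int size = message.getBytes(StandardCharsets.UTF_8).length;
            if (size > maxBatchBytes) {
                // Mirrors the sample: a message that cannot fit in an empty batch is an error.
                throw new IllegalArgumentException(
                    "Message was too large to fit in an empty batch. Max size: " + maxBatchBytes);
            }
            if (currentBytes + size > maxBatchBytes) {
                // The message did not fit: hand off the full batch and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(message);
            currentBytes += size;
        }

        if (!current.isEmpty()) {
            // Flush the last, partially filled batch (the role doFinally plays in the sample).
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Prints [[aaaa, bbbb], [cc]]
        System.out.println(pack(Arrays.asList("aaaa", "bbbb", "cc"), 8));
    }
}
```

The final flush matters: without it, messages left in a partially filled batch would never be sent.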
-
Method Summary
cancelScheduledMessage(long sequenceNumber)
    Cancels the enqueuing of a scheduled message, if it was not already enqueued.
cancelScheduledMessages(Iterable<Long> sequenceNumbers)
    Cancels the enqueuing of already scheduled messages, if they were not already enqueued.
void close()
    Disposes of the ServiceBusSenderAsyncClient.
commitTransaction(ServiceBusTransactionContext transactionContext)
    Commits the transaction given the ServiceBusTransactionContext.
createMessageBatch()
    Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows.
createMessageBatch(CreateMessageBatchOptions options)
    Creates a ServiceBusMessageBatch configured with the options specified.
createTransaction()
    Starts a new transaction on Service Bus.
getEntityPath()
    Gets the name of the Service Bus resource.
getFullyQualifiedNamespace()
    Gets the fully qualified namespace.
getIdentifier()
    Gets the identifier of the instance of ServiceBusSenderAsyncClient.
rollbackTransaction(ServiceBusTransactionContext transactionContext)
    Rolls back the transaction given the ServiceBusTransactionContext.
scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime)
    Sends a scheduled message to the Azure Service Bus entity this sender is connected to.
scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)
    Sends a scheduled message to the Azure Service Bus entity this sender is connected to.
scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime)
    Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to.
scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)
    Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to.
sendMessage(ServiceBusMessage message)
    Sends a message to a Service Bus queue or topic.
sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)
    Sends a message to a Service Bus queue or topic.
sendMessages(ServiceBusMessageBatch batch)
    Sends a message batch to the Azure Service Bus entity this sender is connected to.
sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)
    Sends a message batch to the Azure Service Bus entity this sender is connected to.
sendMessages(Iterable<ServiceBusMessage> messages)
    Sends a set of messages to a Service Bus queue or topic using a batched approach.
sendMessages(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext)
    Sends a set of messages to a Service Bus queue or topic using a batched approach.
-
Method Details
-
getFullyQualifiedNamespace
Gets the fully qualified namespace.
Returns:
The fully qualified namespace.
-
getEntityPath
Gets the name of the Service Bus resource.
Returns:
The name of the Service Bus resource.
-
getIdentifier
Gets the identifier of the instance of ServiceBusSenderAsyncClient.
Returns:
The identifier that can identify the instance of ServiceBusSenderAsyncClient.
-
sendMessage
Sends a message to a Service Bus queue or topic.
Parameters:
message - Message to be sent to Service Bus queue or topic.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if message is null.
IllegalStateException - if sender is already disposed.
ServiceBusException - if message is larger than the maximum allowed size of a single message or the message could not be sent.
-
sendMessage
public Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionContext transactionContext)
Sends a message to a Service Bus queue or topic.
Parameters:
message - Message to be sent to Service Bus queue or topic.
transactionContext - Transaction context to be set on the message before sending to Service Bus.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if message, transactionContext or transactionContext.transactionId is null.
IllegalStateException - if sender is already disposed.
ServiceBusException - if message is larger than the maximum allowed size of a single message or the message could not be sent.
-
sendMessages
public Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext)
Sends a set of messages to a Service Bus queue or topic using a batched approach. If the total size of the messages exceeds the maximum size of a single batch, an exception is triggered and the send fails. By default, the maximum size is the maximum amount allowed on the link.
Parameters:
messages - Messages to be sent to Service Bus queue or topic.
transactionContext - Transaction context to be set on the batch of messages before sending to Service Bus.
Returns:
A Mono that completes when all messages have been sent to the Service Bus resource.
Throws:
NullPointerException - if messages, transactionContext or transactionContext.transactionId is null.
IllegalStateException - if sender is already disposed.
ServiceBusException - if the messages could not be sent or a message is larger than the maximum size of the ServiceBusMessageBatch.
-
sendMessages
Sends a set of messages to a Service Bus queue or topic using a batched approach. If the total size of the messages exceeds the maximum size of a single batch, an exception is triggered and the send fails. By default, the maximum size is the maximum amount allowed on the link.
Parameters:
messages - Messages to be sent to Service Bus queue or topic.
Returns:
A Mono that completes when all messages have been sent to the Service Bus resource.
Throws:
NullPointerException - if messages is null.
ServiceBusException - if the messages could not be sent or a message is larger than the maximum size of the ServiceBusMessageBatch.
IllegalStateException - if sender is already disposed.
-
sendMessages
Sends a message batch to the Azure Service Bus entity this sender is connected to.
Parameters:
batch - Batch of messages, which lets the client send up to the maximum allowed size for a batch of messages.
Returns:
A Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if batch is null.
ServiceBusException - if the message batch could not be sent.
IllegalStateException - if sender is already disposed.
-
sendMessages
public Mono<Void> sendMessages(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext)
Sends a message batch to the Azure Service Bus entity this sender is connected to.
Parameters:
batch - Batch of messages, which lets the client send up to the maximum allowed size for a batch of messages.
transactionContext - Transaction context to be set on the batch of messages before sending to Service Bus.
Returns:
A Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if batch, transactionContext or transactionContext.transactionId is null.
ServiceBusException - if the message batch could not be sent.
IllegalStateException - if sender is already disposed.
-
createMessageBatch
Creates a ServiceBusMessageBatch that can fit as many messages as the transport allows.
Returns:
A ServiceBusMessageBatch that can fit as many messages as the transport allows.
Throws:
ServiceBusException - if the message batch could not be created.
IllegalStateException - if sender is already disposed.
-
createMessageBatch
Creates a ServiceBusMessageBatch configured with the options specified.
Parameters:
options - A set of options used to configure the ServiceBusMessageBatch.
Returns:
A new ServiceBusMessageBatch configured with the given options.
Throws:
NullPointerException - if options is null.
ServiceBusException - if the message batch could not be created.
IllegalStateException - if sender is already disposed.
IllegalArgumentException - if CreateMessageBatchOptions.getMaximumSizeInBytes() is larger than the maximum allowed size.
-
scheduleMessage
public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)
Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
Parameters:
message - Message to be sent to the Service Bus queue or topic.
scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.
transactionContext - Transaction context to be set on the message before sending to Service Bus.
Returns:
The sequence number of the scheduled message, which can be used to cancel the scheduling of the message.
Throws:
NullPointerException - if message, scheduledEnqueueTime, transactionContext or transactionContext.transactionId is null.
ServiceBusException - if the message could not be scheduled.
IllegalStateException - if sender is already disposed.
-
scheduleMessage
Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
Parameters:
message - Message to be sent to the Service Bus queue or topic.
scheduledEnqueueTime - OffsetDateTime at which the message should appear in the Service Bus queue or topic.
Returns:
The sequence number of the scheduled message, which can be used to cancel the scheduling of the message.
Throws:
NullPointerException - if message or scheduledEnqueueTime is null.
ServiceBusException - if the message could not be scheduled.
IllegalStateException - if sender is already disposed.
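Because scheduledEnqueueTime is an absolute OffsetDateTime, callers that think in terms of "deliver in 30 minutes" need to convert a delay into a point in time first. The following is a minimal sketch of that conversion using only java.time. `EnqueueTimes` is a hypothetical helper, not part of the Azure SDK; normalizing to UTC and rejecting negative delays are choices of this sketch, not documented Service Bus rules.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Objects;

/** Hypothetical helper (not part of the Azure SDK) that turns a delay into an absolute enqueue time. */
final class EnqueueTimes {
    private EnqueueTimes() { }

    /** Returns the instant 'delay' after 'now', expressed in UTC. */
    static OffsetDateTime after(OffsetDateTime now, Duration delay) {
        Objects.requireNonNull(now, "'now' cannot be null.");
        Objects.requireNonNull(delay, "'delay' cannot be null.");
        if (delay.isNegative()) {
            // Sketch-level choice: refuse to compute an enqueue time in the past.
            throw new IllegalArgumentException("'delay' cannot be negative.");
        }
        // withOffsetSameInstant keeps the instant and only changes how it is displayed.
        return now.plus(delay).withOffsetSameInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        OffsetDateTime scheduledEnqueueTime = after(OffsetDateTime.now(), Duration.ofMinutes(30));
        System.out.println("Scheduled enqueue time (UTC): " + scheduledEnqueueTime);
    }
}
```

The resulting value can then be passed as the scheduledEnqueueTime argument of scheduleMessage. Taking `now` as a parameter rather than reading the clock inside the helper keeps it easy to test.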
-
scheduleMessages
public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime)
Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
Parameters:
messages - Messages to be sent to the Service Bus queue or topic.
scheduledEnqueueTime - OffsetDateTime at which the messages should appear in the Service Bus queue or topic.
Returns:
Sequence numbers of the scheduled messages, which can be used to cancel the messages.
Throws:
NullPointerException - if messages or scheduledEnqueueTime is null.
ServiceBusException - if the messages could not be scheduled.
IllegalStateException - if sender is already disposed.
-
scheduleMessages
public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime, ServiceBusTransactionContext transactionContext)
Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
Parameters:
messages - Messages to be sent to the Service Bus queue or topic.
scheduledEnqueueTime - OffsetDateTime at which the messages should appear in the Service Bus queue or topic.
transactionContext - Transaction to associate with the operation.
Returns:
Sequence numbers of the scheduled messages, which can be used to cancel the messages.
Throws:
NullPointerException - if messages, scheduledEnqueueTime, transactionContext or transactionContext.transactionId is null.
ServiceBusException - if the messages could not be scheduled or a message is larger than the maximum size of the ServiceBusMessageBatch.
IllegalStateException - if sender is already disposed.
-
cancelScheduledMessage
Cancels the enqueuing of a scheduled message, if it was not already enqueued.
Parameters:
sequenceNumber - Sequence number of the scheduled message to cancel.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
IllegalArgumentException - if sequenceNumber is negative.
ServiceBusException - if the message could not be cancelled.
IllegalStateException - if sender is already disposed.
-
cancelScheduledMessages
Cancels the enqueuing of already scheduled messages, if they were not already enqueued.
Parameters:
sequenceNumbers - Sequence numbers of the scheduled messages to cancel.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
NullPointerException - if sequenceNumbers is null.
IllegalStateException - if sender is already disposed.
ServiceBusException - if the scheduled messages cannot be cancelled.
-
createTransaction
Starts a new transaction on Service Bus. The ServiceBusTransactionContext should be passed, along with the ServiceBusReceivedMessage, to all operations that need to be in this transaction.
Returns:
A new ServiceBusTransactionContext.
Throws:
IllegalStateException - if sender is already disposed.
ServiceBusException - if a transaction cannot be created.
-
commitTransaction
Commits the transaction given the ServiceBusTransactionContext. This will make a call to Service Bus.
Parameters:
transactionContext - Transaction to be committed.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
IllegalStateException - if sender is already disposed.
NullPointerException - if transactionContext or transactionContext.transactionId is null.
ServiceBusException - if the transaction could not be committed.
-
rollbackTransaction
Rolls back the transaction given the ServiceBusTransactionContext. This will make a call to Service Bus.
Parameters:
transactionContext - Transaction to roll back.
Returns:
The Mono that finishes this operation on the Service Bus resource.
Throws:
IllegalStateException - if sender is already disposed.
NullPointerException - if transactionContext or transactionContext.transactionId is null.
ServiceBusException - if the transaction could not be rolled back.
-
close
public void close()
Disposes of the ServiceBusSenderAsyncClient. If the client has a dedicated connection, the underlying connection is also closed.
Specified by:
close in interface AutoCloseable
-