Package com.azure.messaging.servicebus
Class ServiceBusProcessorClient
java.lang.Object
com.azure.messaging.servicebus.ServiceBusProcessorClient
- All Implemented Interfaces:
AutoCloseable
The processor client for processing Service Bus messages.
ServiceBusProcessorClient
provides a push-based
mechanism that invokes the message processing callback when a message is received or the error handler when an error
occurs when receiving messages. A ServiceBusProcessorClient
can be created to process messages for a
session-enabled or non session-enabled Service Bus entity. It supports auto-settlement of messages by default.
Create and run a processor
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Sequence #: %s. Contents: %s%n", message.getSequenceNumber(), message.getBody()); }; Consumer<ServiceBusErrorContext> onError = context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (context.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) context.getException(); System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", context.getException()); } }; // Retrieve 'connectionString/queueName' from your configuration. ServiceBusProcessorClient processor = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .queueName(queueName) .processMessage(onMessage) .processError(onError) .buildProcessorClient(); // Start the processor in the background processor.start();
Create and run a session-enabled processor
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getSessionId(), message.getSequenceNumber(), message.getBody()); }; Consumer<ServiceBusErrorContext> onError = context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (context.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) context.getException(); System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", context.getException()); } }; // Retrieve 'connectionString/queueName' from your configuration. ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionProcessor() .queueName(queueName) .maxConcurrentSessions(2) .processMessage(onMessage) .processError(onError) .buildProcessorClient(); // Start the processor in the background sessionProcessor.start();
-
Method Summary
Modifier and TypeMethodDescriptionvoid
close()
Stops message processing and closes the processor.Gets the identifier of the instance ofServiceBusProcessorClient
.Returns the queue name associated with this instance ofServiceBusProcessorClient
.Returns the subscription name associated with this instance ofServiceBusProcessorClient
.Returns the topic name associated with this instance ofServiceBusProcessorClient
.boolean
Returnstrue
if the processor is running.void
start()
Starts the processor in the background.void
stop()
Stops the message processing for this processor.
-
Method Details
-
start
public void start()Starts the processor in the background. When this method is called, the processor will initiate a message receiver that will invoke the message handler when new messages are available. This method is idempotent (ie. callingstart()
again after the processor is already running is a no-op).Calling
start()
after callingstop()
will resume processing messages using the same underlying connection.Calling
start()
after callingclose()
will start the processor with a new connection. -
stop
public void stop()Stops the message processing for this processor. The receiving links and sessions are kept active and this processor can resume processing messages by callingstart()
again. -
close
public void close()Stops message processing and closes the processor. The receiving links and sessions are closed and callingstart()
will create a new processing cycle with new links and new sessions.- Specified by:
close
in interfaceAutoCloseable
-
isRunning
public boolean isRunning()Returnstrue
if the processor is running. If the processor is stopped or closed, this method returnsfalse
.- Returns:
true
if the processor is running;false
otherwise.
-
getQueueName
Returns the queue name associated with this instance ofServiceBusProcessorClient
.- Returns:
- the queue name associated with this instance of
ServiceBusProcessorClient
ornull
if the processor instance is for a topic and subscription.
-
getTopicName
Returns the topic name associated with this instance ofServiceBusProcessorClient
.- Returns:
- the topic name associated with this instance of
ServiceBusProcessorClient
ornull
if the processor instance is for a queue.
-
getSubscriptionName
Returns the subscription name associated with this instance ofServiceBusProcessorClient
.- Returns:
- the subscription name associated with this instance of
ServiceBusProcessorClient
ornull
if the processor instance is for a queue.
-
getIdentifier
Gets the identifier of the instance ofServiceBusProcessorClient
.- Returns:
- The identifier that can identify the instance of
ServiceBusProcessorClient
.
-