public final class ServiceBusProcessorClient extends Object implements AutoCloseable
ServiceBusProcessorClient
provides a push-based
mechanism that invokes the message processing callback when a message is received or the error handler when an error
occurs when receiving messages. A ServiceBusProcessorClient
can be created to process messages for a
session-enabled or non session-enabled Service Bus entity. It supports auto-settlement of messages by default.
Create and run a processor
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Sequence #: %s. Contents: %s%n", message.getSequenceNumber(), message.getBody()); }; Consumer<ServiceBusErrorContext> onError = context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (context.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) context.getException(); System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", context.getException()); } }; // Retrieve 'connectionString/queueName' from your configuration. ServiceBusProcessorClient processor = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .queueName(queueName) .processMessage(onMessage) .processError(onError) .buildProcessorClient(); // Start the processor in the background processor.start();
Create and run a session-enabled processor
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getSessionId(), message.getSequenceNumber(), message.getBody()); }; Consumer<ServiceBusErrorContext> onError = context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (context.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) context.getException(); System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", context.getException()); } }; // Retrieve 'connectionString/queueName' from your configuration. ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionProcessor() .queueName(queueName) .maxConcurrentSessions(2) .processMessage(onMessage) .processError(onError) .buildProcessorClient(); // Start the processor in the background sessionProcessor.start();
Modifier and Type | Method and Description |
---|---|
void |
close()
Stops message processing and closes the processor.
|
boolean |
isRunning()
Returns
true if the processor is running. |
void |
start()
Starts the processor in the background.
|
void |
stop()
Stops the message processing for this processor.
|
public void start()
start()
again after the processor is already running is a no-op).
Calling start()
after calling stop()
will resume processing messages using the same
underlying connection.
Calling start()
after calling close()
will start the processor with a new connection.
public void stop()
ServiceBusProcessorClient.start()
again.public void close()
ServiceBusProcessorClient.start()
will create a new processing cycle with new links and new sessions.close
in interface AutoCloseable
public boolean isRunning()
true
if the processor is running. If the processor is stopped or closed, this method returns
false
.true
if the processor is running; false
otherwise.Copyright © 2021 Microsoft Corporation. All rights reserved.