public class EventProcessorClientBuilder extends Object
EventProcessorClient
. Calling EventProcessorClientBuilder.buildEventProcessorClient()
constructs a new instance of EventProcessorClient
.
To create an instance of EventProcessorClient
that processes events with user-provided callback, configure
the following fields:
Consumer group name
.CheckpointStore
- An implementation of CheckpointStore that stores checkpoint and
partition ownership information to enable load balancing.EventProcessorClientBuilder.processEvent(Consumer)
- A callback that processes events received from the Event Hub.connectionString(String)
with a connection string to a specific Event Hub.
connectionString(String, String)
with an Event Hub namespace
connection string and the Event Hub name.credential(String, String, TokenCredential)
with the
fully qualified namespace, Event Hub name, and a set of credentials authorized to use the Event Hub.
Creating an EventProcessorClient
publicEventProcessorClient
createEventProcessor() {String
connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};" + "SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";EventProcessorClient
eventProcessorClient = newEventProcessorClientBuilder
() .consumerGroup("consumer-group") .checkpointStore(new InMemoryCheckpointStore()) .processEvent(eventContext -> {System
.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + "and sequence number of event = " + eventContext.getEventData().getSequenceNumber()); }) .processError(errorContext -> {System
.out.printf("Error occurred in partition processor for partition {}, {}", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); }) .connectionString(connectionString) .buildEventProcessorClient(); return eventProcessorClient; }
Constructor and Description |
---|
EventProcessorClientBuilder()
Creates a new instance of
EventProcessorClientBuilder . |
Modifier and Type | Method and Description |
---|---|
EventProcessorClient |
buildEventProcessorClient()
This will create a new
EventProcessorClient configured with the options set in this builder. |
EventProcessorClientBuilder |
checkpointStore(CheckpointStore checkpointStore)
Sets the
CheckpointStore the EventProcessorClient will use for storing partition ownership and
checkpoint information. |
EventProcessorClientBuilder |
configuration(Configuration configuration)
Sets the configuration store that is used during construction of the service client.
|
EventProcessorClientBuilder |
connectionString(String connectionString)
Sets the credential information given a connection string to the Event Hub instance.
|
EventProcessorClientBuilder |
connectionString(String connectionString,
String eventHubName)
Sets the credential information given a connection string to the Event Hubs namespace and name to a specific
Event Hub instance.
|
EventProcessorClientBuilder |
consumerGroup(String consumerGroup)
Sets the consumer group name from which the
EventProcessorClient should consume events. |
EventProcessorClientBuilder |
credential(String fullyQualifiedNamespace,
String eventHubName,
TokenCredential credential)
Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
|
EventProcessorClientBuilder |
processError(Consumer<ErrorContext> processError)
The function that is called when an error occurs while processing events.
|
EventProcessorClientBuilder |
processEvent(Consumer<EventContext> processEvent)
The function that is called for each event received by this
EventProcessorClient . |
EventProcessorClientBuilder |
processPartitionClose(Consumer<CloseContext> closePartition)
The function that is called when a processing for a partition stops.
|
EventProcessorClientBuilder |
processPartitionInitialization(Consumer<InitializationContext> initializePartition)
The function that is called before processing starts for a partition.
|
EventProcessorClientBuilder |
proxyOptions(ProxyOptions proxyOptions)
Sets the proxy configuration to use for
EventHubAsyncClient . |
EventProcessorClientBuilder |
retry(AmqpRetryOptions retryOptions)
Sets the retry policy for
EventHubAsyncClient . |
EventProcessorClientBuilder |
trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
Sets whether or not the event processor should request information on the last enqueued event on its associated
partition, and track that information as events are received.
|
EventProcessorClientBuilder |
transportType(AmqpTransportType transport)
Sets the transport type by which all the communication with Azure Event Hubs occurs.
|
public EventProcessorClientBuilder()
EventProcessorClientBuilder
.public EventProcessorClientBuilder connectionString(String connectionString)
If the connection string is copied from the Event Hubs namespace, it will likely not contain the name to the desired Event Hub, which is needed. In this case, the name can be added manually by adding "EntityPath=EVENT_HUB_NAME" to the end of the connection string. For example, "EntityPath=telemetry-hub".
If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the name.
connectionString
- The connection string to use for connecting to the Event Hub instance. It is expected
that the Event Hub name and the shared access key properties are contained in this connection string.EventProcessorClientBuilder
object.NullPointerException
- if connectionString
is null
.IllegalArgumentException
- if connectionString
is empty. Or, the connectionString
does not
contain the "EntityPath" key, which is the name of the Event Hub instance.AzureException
- If the shared access signature token credential could not be created using the connection
string.public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
connectionString
- The connection string to use for connecting to the Event Hubs namespace; it is expected
that the shared access key properties are contained in this connection string, but not the Event Hub name.eventHubName
- The name of the Event Hub to connect the client to.EventProcessorClientBuilder
object.NullPointerException
- if connectionString
or eventHubName
is null.IllegalArgumentException
- if connectionString
or eventHubName
is an empty string. Or, if
the connectionString
contains the Event Hub name.AzureException
- If the shared access signature token credential could not be created using the connection
string.public EventProcessorClientBuilder configuration(Configuration configuration)
EventHubAsyncClient
. Use
Configuration.NONE
to bypass using configuration settings during construction.configuration
- The configuration store used to configure the EventHubAsyncClient
.EventProcessorClientBuilder
object.public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
fullyQualifiedNamespace
- The fully qualified name for the Event Hubs namespace. This is likely to be
similar to "{your-namespace}.servicebus.windows.net".eventHubName
- The name of the Event Hub to connect the client to.credential
- The token credential to use for authorization. Access controls may be specified by the Event
Hubs namespace or the requested Event Hub, depending on Azure configuration.EventProcessorClientBuilder
object.IllegalArgumentException
- if fullyQualifiedNamespace
or eventHubName
is an empty string.NullPointerException
- if fullyQualifiedNamespace
, eventHubName
, credentials
is
null.public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
EventHubAsyncClient
. When a proxy is configured, AmqpTransportType.AMQP_WEB_SOCKETS
must be used for the transport type.proxyOptions
- The proxy options to use.EventProcessorClientBuilder
object.public EventProcessorClientBuilder transportType(AmqpTransportType transport)
AmqpTransportType.AMQP
.transport
- The transport type to use.EventProcessorClientBuilder
object.public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)
EventHubAsyncClient
. If not specified, the default retry options are used.retryOptions
- The retry policy to use.EventProcessorClientBuilder
object.public EventProcessorClientBuilder consumerGroup(String consumerGroup)
EventProcessorClient
should consume events.consumerGroup
- The consumer group name this EventProcessorClient
should consume events.EventProcessorClientBuilder
instance.NullPointerException
- if consumerGroup
is null
.public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
CheckpointStore
the EventProcessorClient
will use for storing partition ownership and
checkpoint information.
Users can, optionally, provide their own implementation of CheckpointStore
which will store ownership and
checkpoint information.
checkpointStore
- Implementation of CheckpointStore
.EventProcessorClientBuilder
instance.NullPointerException
- if checkpointStore
is null
.public EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent)
EventProcessorClient
. The input contains the
partition context and the event data.processEvent
- The callback that's called when an event is received by this EventProcessorClient
.EventProcessorClientBuilder
instance.NullPointerException
- if processEvent
is null
.public EventProcessorClientBuilder processError(Consumer<ErrorContext> processError)
processError
- The callback that's called when an error occurs while processing events.EventProcessorClientBuilder
instance.public EventProcessorClientBuilder processPartitionInitialization(Consumer<InitializationContext> initializePartition)
CheckpointStore
. Users can update this position if a different starting
position is preferred.initializePartition
- The callback that's called before processing starts for a partitionEventProcessorClientBuilder
instance.public EventProcessorClientBuilder processPartitionClose(Consumer<CloseContext> closePartition)
closePartition
- The callback that's called after processing for a partition stops.EventProcessorClientBuilder
instance.public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
When information about the partition's last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client.
trackLastEnqueuedEventProperties
- true
if the resulting events will keep track of the last
enqueued information for that partition; false
otherwise.EventProcessorClientBuilder
instance.public EventProcessorClient buildEventProcessorClient()
EventProcessorClient
configured with the options set in this builder. Each call to
this method will return a new instance of EventProcessorClient
.
All partitions processed by this EventProcessorClient
will start processing from earliest
available event in the respective partitions.
EventProcessorClient
.NullPointerException
- if processEvent
or processError
or checkpointStore
or
consumerGroup
is null
.IllegalArgumentException
- if the credentials have not been set using either EventProcessorClientBuilder.connectionString(String)
or EventProcessorClientBuilder.credential(String, String, TokenCredential)
. Or, if a proxy is specified
but the transport type is not web sockets
.Copyright © 2019 Microsoft Corporation. All rights reserved.