public class EventHubConsumerAsyncClient extends Object implements Closeable
An asynchronous consumer responsible for reading EventData from either a specific Event Hub partition or all partitions in the context of a specific consumer group.
Creating an EventHubConsumerAsyncClient
    // The required parameters are `consumerGroup` and a way to authenticate with Event Hubs using credentials.
    EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
        .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
            + "SharedAccessKey={key};EntityPath={eh-name}")
        .consumerGroup("consumer-group-name")
        .buildAsyncConsumerClient();
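The connection string used when creating the client is a semicolon-separated list of key=value pairs. As a rough illustration only, the sketch below splits such a string into its components with the standard library. `ConnectionStringSketch` and `parse` are hypothetical names, not SDK types, and the sample values in `main` are placeholders; the builder performs its own parsing internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Azure SDK: splits an Event Hubs style
// connection string into its key/value components.
final class ConnectionStringSketch {

    private ConnectionStringSketch() {
    }

    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // Tolerate empty segments such as a doubled ';'.
            }
            // Split on the first '=' only, because values may themselves contain '='.
            int separator = segment.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            parts.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration; none of these identify a real namespace.
        Map<String, String> parts = parse(
            "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=policy;"
                + "SharedAccessKey=abc123==;EntityPath=my-hub");
        System.out.println(parts.get("EntityPath"));
    }
}
```

Splitting on the first `=` matters because a Base64-encoded key can end in `=` padding, which a naive split would truncate.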
Consuming events from a single partition of an Event Hub
    // Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
    String partitionId = "0";
    EventPosition startingPosition = EventPosition.latest();

    // Keep a reference to `subscription`. When the program is finished receiving events, call
    // subscription.dispose(). This will stop fetching events from the Event Hub.
    Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
        .subscribe(partitionEvent -> {
            PartitionContext partitionContext = partitionEvent.getPartitionContext();
            EventData event = partitionEvent.getData();

            System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
            System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
        }, error -> System.err.print(error.toString()));
Viewing latest partition information
To view the latest partition information as events are received, set setTrackLastEnqueuedEventProperties to true. As events come in, explore the PartitionEvent object.
    // Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
    // each event that is received.
    ReceiveOptions receiveOptions = new ReceiveOptions()
        .setTrackLastEnqueuedEventProperties(true);

    // Receives events from partition "0" as they come in.
    consumer.receiveFromPartition("0", EventPosition.earliest(), receiveOptions)
        .subscribe(partitionEvent -> {
            LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
            System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
                properties.getRetrievalTime(), properties.getSequenceNumber());
        });
Rate limiting consumption of events from Event Hub
Event consumers that need to limit the number of events they receive at a given time can use BaseSubscriber.request(long).
    consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
        private static final int NUMBER_OF_EVENTS = 5;
        private final AtomicInteger currentNumberOfEvents = new AtomicInteger();

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // Tell the Publisher we only want 5 events at a time.
            request(NUMBER_OF_EVENTS);
        }

        @Override
        protected void hookOnNext(PartitionEvent value) {
            // Process the EventData

            // If the number of events we have currently received is a multiple of 5, that means we have reached the
            // last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
            // the subscriber is ready to get more events from upstream.
            if (currentNumberOfEvents.incrementAndGet() % 5 == 0) {
                request(NUMBER_OF_EVENTS);
            }
        }
    });
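BaseSubscriber.request(long) is Reactor's form of the Reactive Streams demand signal. To show the same batching pattern without any SDK or Reactor dependency, the sketch below uses the JDK's `java.util.concurrent.Flow` interfaces instead. `RangePublisher` and `BatchingSubscriber` are hypothetical stand-ins for the event stream and for the BaseSubscriber in the example above; the publisher is deliberately simplified (single-threaded, no validation of the requested amount).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    // Hypothetical stand-in for the event stream: emits 1..count, but never
    // more items than the subscriber has requested so far.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining) {
                        return; // Re-entrant call from onNext; the outer loop sees the new demand.
                    }
                    draining = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Requests items in fixed-size batches, mirroring the BaseSubscriber example.
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private Flow.Subscription subscription;
        private int received;
        final List<Integer> items = new ArrayList<>();
        int requests;
        boolean completed;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requests++;
            subscription.request(batchSize); // Initial demand: one batch.
        }

        @Override
        public void onNext(Integer item) {
            items.add(item);
            // After every full batch, signal readiness for the next one.
            if (++received % batchSize == 0) {
                requests++;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            throw new IllegalStateException(throwable);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        BatchingSubscriber subscriber = new BatchingSubscriber(5);
        new RangePublisher(12).subscribe(subscriber);
        System.out.printf("received=%d requests=%d completed=%b%n",
            subscriber.items.size(), subscriber.requests, subscriber.completed);
    }
}
```

The key point carried over from the Reactor example is that the subscriber, not the publisher, decides when more items may flow: nothing beyond the requested batch is delivered until `request` is called again.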
Receiving from all partitions
    // Receives events from all partitions from the beginning of each partition.
    consumer.receive(true).subscribe(partitionEvent -> {
        PartitionContext context = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();
        System.out.printf("Event %s is from partition %s.%n", event.getSequenceNumber(), context.getPartitionId());
    });
Modifier and Type | Method | Description |
---|---|---|
void | close() | Disposes of the consumer by closing the underlying connection to the service. |
String | getConsumerGroup() | Gets the consumer group this consumer is reading events as a part of. |
String | getEventHubName() | Gets the Event Hub name this client interacts with. |
Mono<EventHubProperties> | getEventHubProperties() | Retrieves information about an Event Hub, including the number of partitions present and their identifiers. |
String | getFullyQualifiedNamespace() | Gets the fully qualified Event Hubs namespace that the connection is associated with. |
Flux<String> | getPartitionIds() | Retrieves the identifiers for the partitions of an Event Hub. |
Mono<PartitionProperties> | getPartitionProperties(String partitionId) | Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream. |
Flux<PartitionEvent> | receive() | Consumes events from all partitions starting from the beginning of each partition. |
Flux<PartitionEvent> | receive(boolean startReadingAtEarliestEvent) | Consumes events from all partitions. |
Flux<PartitionEvent> | receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions) | Consumes events from all partitions configured with a set of receiveOptions. |
Flux<PartitionEvent> | receiveFromPartition(String partitionId, EventPosition startingPosition) | Consumes events from a single partition starting at startingPosition. |
Flux<PartitionEvent> | receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) | Consumes events from a single partition starting at startingPosition with a set of receive options. |
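
public String getFullyQualifiedNamespace()
Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.

public String getEventHubName()
Gets the Event Hub name this client interacts with.

public String getConsumerGroup()
Gets the consumer group this consumer is reading events as a part of.

public Mono<EventHubProperties> getEventHubProperties()
Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

public Flux<String> getPartitionIds()
Retrieves the identifiers for the partitions of an Event Hub.

public Mono<PartitionProperties> getPartitionProperties(String partitionId)
Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
Parameters:
partitionId - The unique identifier of a partition associated with the Event Hub.
Throws:
NullPointerException - if partitionId is null.

public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)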
Consumes events from a single partition starting at startingPosition.
Parameters:
partitionId - Identifier of the partition to read events from.
startingPosition - Position within the Event Hub partition to begin consuming events.
Returns:
A stream of events for this partition starting from startingPosition.
Throws:
NullPointerException - if partitionId or startingPosition is null.
IllegalArgumentException - if partitionId is an empty string.

public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)
Consumes events from a single partition starting at startingPosition with a set of receive options.
If ReceiveOptions.getOwnerLevel() has a value, then the Event Hubs service will guarantee only one active consumer exists per partitionId and consumer group combination. This receive operation is sometimes referred to as an "Epoch Consumer".
Multiple consumers per partitionId and consumer group combination can be created by not setting ReceiveOptions.getOwnerLevel() when invoking receive operations. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer".
Parameters:
partitionId - Identifier of the partition to read events from.
startingPosition - Position within the Event Hub partition to begin consuming events.
receiveOptions - Options when receiving events from the partition.
Returns:
A stream of events for this partition starting from startingPosition.
Throws:
NullPointerException - if partitionId, startingPosition, or receiveOptions is null.
IllegalArgumentException - if partitionId is an empty string.

public Flux<PartitionEvent> receive()
Consumes events from all partitions starting from the beginning of each partition.
This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput.
This method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.
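Because fairness across partitions is not guaranteed, it can help to tally how many events arrive from each partition while experimenting with receive(). The sketch below does this with the standard library only. `PartitionTally` is a hypothetical helper, not an SDK type, and the partition IDs in `main` are made-up sample input standing in for the value of partitionEvent.getPartitionContext().getPartitionId().

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Azure SDK: counts received events per partition.
final class PartitionTally {
    private final Map<String, Long> counts = new TreeMap<>();

    // Call once per received event; synchronized because callbacks may arrive on different threads.
    synchronized void record(String partitionId) {
        counts.merge(partitionId, 1L, Long::sum);
    }

    // Returns a copy so callers can inspect the counts without holding the lock.
    synchronized Map<String, Long> snapshot() {
        return new TreeMap<>(counts);
    }

    public static void main(String[] args) {
        PartitionTally tally = new PartitionTally();
        // Made-up arrival order showing a bias toward partition "0".
        for (String id : new String[] {"0", "0", "0", "1", "0", "2"}) {
            tally.record(id);
        }
        System.out.println(tally.snapshot());
    }
}
```

In a real consumer, `record` would be called from the subscribe callback with the partition ID of each PartitionEvent, and the snapshot inspected periodically to see whether one partition dominates.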

public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)
Consumes events from all partitions.
This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput.
This method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.
Parameters:
startReadingAtEarliestEvent - true to begin reading at the first events available in each partition; otherwise, reading will begin at the end of each partition, seeing only new events as they are published.

public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)
Consumes events from all partitions configured with a set of receiveOptions.
This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput.
This method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.
If ReceiveOptions.getOwnerLevel() has a value, then the Event Hubs service will guarantee only one active consumer exists per partitionId and consumer group combination. This receive operation is sometimes referred to as an "Epoch Consumer".
Multiple consumers per partitionId and consumer group combination can be created by not setting ReceiveOptions.getOwnerLevel() when invoking receive operations. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer".
Parameters:
startReadingAtEarliestEvent - true to begin reading at the first events available in each partition; otherwise, reading will begin at the end of each partition, seeing only new events as they are published.
receiveOptions - Options when receiving events from each Event Hub partition.
Throws:
NullPointerException - if receiveOptions is null.

public void close()
Disposes of the consumer by closing the underlying connection to the service.
Specified by:
close in interface Closeable
close in interface AutoCloseable

Copyright © 2019 Microsoft Corporation. All rights reserved.