public class EventHubConsumerClient extends Object implements Closeable
A synchronous consumer responsible for reading EventData from an Event Hub partition in the context of a specific consumer group.
Creating a synchronous consumer
// The required parameters are `consumerGroup` and a way to authenticate with Event Hubs using credentials.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("Endpoint={eh-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key};EntityPath={hub-name}")
    .consumerGroup("$DEFAULT")
    .buildConsumerClient();
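The connection string passed to the builder is a semicolon-separated list of `Key=Value` pairs. As a rough illustration of how those fields break down, the sketch below splits such a string into a map. `ConnectionStringParts` is a hypothetical helper written for this example and is not part of the Azure SDK; the sample values in the usage note are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not an Azure SDK type. */
final class ConnectionStringParts {
    private ConnectionStringParts() { }

    /** Splits "Key=Value;Key=Value" into a map that preserves the original order. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // tolerate empty segments such as a trailing ';'
            }
            // Split on the first '=' only, because key values may themselves contain '='.
            int separator = segment.indexOf('=');
            if (separator <= 0) {
                throw new IllegalArgumentException("Malformed segment in connection string.");
            }
            parts.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
        return parts;
    }
}
```

For example, parsing `Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=listen;SharedAccessKey=abc123==;EntityPath=my-hub` with this helper yields four entries, with `EntityPath` mapped to `my-hub`.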
Consuming events from a single partition
Events from a single partition can be consumed using EventHubConsumerClient.receiveFromPartition(String, int, EventPosition) or EventHubConsumerClient.receiveFromPartition(String, int, EventPosition, Duration). The call to `receiveFromPartition` completes and returns an IterableStream when either the maximum number of events is received or the timeout has elapsed.
Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received, or fewer if 30 seconds elapse first.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100, startingPosition,
    Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.println("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out the next EventPosition to receive from, based on the last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100, nextPosition,
        Duration.ofSeconds(30));
}
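The "maximum number of events or timeout, whichever comes first" behaviour described above can be illustrated without the SDK. The following is a minimal sketch, assuming an in-memory `BlockingQueue` as the event source; `BatchReceiveSketch` and its `receive` method are hypothetical names and do not reflect how the client is implemented. The argument checks mirror the exceptions this page documents for `receiveFromPartition`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the batching behaviour; not the SDK implementation. */
final class BatchReceiveSketch {
    private BatchReceiveSketch() { }

    /** Returns up to maximumMessageCount items, or fewer if maximumWaitTime elapses first. */
    static <T> List<T> receive(BlockingQueue<T> source, int maximumMessageCount, Duration maximumWaitTime) {
        Objects.requireNonNull(source, "'source' cannot be null.");
        Objects.requireNonNull(maximumWaitTime, "'maximumWaitTime' cannot be null.");
        if (maximumMessageCount < 1) {
            throw new IllegalArgumentException("'maximumMessageCount' cannot be less than 1.");
        }
        if (maximumWaitTime.isZero() || maximumWaitTime.isNegative()) {
            throw new IllegalArgumentException("'maximumWaitTime' must be a positive duration.");
        }

        long deadline = System.nanoTime() + maximumWaitTime.toNanos();
        List<T> batch = new ArrayList<>();
        while (batch.size() < maximumMessageCount) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // the wait time has elapsed
            }
            try {
                T next = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(next);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and return what we have
                break;
            }
        }
        return batch;
    }
}
```

With a queue holding three items, a call asking for two returns immediately with two items, while a later call asking for five returns the single remaining item once its wait time runs out.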
Modifier and Type | Method and Description |
---|---|
void | close() |
String | getConsumerGroup(): Gets the consumer group this consumer is reading events as a part of. |
String | getEventHubName(): Gets the Event Hub name this client interacts with. |
EventHubProperties | getEventHubProperties(): Retrieves information about an Event Hub, including the number of partitions present and their identifiers. |
String | getFullyQualifiedNamespace(): Gets the fully qualified Event Hubs namespace that the connection is associated with. |
IterableStream<String> | getPartitionIds(): Retrieves the identifiers for the partitions of an Event Hub. |
PartitionProperties | getPartitionProperties(String partitionId): Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream. |
IterableStream<PartitionEvent> | receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition): Receives a batch of events from the Event Hub partition. |
IterableStream<PartitionEvent> | receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime): Receives a batch of events from the Event Hub partition. |
IterableStream<PartitionEvent> | receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions): Receives a batch of events from the Event Hub partition. |
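public String getFullyQualifiedNamespace()
Gets the fully qualified Event Hubs namespace that the connection is associated with. The value is typically of the form {yournamespace}.servicebus.windows.net.

public String getEventHubName()
Gets the Event Hub name this client interacts with.

public String getConsumerGroup()
Gets the consumer group this consumer is reading events as a part of.

public EventHubProperties getEventHubProperties()
Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

public IterableStream<String> getPartitionIds()
Retrieves the identifiers for the partitions of an Event Hub.

public PartitionProperties getPartitionProperties(String partitionId)
Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
Parameters:
partitionId - The unique identifier of a partition associated with the Event Hub.
Throws:
NullPointerException - if partitionId is null.

public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition)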
Receives a batch of events from the Event Hub partition.
Parameters:
partitionId - Identifier of the partition to read events from.
maximumMessageCount - The maximum number of messages to receive in this batch.
startingPosition - Position within the Event Hub partition to begin consuming events.
Returns:
An IterableStream of the PartitionEvent instances that were received. The iterable contains up to maximumMessageCount events. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting from startingPosition.
Throws:
NullPointerException - if partitionId or startingPosition is null.
IllegalArgumentException - if maximumMessageCount is less than 1, or if partitionId is an empty string.

public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime)
Receives a batch of events from the Event Hub partition.
Parameters:
partitionId - Identifier of the partition to read events from.
maximumMessageCount - The maximum number of messages to receive in this batch.
startingPosition - Position within the Event Hub partition to begin consuming events.
maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.
Returns:
An IterableStream of the PartitionEvent instances that were received. The iterable contains up to maximumMessageCount events.
Throws:
NullPointerException - if partitionId, maximumWaitTime, or startingPosition is null.
IllegalArgumentException - if maximumMessageCount is less than 1, or if maximumWaitTime is zero or a negative duration.

public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions)
Receives a batch of events from the Event Hub partition.
Parameters:
partitionId - Identifier of the partition to read events from.
maximumMessageCount - The maximum number of messages to receive in this batch.
startingPosition - Position within the Event Hub partition to begin consuming events.
maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.
receiveOptions - Options when receiving events from the partition.
Returns:
An IterableStream of the PartitionEvent instances that were received. The iterable contains up to maximumMessageCount events.
Throws:
NullPointerException - if maximumWaitTime, startingPosition, partitionId, or receiveOptions is null.
IllegalArgumentException - if maximumMessageCount is less than 1, or if maximumWaitTime is zero or a negative duration.

public void close()
Specified by:
close in interface Closeable
close in interface AutoCloseable
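Because the client implements Closeable (and therefore AutoCloseable), it can be managed with a try-with-resources statement so that close() runs even when processing throws. The sketch below shows the pattern with a hypothetical stand-in class, `DemoConsumer`, rather than the real client; like the close() signature on this page, its close() declares no checked exception, so no catch block is required.

```java
import java.io.Closeable;

/** Hypothetical stand-in for a closeable client; not an SDK type. */
final class DemoConsumer implements Closeable {
    private boolean closed;

    /** Pretends to read one event; fails once the consumer has been closed. */
    String read() {
        if (closed) {
            throw new IllegalStateException("Consumer is closed.");
        }
        return "event";
    }

    /** Declares no checked exception, so callers need no catch block. */
    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class CloseSketch {
    private CloseSketch() { }

    /** Uses the consumer inside try-with-resources and returns it so the caller can inspect its state. */
    static DemoConsumer useAndClose() {
        DemoConsumer consumer = new DemoConsumer();
        try (DemoConsumer resource = consumer) {
            resource.read();
        } // close() is invoked here, whether or not read() threw
        return consumer;
    }
}
```

With the real client, the same shape would wrap the result of buildConsumerClient() in the try header.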
Copyright © 2019 Microsoft Corporation. All rights reserved.