public class EventHubProducerAsyncClient extends Object implements Closeable
EventData
to a specific Event Hub, grouped
together in batches. Depending on the options
specified when creating an
EventDataBatch
, the events may be automatically routed to an available partition or specific to a partition.
Allowing automatic routing of partitions is recommended when:
If no partition id is specified, the following rules are used for automatically selecting one:
Create a producer and publish events to any partition
// The required parameter is a way to authenticate with Event Hubs using credentials. // The connectionString provides a way to authenticate with Event Hub.EventHubProducerClient
producer = newEventHubClientBuilder
() .connectionString( "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}", "event-hub-name") .buildProducerClient();List
<EventData
> events =Arrays
.asList(newEventData
("test-event-1"), newEventData
("test-event-2")); // Creating a batch without options set, will allow for automatic routing of events to any partition.EventDataBatch
batch = producer.createBatch(); for (EventData
event : events) { if (batch.tryAdd(event)) { continue; } producer.send(batch); batch = producer.createBatch(); if (!batch.tryAdd(event)) { throw newIllegalArgumentException
("Event is too large for an empty batch."); } }
Publish events to partition "foo"
// Creating a batch with partitionId set will route all events in that batch to partition `foo`.CreateBatchOptions
options = newCreateBatchOptions
().setPartitionId("foo"); producer.createBatch(options).flatMap(batch -> { batch.tryAdd(newEventData
("test-event-1")); batch.tryAdd(newEventData
("test-event-2")); return producer.send(batch); }).subscribe(unused -> { }, error ->System
.err.println("Error occurred while sending batch:" + error), () ->System
.out.println("Send complete."));
Publish events to the same partition, grouped together using partition key
// Creating a batch with partitionKey set will tell the service to hash the partitionKey and decide which // partition to send the events to. Events with the same partitionKey are always routed to the same partition.CreateBatchOptions
options = newCreateBatchOptions
().setPartitionKey("bread"); producer.createBatch(options).flatMap(batch -> { batch.tryAdd(newEventData
("sourdough")); batch.tryAdd(newEventData
("rye")); return producer.send(batch); }).subscribe(unused -> { }, error ->System
.err.println("Error occurred while sending batch:" + error), () ->System
.out.println("Send complete."));
Publish events using a size-limited EventDataBatch
finalFlux
<EventData
> telemetryEvents =Flux
.just(firstEvent, secondEvent); // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch. // In this case, all the batches created with these options are limited to 256 bytes. finalCreateBatchOptions
options = newCreateBatchOptions
() .setMaximumSizeInBytes(256); finalAtomicReference
<EventDataBatch
> currentBatch = newAtomicReference
<>( producer.createBatch(options).block()); // The sample Flux contains two events, but it could be an infinite stream of telemetry events. telemetryEvents.flatMap(event -> { finalEventDataBatch
batch = currentBatch.get(); if (batch.tryAdd(event)) { returnMono
.empty(); } returnMono
.when( producer.send(batch), producer.createBatch(options).map(newBatch -> { currentBatch.set(newBatch); // Add the event that did not fit in the previous batch. if (!newBatch.tryAdd(event)) { throwExceptions
.propagate(newIllegalArgumentException
( "Event was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes())); } return newBatch; })); }).then() .doFinally(signal -> { finalEventDataBatch
batch = currentBatch.getAndSet(null); if (batch != null && batch.getCount() > 0) { producer.send(batch).block(); } });
Modifier and Type | Method and Description |
---|---|
void |
close()
Disposes of the
EventHubProducerAsyncClient . |
Mono<EventDataBatch> |
createBatch()
Creates an
EventDataBatch that can fit as many events as the transport allows. |
Mono<EventDataBatch> |
createBatch(CreateBatchOptions options)
Creates an
EventDataBatch configured with the options specified. |
String |
getEventHubName()
Gets the Event Hub name this client interacts with.
|
Mono<EventHubProperties> |
getEventHubProperties()
Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
|
String |
getFullyQualifiedNamespace()
Gets the fully qualified Event Hubs namespace that the connection is associated with.
|
Flux<String> |
getPartitionIds()
Retrieves the identifiers for the partitions of an Event Hub.
|
Mono<PartitionProperties> |
getPartitionProperties(String partitionId)
Retrieves information about a specific partition for an Event Hub, including elements that describe the available
events in the partition event stream.
|
Mono<Void> |
send(EventDataBatch batch)
Sends the batch to the associated Event Hub.
|
public String getFullyQualifiedNamespace()
{yournamespace}.servicebus.windows.net
.public String getEventHubName()
public Mono<EventHubProperties> getEventHubProperties()
public Flux<String> getPartitionIds()
public Mono<PartitionProperties> getPartitionProperties(String partitionId)
partitionId
- The unique identifier of a partition associated with the Event Hub.NullPointerException
- if partitionId
is null.public Mono<EventDataBatch> createBatch()
EventDataBatch
that can fit as many events as the transport allows.EventDataBatch
that can fit as many events as the transport allows.public Mono<EventDataBatch> createBatch(CreateBatchOptions options)
EventDataBatch
configured with the options specified.options
- A set of options used to configure the EventDataBatch
.EventDataBatch
that can fit as many events as the transport allows.NullPointerException
- if options
is null.public Mono<Void> send(EventDataBatch batch)
batch
- The batch to send to the service.Mono
that completes when the batch is pushed to the service.NullPointerException
- if batch
is null
.EventHubProducerAsyncClient.createBatch()
,
EventHubProducerAsyncClient.createBatch(CreateBatchOptions)
public void close()
EventHubProducerAsyncClient
. If the client had a dedicated connection, the underlying
connection is also closed.close
in interface Closeable
close
in interface AutoCloseable
Copyright © 2019 Microsoft Corporation. All rights reserved.