azure-messaging-eventhubs
Loading...
Searching...
No Matches
event_data_batch.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#pragma once
5#include "models/event_data.hpp"
6
7#include <azure/core/amqp/models/amqp_message.hpp>
8#include <azure/core/diagnostics/logger.hpp>
9#include <azure/core/internal/diagnostics/log.hpp>
10
11#include <mutex>
12#include <stdexcept>
13
14// cspell: words vbin
15
16namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail {
17 class EventDataBatchFactory;
18}}}} // namespace Azure::Messaging::EventHubs::_detail
19
20namespace Azure { namespace Messaging { namespace EventHubs {
21
29 {
30
34 Azure::Nullable<std::uint64_t> MaxBytes;
35
40 std::string PartitionKey;
41
45 std::string PartitionId;
46 };
47
54 class EventDataBatch final {
55 private:
// Value assigned to m_partitionId by the private constructor when the options carry no
// partition ID.
56 const std::string anyPartitionId = "";
57
// Locked by CurrentSize() while it reads m_currentSize. std::mutex is not copyable, which is why
// the copy operations of this class are written by hand.
58 std::mutex m_rwMutex;
// Partition ID and partition key taken from the options; the private constructor rejects options
// in which both are non-empty.
59 std::string m_partitionId;
60 std::string m_partitionKey;
// Optional size limit taken from the options; it may be empty.
61 Azure::Nullable<std::uint64_t> m_maxBytes;
// NOTE(review): presumably one serialized message per inner vector — confirm against
// TryAddAmqpMessage, which is not defined in this header.
62 std::vector<std::vector<uint8_t>> m_marshalledMessages;
63 // Annotation properties
// Written into the MessageFormat field of the envelope built by CreateBatchEnvelope().
64 const uint32_t BatchedMessageFormat = 0x80013700;
65
// Starts out default-constructed; not assigned anywhere else in this header.
66 Azure::Core::Amqp::Models::AmqpMessage m_batchEnvelope;
// Starts at 0 and is returned by CurrentSize().
// NOTE(review): units (bytes vs. message count) are not visible in this header — confirm.
67 size_t m_currentSize;
68
69 public:
75 // Copy constructor cannot be defaulted because of m_rwMutex.
76 : m_rwMutex{}, m_partitionId{other.m_partitionId}, m_partitionKey{other.m_partitionKey},
77 m_maxBytes{other.m_maxBytes}, m_marshalledMessages{other.m_marshalledMessages},
78 m_batchEnvelope{other.m_batchEnvelope}, m_currentSize(other.m_currentSize){};
79
82 {
83 // Assignment operator cannot be defaulted because of m_rwMutex.
84 if (this != &other)
85 {
86 m_partitionId = other.m_partitionId;
87 m_partitionKey = other.m_partitionKey;
88 m_maxBytes = other.m_maxBytes;
89 m_marshalledMessages = other.m_marshalledMessages;
90 m_batchEnvelope = other.m_batchEnvelope;
91 m_currentSize = other.m_currentSize;
92 }
93 return *this;
94 }
95
100 std::string GetPartitionId() const { return m_partitionId; }
101
105 std::string GetPartitionKey() const { return m_partitionKey; }
106
111 uint64_t GetMaxBytes() const { return m_maxBytes.Value(); }
112
119 bool TryAddMessage(std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const> const& message)
120 {
121 return TryAddAmqpMessage(message);
122 }
123
131
135 size_t CurrentSize()
136 {
137 std::lock_guard<std::mutex> lock(m_rwMutex);
138 return m_currentSize;
139 }
140
/** @brief Serializes the EventDataBatch to a single AmqpMessage to be sent to the EventHubs
 * service.
 *
 * Only declared in this header; the body is not part of it.
 *
 * @return The resulting AmqpMessage, returned by value.
 */
146 Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const;
147
148 private:
// Private worker that the public TryAddMessage overload forwards to. Only declared in this
// header; the body is not part of it.
// NOTE(review): presumably returns false when the message cannot be added to the batch —
// confirm against the definition.
149 bool TryAddAmqpMessage(
150 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const> const& message);
151
/** @brief Computes the size charged for a payload once its binary framing overhead is added.
 *
 * Payloads of at most 255 bytes are charged the vbin8 overhead; larger payloads are charged the
 * vbin32 overhead. The helper reads no instance state, so it is static and cannot throw.
 *
 * @param payload Serialized message bytes.
 * @return payload.size() plus the overhead selected for that size.
 */
static size_t CalculateActualSizeForPayload(std::vector<uint8_t> const& payload) noexcept
{
  constexpr size_t vbin8Overhead = 5;
  constexpr size_t vbin32Overhead = 8;
  // The AMQP vbin8 encoding stores its length in one byte, so 255 is the largest payload it can
  // describe; anything longer needs vbin32.
  constexpr size_t vbin8MaxPayload = 255;

  size_t const overhead = (payload.size() <= vbin8MaxPayload) ? vbin8Overhead : vbin32Overhead;
  return payload.size() + overhead;
}
163
164 Azure::Core::Amqp::Models::AmqpMessage CreateBatchEnvelope(
165 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const> const& message)
166 {
167 // Create the batch envelope from the prototype message. This copies all the attributes
168 // *except* the body attribute to the batch envelope.
169 Azure::Core::Amqp::Models::AmqpMessage batchEnvelope{*message};
170 batchEnvelope.BodyType = Azure::Core::Amqp::Models::MessageBodyType::None;
171 batchEnvelope.MessageFormat = BatchedMessageFormat;
172 return batchEnvelope;
173 }
174
179 EventDataBatch(EventDataBatchOptions options = {})
180 : m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey},
181 m_maxBytes{options.MaxBytes}, m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0}
182 {
183 if (!options.PartitionId.empty() && !options.PartitionKey.empty())
184 {
185 throw std::runtime_error("Either PartionID or PartitionKey can be set, but not both.");
186 }
187
188 if (options.PartitionId.empty())
189 {
190 m_partitionId = anyPartitionId;
191 }
192 };
193
194 friend class _detail::EventDataBatchFactory;
195 };
196}}} // namespace Azure::Messaging::EventHubs
EventDataBatch is used to efficiently pack up EventData before sending it to Event Hubs.
Definition event_data_batch.hpp:54
bool TryAddMessage(std::shared_ptr< Azure::Core::Amqp::Models::AmqpMessage const > const &message)
Attempts to add a raw AMQP message to the data batch.
Definition event_data_batch.hpp:119
EventDataBatch(EventDataBatch const &other)
Constructs an EventDataBatch from another EventDataBatch.
Definition event_data_batch.hpp:74
uint64_t GetMaxBytes() const
Gets the maximum size of the data batch.
Definition event_data_batch.hpp:111
EventDataBatch & operator=(EventDataBatch const &other)
Definition event_data_batch.hpp:81
std::string GetPartitionKey() const
Gets the partition key for the data batch.
Definition event_data_batch.hpp:105
size_t CurrentSize()
Gets the number of messages in the batch.
Definition event_data_batch.hpp:135
Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const
Serializes the EventDataBatch to a single AmqpMessage to be sent to the EventHubs service.
Definition event_data_batch.cpp:22
std::string GetPartitionId() const
Gets the partition ID for the data batch.
Definition event_data_batch.hpp:100
Represents an event sent to the Azure Event Hubs service.
Definition event_data.hpp:19
EventDataBatchOptions contains optional parameters for the [ProducerClient.CreateEventDataBatch] func...
Definition event_data_batch.hpp:29
Azure::Nullable< std::uint64_t > MaxBytes
MaxBytes overrides the max size (in bytes) for a batch. By default CreateEventDataBatch will use the ...
Definition event_data_batch.hpp:34
std::string PartitionKey
PartitionKey is hashed to calculate the partition assignment. Messages and message batches with the sa...
Definition event_data_batch.hpp:40
std::string PartitionId
PartitionId is the ID of the partition to send these messages to. Note that if you use this option th...
Definition event_data_batch.hpp:45