azure-messaging-eventhubs
Loading...
Searching...
No Matches
event_data_batch.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#pragma once
5#include "models/event_data.hpp"
6
7#include <azure/core/amqp/models/amqp_message.hpp>
8#include <azure/core/diagnostics/logger.hpp>
9#include <azure/core/internal/diagnostics/log.hpp>
10
11#include <mutex>
12#include <stdexcept>
13
14// cspell: words vbin
15
16namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail {
17 class EventDataBatchFactory;
18}}}} // namespace Azure::Messaging::EventHubs::_detail
19
20namespace Azure { namespace Messaging { namespace EventHubs {
21
29 {
30
34 Azure::Nullable<std::uint64_t> MaxBytes;
35
40 std::string PartitionKey;
41
45 std::string PartitionId;
46 };
47
54 class EventDataBatch final {
55 private:
56 const std::string anyPartitionId = "";
57
58 std::mutex m_rwMutex;
59 std::string m_partitionId;
60 std::string m_partitionKey;
61 Azure::Nullable<std::uint64_t> m_maxBytes;
62 std::vector<std::vector<uint8_t>> m_marshalledMessages;
63 // Annotation properties
64 const uint32_t BatchedMessageFormat = 0x80013700;
65
66 Azure::Core::Amqp::Models::AmqpMessage m_batchEnvelope;
67 size_t m_currentSize;
68
69 public:
75 // Copy constructor cannot be defaulted because of m_rwMutex.
76 : m_rwMutex{}, m_partitionId{other.m_partitionId}, m_partitionKey{other.m_partitionKey},
77 m_maxBytes{other.m_maxBytes}, m_marshalledMessages{other.m_marshalledMessages},
78 m_batchEnvelope{other.m_batchEnvelope}, m_currentSize(other.m_currentSize){};
79
82 {
83 // Assignment operator cannot be defaulted because of m_rwMutex.
84 if (this != &other)
85 {
86 m_partitionId = other.m_partitionId;
87 m_partitionKey = other.m_partitionKey;
88 m_maxBytes = other.m_maxBytes;
89 m_marshalledMessages = other.m_marshalledMessages;
90 m_batchEnvelope = other.m_batchEnvelope;
91 m_currentSize = other.m_currentSize;
92 }
93 return *this;
94 }
95
100 std::string GetPartitionId() const { return m_partitionId; }
101
105 std::string GetPartitionKey() const { return m_partitionKey; }
106
111 uint64_t GetMaxBytes() const { return m_maxBytes.Value(); }
112
119 _azure_NODISCARD bool TryAdd(
120 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const> const& message)
121 {
122 return TryAddAmqpMessage(message);
123 }
124
131 _azure_NODISCARD bool TryAdd(Azure::Messaging::EventHubs::Models::EventData const& message);
132
137 {
138 std::lock_guard<std::mutex> lock(m_rwMutex);
139 return m_marshalledMessages.size();
140 }
141
147 Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const;
148
149 private:
150 bool TryAddAmqpMessage(
151 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const> const& message);
152
/** @brief Computes the size a serialized payload is accounted for in the batch.
 *
 * The result is the payload length plus a fixed overhead that depends on the payload length:
 * 5 bytes for payloads shorter than 256 bytes, 8 bytes otherwise. The constant names suggest
 * these correspond to the AMQP vbin8 / vbin32 binary encodings (presumably including the
 * enclosing section framing - confirm against the serializer).
 *
 * The function reads no instance state, so it is declared static; this lets it be called
 * from const member functions as well.
 *
 * @param payload The serialized message bytes.
 * @return payload.size() plus the applicable overhead.
 */
static size_t CalculateActualSizeForPayload(std::vector<uint8_t> const& payload)
{
  constexpr size_t vbin8Overhead = 5;
  constexpr size_t vbin32Overhead = 8;
  // Largest payload length that is still charged the smaller overhead.
  constexpr size_t vbin8MaxPayloadSize = 255;

  size_t const overhead
      = (payload.size() <= vbin8MaxPayloadSize) ? vbin8Overhead : vbin32Overhead;
  return payload.size() + overhead;
}
164
165 Azure::Core::Amqp::Models::AmqpMessage CreateBatchEnvelope(
166 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const> const& message) const
167 {
168 // Create the batch envelope from the prototype message. This copies all the attributes
169 // *except* the body attribute to the batch envelope.
170 Azure::Core::Amqp::Models::AmqpMessage batchEnvelope{*message};
171 batchEnvelope.BodyType = Azure::Core::Amqp::Models::MessageBodyType::None;
172 batchEnvelope.MessageFormat = BatchedMessageFormat;
173 return batchEnvelope;
174 }
175
180 EventDataBatch(EventDataBatchOptions options = {})
181 : m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey},
182 m_maxBytes{options.MaxBytes}, m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0}
183 {
184 if (!options.PartitionId.empty() && !options.PartitionKey.empty())
185 {
186 throw std::runtime_error("Either PartionID or PartitionKey can be set, but not both.");
187 }
188
189 if (options.PartitionId.empty())
190 {
191 m_partitionId = anyPartitionId;
192 }
193 };
194
195 friend class _detail::EventDataBatchFactory;
196 };
197}}} // namespace Azure::Messaging::EventHubs
EventDataBatch is used to efficiently pack up EventData before sending it to Event Hubs.
Definition event_data_batch.hpp:54
EventDataBatch(EventDataBatch const &other)
Constructs an EventDataBatch from another EventDataBatch.
Definition event_data_batch.hpp:74
uint64_t GetMaxBytes() const
Gets the maximum size of the data batch.
Definition event_data_batch.hpp:111
EventDataBatch & operator=(EventDataBatch const &other)
Definition event_data_batch.hpp:81
std::string GetPartitionKey() const
Gets the partition key for the data batch.
Definition event_data_batch.hpp:105
size_t NumberOfEvents()
Gets the number of messages in the batch.
Definition event_data_batch.hpp:136
Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const
Serializes the EventDataBatch to a single AmqpMessage to be sent to the EventHubs service.
Definition event_data_batch.cpp:22
_azure_NODISCARD bool TryAdd(std::shared_ptr< Azure::Core::Amqp::Models::AmqpMessage const > const &message)
Attempts to add a raw AMQP message to the data batch.
Definition event_data_batch.hpp:119
std::string GetPartitionId() const
Gets the partition ID for the data batch.
Definition event_data_batch.hpp:100
Represents an event sent to the Azure Event Hubs service.
Definition event_data.hpp:19
EventDataBatchOptions contains optional parameters for the [ProducerClient.CreateEventDataBatch] func...
Definition event_data_batch.hpp:29
Azure::Nullable< std::uint64_t > MaxBytes
MaxBytes overrides the max size (in bytes) for a batch. By default CreateEventDataBatch will use the ...
Definition event_data_batch.hpp:34
std::string PartitionKey
PartitionKey is hashed to calculate the partition assignment. Messages and message batches with the sa...
Definition event_data_batch.hpp:40
std::string PartitionId
PartitionId is the ID of the partition to send these messages to. Note that if you use this option th...
Definition event_data_batch.hpp:45