5#include "models/event_data.hpp"
7#include <azure/core/amqp/models/amqp_message.hpp>
8#include <azure/core/diagnostics/logger.hpp>
9#include <azure/core/internal/diagnostics/log.hpp>
16namespace Azure {
namespace Messaging {
namespace EventHubs {
namespace _detail {
17 class EventDataBatchFactory;
20namespace Azure {
namespace Messaging {
namespace EventHubs {
// Empty-string partition ID. The constructor stores this value in m_partitionId when
// options.PartitionId is empty.
56 const std::string anyPartitionId =
"";
// Partition ID for this batch; initialized from options.PartitionId by the constructor.
59 std::string m_partitionId;
// Partition key for this batch; initialized from options.PartitionKey by the constructor.
60 std::string m_partitionKey;
// Optional byte limit, initialized from options.MaxBytes; may hold no value.
61 Azure::Nullable<std::uint64_t> m_maxBytes;
// Collection of byte buffers; its element count is what the m_rwMutex-guarded size query returns.
// Presumably one serialized message per entry — TODO confirm against the .cpp file.
62 std::vector<std::vector<uint8_t>> m_marshalledMessages;
// Value that CreateBatchEnvelope() writes into the envelope's MessageFormat field.
64 const uint32_t BatchedMessageFormat = 0x80013700;
// AMQP message kept alongside the batch; value-initialized by the constructor and copied
// member-wise by the copy operations.
66 Azure::Core::Amqp::Models::AmqpMessage m_batchEnvelope;
// Member-initializer list of the copy constructor (its signature line is not shown in this
// listing). m_rwMutex is value-initialized instead of being copied; every other member listed
// here is copied from `other`. The constructor body is empty.
// NOTE(review): nothing in these lines locks other.m_rwMutex while copying — confirm that a
// batch is never copied while another thread is modifying it.
76 : m_rwMutex{}, m_partitionId{other.m_partitionId}, m_partitionKey{other.m_partitionKey},
77 m_maxBytes{other.m_maxBytes}, m_marshalledMessages{other.m_marshalledMessages},
78 m_batchEnvelope{other.m_batchEnvelope}, m_currentSize(other.m_currentSize){};
// Statements from the copy-assignment operator (its signature and the lines around these
// statements are not shown in this listing). Each member below is overwritten with the
// corresponding member of `other`; m_rwMutex is not assigned by these statements.
86 m_partitionId = other.m_partitionId;
87 m_partitionKey = other.m_partitionKey;
88 m_maxBytes = other.m_maxBytes;
89 m_marshalledMessages = other.m_marshalledMessages;
90 m_batchEnvelope = other.m_batchEnvelope;
91 m_currentSize = other.m_currentSize;
// Parameter and body of what appears to be TryAdd (the line carrying the function name is not
// shown in this listing). The shared AMQP message is handed straight to TryAddAmqpMessage and
// that call's result is returned unchanged.
120 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const>
const& message)
122 return TryAddAmqpMessage(message);
// Body of what appears to be NumberOfEvents (its signature line is not shown in this listing).
// Holds m_rwMutex for the remainder of the scope, then returns how many entries
// m_marshalledMessages currently contains.
138 std::lock_guard<std::mutex> lock(m_rwMutex);
139 return m_marshalledMessages.size();
/// Serializes the EventDataBatch to a single AmqpMessage to be sent to the EventHubs service.
/// Declaration only; the definition lives in event_data_batch.cpp.
/// @return The AmqpMessage produced from this batch.
147 Azure::Core::Amqp::Models::AmqpMessage
ToAmqpMessage()
const;
// Declaration only; the definition is not part of this listing.
// The public add path forwards its message argument here and returns this function's bool
// result unchanged.
150 bool TryAddAmqpMessage(
151 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const>
const& message);
/**
 * @brief Computes the size a payload occupies once its binary framing overhead is added.
 *
 * Payloads shorter than 256 bytes are charged the smaller (vbin8) overhead; all other
 * payloads are charged the larger (vbin32) overhead.
 *
 * @param payload Bytes whose framed size is wanted. An empty payload is valid.
 * @return payload.size() + 5 when the payload is under 256 bytes, otherwise payload.size() + 8.
 */
size_t CalculateActualSizeForPayload(std::vector<uint8_t> const& payload)
{
  constexpr size_t vbin8Overhead = 5;
  constexpr size_t vbin32Overhead = 8;

  // 256 is the first length that no longer qualifies for the smaller overhead.
  if (payload.size() < 256)
  {
    return payload.size() + vbin8Overhead;
  }
  return payload.size() + vbin32Overhead;
}
165 Azure::Core::Amqp::Models::AmqpMessage CreateBatchEnvelope(
166 std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage const>
const& message)
const
170 Azure::Core::Amqp::Models::AmqpMessage batchEnvelope{*message};
171 batchEnvelope.BodyType = Azure::Core::Amqp::Models::MessageBodyType::None;
172 batchEnvelope.MessageFormat = BatchedMessageFormat;
173 return batchEnvelope;
// Member-initializer list and body of the constructor that takes `options` (its signature
// line is not shown in this listing). Partition ID, partition key and byte limit are taken
// from `options`; the message list and envelope are value-initialized and m_currentSize
// starts at 0.
181 : m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey},
182 m_maxBytes{options.MaxBytes}, m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0}
// Supplying both a partition ID and a partition key is rejected with std::runtime_error.
184 if (!options.PartitionId.empty() && !options.PartitionKey.empty())
// NOTE(review): "PartionID" in the message below looks like a misspelling of "PartitionId".
186 throw std::runtime_error(
"Either PartionID or PartitionKey can be set, but not both.");
// No partition ID supplied: store anyPartitionId (the empty string) instead.
189 if (options.PartitionId.empty())
191 m_partitionId = anyPartitionId;
// Grants _detail::EventDataBatchFactory access to this class's non-public members.
195 friend class _detail::EventDataBatchFactory;
EventDataBatch is used to efficiently pack up EventData before sending it to Event Hubs.
Definition event_data_batch.hpp:54
EventDataBatch(EventDataBatch const &other)
Constructs an EventDataBatch from another EventDataBatch.
Definition event_data_batch.hpp:74
uint64_t GetMaxBytes() const
Gets the maximum size of the data batch.
Definition event_data_batch.hpp:111
EventDataBatch & operator=(EventDataBatch const &other)
Definition event_data_batch.hpp:81
std::string GetPartitionKey() const
Gets the partition key for the data batch.
Definition event_data_batch.hpp:105
size_t NumberOfEvents()
Gets the number of messages in the batch.
Definition event_data_batch.hpp:136
Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const
Serializes the EventDataBatch to a single AmqpMessage to be sent to the EventHubs service.
Definition event_data_batch.cpp:22
_azure_NODISCARD bool TryAdd(std::shared_ptr< Azure::Core::Amqp::Models::AmqpMessage const > const &message)
Attempts to add a raw AMQP message to the data batch.
Definition event_data_batch.hpp:119
std::string GetPartitionId() const
Gets the partition ID for the data batch.
Definition event_data_batch.hpp:100
Represents an event sent to the Azure Event Hubs service.
Definition event_data.hpp:19
EventDataBatchOptions contains optional parameters for the [ProducerClient.CreateEventDataBatch] func...
Definition event_data_batch.hpp:29
Azure::Nullable< std::uint64_t > MaxBytes
MaxBytes overrides the max size (in bytes) for a batch. By default CreateEventDataBatch will use the ...
Definition event_data_batch.hpp:34
std::string PartitionKey
PartitionKey is hashed to calculate the partition assignment. Messages and message batches with the sa...
Definition event_data_batch.hpp:40
std::string PartitionId
PartitionId is the ID of the partition to send these messages to. Note that if you use this option th...
Definition event_data_batch.hpp:45