azure-messaging-eventhubs
Loading...
Searching...
No Matches
processor.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3#pragma once
4
5#include "checkpoint_store.hpp"
6#include "consumer_client.hpp"
7#include "models/processor_load_balancer_models.hpp"
8#include "models/processor_models.hpp"
9#include "processor_partition_client.hpp"
10
11#include <azure/core/amqp/internal/common/async_operation_queue.hpp>
12#include <azure/core/context.hpp>
13
14#include <chrono>
15#include <thread>
16
17#ifdef _azure_TESTING_BUILD_AMQP
18namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
19 class ProcessorTest_LoadBalancing_Test;
20}}}} // namespace Azure::Messaging::EventHubs::Test
21#endif
22namespace Azure { namespace Messaging { namespace EventHubs {
26 struct ProcessorOptions final
27 {
32 Models::ProcessorStrategy LoadBalancingStrategy{
33 Models::ProcessorStrategy::ProcessorStrategyBalanced};
34
38 Azure::DateTime::duration UpdateInterval{std::chrono::seconds(10)};
39
43 Azure::DateTime::duration PartitionExpirationDuration{std::chrono::seconds(60)};
44
50
61 int32_t Prefetch{300};
62
70 };
71
77 namespace _detail {
78 class ProcessorLoadBalancer;
79 }
80
84 class Processor final {
85#ifdef _azure_TESTING_BUILD_AMQP
86 friend class Test::ProcessorTest_LoadBalancing_Test;
87#endif
88
89 public:
97 std::shared_ptr<ConsumerClient> consumerClient,
98 std::shared_ptr<CheckpointStore> checkpointStore,
99 ProcessorOptions const& options = {});
100
101 ~Processor();
102
104 Processor(Processor const& other) = delete;
105
107 Processor& operator=(Processor const& other) = delete;
108
118 std::shared_ptr<ProcessorPartitionClient> NextPartitionClient(
119 Azure::Core::Context const& context = {});
120
129 void Run(Core::Context const& context);
130
138 void Start(Azure::Core::Context const& context = {});
139
145 void Stop();
146
150 void Close(Core::Context const& context = {})
151 {
152 if (m_isRunning)
153 {
154 throw std::runtime_error("cannot close a processor that is running");
155 }
156
157 // Drain the partition clients queue.
158 for (;;)
159 {
160 auto client = m_nextPartitionClients.TryRemove();
161 if (client)
162 {
163 client->Close(context);
164 }
165 else
166 {
167 break;
168 }
169 }
170 (void)context;
171 }
172
173 private:
182 template <class T> class Channel {
183 public:
184 Channel() : m_maximumDepth{0} {}
185
186 ~Channel()
187 {
188 Azure::Core::Diagnostics::_internal::Log::Stream(
189 Azure::Core::Diagnostics::Logger::Level::Verbose)
190 << "~Channel. Currently depth is " << m_channelDepth << " and maximum depth is "
191 << m_maximumDepth;
192 Azure::Core::Diagnostics::_internal::Log::Stream(
193 Azure::Core::Diagnostics::Logger::Level::Verbose)
194 << "Clear channel queue.";
195 while (m_channelDepth > 0)
196 {
197 auto value = m_channelQueue.TryWaitForResult();
198 if (value)
199 {
200 m_channelDepth -= 1;
201 }
202 }
203 }
204
205 // Insert an item into the channel, returning true if successful, false if the channel is
206 // full.
207 bool Insert(T item)
208 {
209 std::lock_guard<std::mutex> lock{m_channelLock};
210 if ((m_maximumDepth != 0) && (m_channelDepth >= m_maximumDepth))
211 {
212 return false;
213 }
214 m_channelQueue.CompleteOperation(item);
215 m_channelDepth += 1;
216 return true;
217 }
218
219 // Remove an item from the channel.
220 T Remove(Azure::Core::Context const& context)
221 {
222 auto value = m_channelQueue.WaitForResult(context);
223 if (!value)
224 {
225 throw Azure::Core::OperationCancelledException("Operation was cancelled.");
226 }
227 std::lock_guard<std::mutex> lock{m_channelLock};
228 m_channelDepth -= 1;
229 return std::get<0>(*value);
230 }
231
242 T TryRemove()
243 {
244 std::lock_guard<std::mutex> lock{m_channelLock};
245 auto value = m_channelQueue.TryWaitForResult();
246 if (value)
247 {
248 m_channelDepth -= 1;
249 return std::get<0>(*value);
250 }
251 return T{};
252 }
253
254 void SetMaximumDepth(size_t maximumDepth)
255 {
256 std::lock_guard<std::mutex> lock{m_channelLock};
257 m_maximumDepth = maximumDepth;
258 }
259
260 private:
261 std::mutex m_channelLock;
262 size_t m_channelDepth{};
263 size_t m_maximumDepth{};
264 Core::Amqp::Common::_internal::AsyncOperationQueue<T> m_channelQueue;
265 };
266
267 Azure::DateTime::duration m_ownershipUpdateInterval;
268 Models::StartPositions m_defaultStartPositions;
269 int32_t m_maximumNumberOfPartitions;
270 std::shared_ptr<CheckpointStore> m_checkpointStore;
271 std::shared_ptr<ConsumerClient> m_consumerClient;
272 int32_t m_prefetch;
273 Channel<std::shared_ptr<ProcessorPartitionClient>> m_nextPartitionClients;
274 Models::ConsumerClientDetails m_consumerClientDetails;
275 std::shared_ptr<_detail::ProcessorLoadBalancer> m_loadBalancer;
276 int64_t m_processorOwnerLevel{0};
277 bool m_isRunning{false};
278 std::thread m_processorThread;
279
280 typedef std::map<std::string, std::shared_ptr<ProcessorPartitionClient>> ConsumersType;
281
288 void Dispatch(
289 Models::EventHubProperties const& eventHubProperties,
290 std::shared_ptr<ConsumersType> consumers,
291 Core::Context const& context);
292
293 void AddPartitionClient(
294 Models::Ownership const& ownership,
295 std::map<std::string, Models::Checkpoint>& checkpoints,
296 std::weak_ptr<ConsumersType> consumers,
297 Core::Context const& context);
298
299 void RunInternal(Core::Context const& context, bool manualRun);
300
301 Models::StartPosition GetStartPosition(
302 Models::Ownership const& ownership,
303 std::map<std::string, Models::Checkpoint> const& checkpoints)
304 {
305 Models::StartPosition startPosition = m_defaultStartPositions.Default;
306
307 if (checkpoints.find(ownership.PartitionId) != checkpoints.end())
308 {
309 Models::Checkpoint checkpoint = checkpoints.at(ownership.PartitionId);
310
311 if (checkpoint.Offset.HasValue())
312 {
313 startPosition.Offset = checkpoint.Offset;
314 }
315 else if (checkpoint.SequenceNumber.HasValue())
316 {
317 startPosition.SequenceNumber = checkpoint.SequenceNumber;
318 }
319 else
320 {
321 throw std::runtime_error(
322 "invalid checkpoint" + ownership.PartitionId + "no offset or sequence number");
323 }
324 }
325 else if (
326 m_defaultStartPositions.PerPartition.find(ownership.PartitionId)
327 != m_defaultStartPositions.PerPartition.end())
328 {
329 startPosition = m_defaultStartPositions.PerPartition.at(ownership.PartitionId);
330 }
331 return startPosition;
332 }
333
334 std::map<std::string, Models::Checkpoint> GetCheckpointsMap(Core::Context const& context);
335 };
336}}} // namespace Azure::Messaging::EventHubs
Processor uses a ConsumerClient and CheckpointStore to provide automatic load balancing between multi...
Definition processor.hpp:84
Processor(Processor const &other)=delete
void Stop()
Stops a running processor.
Definition processor.cpp:68
std::shared_ptr< ProcessorPartitionClient > NextPartitionClient(Azure::Core::Context const &context={})
Definition processor.cpp:207
Processor & operator=(Processor const &other)=delete
void Run(Core::Context const &context)
Executes the processor.
Definition processor.cpp:79
void Close(Core::Context const &context={})
Closes the processor and cancels any current operations.
Definition processor.hpp:150
void Start(Azure::Core::Context const &context={})
Starts the processor.
Definition processor.cpp:51
StartPositions are used if there is no checkpoint for a partition in the checkpoint store.
Definition processor_models.hpp:14
ProcessorOptions are the options for the CreateProcessor function.
Definition processor.hpp:27
Azure::DateTime::duration UpdateInterval
UpdateInterval controls how often to attempt to claim partitions. The default value is 10 seconds.
Definition processor.hpp:38
int32_t MaximumNumberOfPartitions
Specifies the maximum number of partitions to process.
Definition processor.hpp:69
Models::StartPositions StartPositions
StartPositions are the default start positions (configurable per partition, or with an overall defaul...
Definition processor.hpp:49
Azure::DateTime::duration PartitionExpirationDuration
PartitionExpirationDuration is the amount of time before a partition is considered unowned....
Definition processor.hpp:43
int32_t Prefetch
Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient create...
Definition processor.hpp:61
Models::ProcessorStrategy LoadBalancingStrategy
LoadBalancingStrategy dictates how concurrent Processor instances distribute ownership of partitions ...
Definition processor.hpp:32