5#include "checkpoint_store.hpp"
6#include "consumer_client.hpp"
7#include "models/processor_load_balancer_models.hpp"
8#include "models/processor_models.hpp"
9#include "processor_partition_client.hpp"
11#include <azure/core/amqp/internal/common/async_operation_queue.hpp>
12#include <azure/core/context.hpp>
17#ifdef _azure_TESTING_BUILD_AMQP
18namespace Azure {
namespace Messaging {
namespace EventHubs {
namespace Test {
19 class ProcessorTest_LoadBalancing_Test;
22namespace Azure {
namespace Messaging {
namespace EventHubs {
33 Models::ProcessorStrategy::ProcessorStrategyBalanced};
78 class ProcessorLoadBalancer;
85#ifdef _azure_TESTING_BUILD_AMQP
86 friend class Test::ProcessorTest_LoadBalancing_Test;
97 std::shared_ptr<ConsumerClient> consumerClient,
98 std::shared_ptr<CheckpointStore> checkpointStore,
119 Azure::Core::Context
const& context = {});
129 void Run(Core::Context
const& context);
138 void Start(Azure::Core::Context
const& context = {});
150 void Close(Core::Context
const& context = {})
154 throw std::runtime_error(
"cannot close a processor that is running");
160 auto client = m_nextPartitionClients.TryRemove();
163 client->Close(context);
182 template <
class T>
class Channel {
184 Channel() : m_maximumDepth{0} {}
188 Azure::Core::Diagnostics::_internal::Log::Stream(
189 Azure::Core::Diagnostics::Logger::Level::Verbose)
190 <<
"~Channel. Currently depth is " << m_channelDepth <<
" and maximum depth is "
192 Azure::Core::Diagnostics::_internal::Log::Stream(
193 Azure::Core::Diagnostics::Logger::Level::Verbose)
194 <<
"Clear channel queue.";
195 while (m_channelDepth > 0)
197 auto value = m_channelQueue.TryWaitForResult();
209 std::lock_guard<std::mutex> lock{m_channelLock};
210 if ((m_maximumDepth != 0) && (m_channelDepth >= m_maximumDepth))
214 m_channelQueue.CompleteOperation(item);
220 T Remove(Azure::Core::Context
const& context)
222 auto value = m_channelQueue.WaitForResult(context);
225 throw Azure::Core::OperationCancelledException(
"Operation was cancelled.");
227 std::lock_guard<std::mutex> lock{m_channelLock};
229 return std::get<0>(*value);
244 std::lock_guard<std::mutex> lock{m_channelLock};
245 auto value = m_channelQueue.TryWaitForResult();
249 return std::get<0>(*value);
254 void SetMaximumDepth(
size_t maximumDepth)
256 std::lock_guard<std::mutex> lock{m_channelLock};
257 m_maximumDepth = maximumDepth;
261 std::mutex m_channelLock;
262 size_t m_channelDepth{};
263 size_t m_maximumDepth{};
264 Core::Amqp::Common::_internal::AsyncOperationQueue<T> m_channelQueue;
267 Azure::DateTime::duration m_ownershipUpdateInterval;
268 Models::StartPositions m_defaultStartPositions;
269 int32_t m_maximumNumberOfPartitions;
270 std::shared_ptr<CheckpointStore> m_checkpointStore;
271 std::shared_ptr<ConsumerClient> m_consumerClient;
273 Channel<std::shared_ptr<ProcessorPartitionClient>> m_nextPartitionClients;
274 Models::ConsumerClientDetails m_consumerClientDetails;
275 std::shared_ptr<_detail::ProcessorLoadBalancer> m_loadBalancer;
276 int64_t m_processorOwnerLevel{0};
277 bool m_isRunning{
false};
278 std::thread m_processorThread;
280 typedef std::map<std::string, std::shared_ptr<ProcessorPartitionClient>> ConsumersType;
289 Models::EventHubProperties
const& eventHubProperties,
290 std::shared_ptr<ConsumersType> consumers,
291 Core::Context
const& context);
293 void AddPartitionClient(
294 Models::Ownership
const& ownership,
295 std::map<std::string, Models::Checkpoint>& checkpoints,
296 std::weak_ptr<ConsumersType> consumers,
297 Core::Context
const& context);
299 void RunInternal(Core::Context
const& context,
bool manualRun);
301 Models::StartPosition GetStartPosition(
302 Models::Ownership
const& ownership,
303 std::map<std::string, Models::Checkpoint>
const& checkpoints)
305 Models::StartPosition startPosition = m_defaultStartPositions.Default;
307 if (checkpoints.find(ownership.PartitionId) != checkpoints.end())
309 Models::Checkpoint checkpoint = checkpoints.at(ownership.PartitionId);
311 if (checkpoint.Offset.HasValue())
313 startPosition.Offset = checkpoint.Offset;
315 else if (checkpoint.SequenceNumber.HasValue())
317 startPosition.SequenceNumber = checkpoint.SequenceNumber;
321 throw std::runtime_error(
322 "invalid checkpoint" + ownership.PartitionId +
"no offset or sequence number");
326 m_defaultStartPositions.PerPartition.find(ownership.PartitionId)
327 != m_defaultStartPositions.PerPartition.end())
329 startPosition = m_defaultStartPositions.PerPartition.at(ownership.PartitionId);
331 return startPosition;
334 std::map<std::string, Models::Checkpoint> GetCheckpointsMap(Core::Context
const& context);
Processor uses a ConsumerClient and CheckpointStore to provide automatic load balancing between multi...
Definition processor.hpp:84
Processor(Processor const &other)=delete
void Stop()
Stops a running processor.
Definition processor.cpp:68
std::shared_ptr< ProcessorPartitionClient > NextPartitionClient(Azure::Core::Context const &context={})
Definition processor.cpp:207
Processor & operator=(Processor const &other)=delete
void Run(Core::Context const &context)
Executes the processor.
Definition processor.cpp:79
void Close(Core::Context const &context={})
Closes the processor and cancels any current operations.
Definition processor.hpp:150
void Start(Azure::Core::Context const &context={})
Starts the processor.
Definition processor.cpp:51
StartPositions are used if there is no checkpoint for a partition in the checkpoint store.
Definition processor_models.hpp:14
ProcessorOptions are the options for the CreateProcessor function.
Definition processor.hpp:27
Azure::DateTime::duration UpdateInterval
UpdateInterval controls how often to attempt to claim partitions. The default value is 10 seconds.
Definition processor.hpp:38
int32_t MaximumNumberOfPartitions
Specifies the maximum number of partitions to process.
Definition processor.hpp:69
Models::StartPositions StartPositions
StartPositions are the default start positions (configurable per partition, or with an overall defaul...
Definition processor.hpp:49
Azure::DateTime::duration PartitionExpirationDuration
PartitionExpirationDuration is the amount of time before a partition is considered unowned....
Definition processor.hpp:43
int32_t Prefetch
Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient create...
Definition processor.hpp:61
Models::ProcessorStrategy LoadBalancingStrategy
LoadBalancingStrategy dictates how concurrent Processor instances distribute ownership of partitions ...
Definition processor.hpp:32