azure-core-amqp
Loading...
Searching...
No Matches
async_operation_queue.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#pragma once
5
6#include <azure/core/context.hpp>
7#include <azure/core/diagnostics/logger.hpp>
8#include <azure/core/internal/diagnostics/log.hpp>
9
10#include <condition_variable>
11#include <list>
12#include <mutex>
13#include <thread>
14#include <tuple>
15
16namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _internal {
17
24 template <typename... T> class AsyncOperationQueue final {
25 public:
26 AsyncOperationQueue() = default;
27 ~AsyncOperationQueue() = default;
28
29 AsyncOperationQueue(const AsyncOperationQueue&) = delete;
30 AsyncOperationQueue& operator=(const AsyncOperationQueue&) = delete;
31
32 AsyncOperationQueue(AsyncOperationQueue&&) = default;
33 AsyncOperationQueue& operator=(AsyncOperationQueue&&) = default;
34
35 void CompleteOperation(T... operationParameters)
36 {
37 std::unique_lock<std::mutex> lock(m_operationComplete);
38 m_operationQueue.push_back(std::make_unique<std::tuple<T...>>(
39 std::make_tuple<T...>(std::forward<T>(operationParameters)...)));
40 lock.unlock();
41 m_operationCondition.notify_one();
42 }
43
44 template <class... Poller>
45 std::unique_ptr<std::tuple<T...>> WaitForPolledResult(
46 Context const& context,
47 Poller&... pollers)
48 {
49 do
50 {
51 {
52 std::unique_lock<std::mutex> lock(m_operationComplete);
53 if (!m_operationQueue.empty())
54 {
55 std::unique_ptr<std::tuple<T...>> rv;
56 rv = std::move(m_operationQueue.front());
57 m_operationQueue.pop_front();
58 return rv;
59 }
60 if (context.IsCancelled())
61 {
62 return nullptr;
63 }
64 }
65 std::this_thread::yield();
66
67 // Note: We need to call Poll() *outside* the lock because the poller is going to call the
68 // CompleteOperation function.
69 Poll(pollers...);
70 } while (true);
71 }
72
84 template <class... Poller>
85 std::unique_ptr<std::tuple<T...>> WaitForResult(Context const& context, Poller&... pollers)
86 {
87 // If the queue is not empty, return the first element.
88 do
89 {
90 {
91 std::unique_lock<std::mutex> lock(m_operationComplete);
92
93 if (!m_operationQueue.empty())
94 {
95 std::unique_ptr<std::tuple<T...>> rv;
96 rv = std::move(m_operationQueue.front());
97 m_operationQueue.pop_front();
98 return rv;
99 }
100
101 // There's nothing in the queue, wait until something is put into the queue.
102 // This will block until either something is put into the queue or the context is
103 // cancelled.
104 m_operationCondition.wait_for(
105 lock, std::chrono::milliseconds(100), [this, &context]() -> bool {
106 // If the context is cancelled, we should return immediately.
107 if (context.IsCancelled())
108 {
109 return true;
110 }
111 return !m_operationQueue.empty();
112 });
113
114 if (context.IsCancelled())
115 {
116 return nullptr;
117 }
118 }
119 // Note: We need to call Poll() *outside* the lock because the poller is going to call the
120 // CompleteOperation function.
121 Poll(pollers...);
122 } while (true);
123 }
124
131 std::unique_ptr<std::tuple<T...>> TryWaitForResult()
132 {
133 // If the queue is not empty, return the first element.
134 std::unique_lock<std::mutex> lock(m_operationComplete);
135
136 if (!m_operationQueue.empty())
137 {
138 std::unique_ptr<std::tuple<T...>> rv;
139 rv = std::move(m_operationQueue.front());
140 m_operationQueue.pop_front();
141 return rv;
142 }
143 return nullptr;
144 }
145
146 // Clear any pending elements from the queue. This may be needed because some queued elements
147 // may have ordering dependencies that need to be cleared before the object containing the queue
148 // can be released.
149 void Clear()
150 {
151 std::unique_lock<std::mutex> lock(m_operationComplete);
152 m_operationQueue.clear();
153 }
154
155 private:
156 std::mutex m_operationComplete;
157 std::condition_variable m_operationCondition;
158 std::list<std::unique_ptr<std::tuple<T...>>> m_operationQueue;
159
160 void Poll() {}
161
162 template <class PT, class... Ts> void Poll(PT& first, Ts&... rest)
163 {
164 first.Poll();
165 Poll(rest...);
166 }
167 };
168}}}}} // namespace Azure::Core::Amqp::Common::_internal