azure-messaging-eventhubs
Loading...
Searching...
No Matches
producer_client.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4// cspell: words myeventhub
5
6#pragma once
7#include "event_data_batch.hpp"
8#include "models/management_models.hpp"
9
10#include <azure/core/amqp.hpp>
11#include <azure/core/amqp/internal/message_sender.hpp>
12#include <azure/core/context.hpp>
13#include <azure/core/credentials/credentials.hpp>
14#include <azure/core/http/policies/policy.hpp>
15
16#include <iostream>
17
18namespace Azure { namespace Messaging { namespace EventHubs {
19 namespace _detail {
20 class EventHubsPropertiesClient;
21 } // namespace _detail
22
26 {
/** @brief Application ID that will be passed to the namespace. Defaults to an empty string. */
29 std::string ApplicationID = "";
30
/** @brief Controls how often operations are retried from this client. */
34 Azure::Core::Http::Policies::RetryOptions RetryOptions{};
35
/** @brief The name of the producer client link, used in diagnostics. */
38 std::string Name{};
39
/** @brief The maximum size of the message that can be sent. Unset (no value) by default. */
42 Azure::Nullable<std::uint64_t> MaxMessageSize{};
43 };
44
47 class ProducerClient final {
48
49 public:
/** @brief Get the name of the Event Hub this client was created for.
 *
 * Const-qualified so that it can be called on a const ProducerClient, like GetRetryOptions().
 *
 * @return A const reference to the stored Event Hub name; valid for the lifetime of this client.
 */
51 std::string const& GetEventHubName() const { return m_eventHub; }
52
/** @brief Get the retry options held in this client's ProducerClientOptions.
 *
 * @return A const reference to the RetryOptions member of the stored options.
 */
54 Azure::Core::Http::Policies::RetryOptions const& GetRetryOptions() const
55 {
56 return this->m_producerClientOptions.RetryOptions;
57 }
58
/** @brief A ProducerClient cannot be copy constructed. */
60 ProducerClient(ProducerClient const& other) = delete;
61
/** @brief A ProducerClient cannot be copy assigned. */
63 ProducerClient& operator=(ProducerClient const& other) = delete;
64
// Move construction and move assignment are disabled as well. This is consistent with the
// std::mutex / std::recursive_mutex members of the class, which are neither copyable nor movable.
65 ProducerClient(ProducerClient&& other) = delete;
66 ProducerClient& operator=(ProducerClient&& other) = delete;
67
/** @brief Default-construct a ProducerClient; every member takes its in-class default value. */
69 ProducerClient() = default;
70
78 std::string const& connectionString,
79 std::string const& eventHub,
80 ProducerClientOptions options = {});
81
90 std::string const& fullyQualifiedNamespace,
91 std::string const& eventHub,
92 std::shared_ptr<Azure::Core::Credentials::TokenCredential> credential,
93 ProducerClientOptions options = {});
94
/** @brief Destroy the client, closing any message senders that are still open.
 *
 * Destructors are implicitly noexcept, so an exception escaping Close() here would end the
 * process via std::terminate. Teardown is therefore best-effort: callers that need to observe
 * close failures should call Close() explicitly before the client goes out of scope.
 */
95 ~ProducerClient()
{
  try
  {
    Close();
  }
  catch (...)
  {
    // Deliberately swallowed: nothing useful can be done with a close failure during
    // destruction, and letting it propagate would terminate the program.
  }
}
96
/** @brief Close all the message senders owned by this client.
 *
 * Every entry in m_senders is closed with the supplied context and the map is then emptied.
 * Sessions and connections are left untouched for now (see the note in the body).
 *
 * @param context Context forwarded to each sender's Close call.
 */
101 void Close(Azure::Core::Context const& context = {})
102 {
103 for (auto entry = m_senders.begin(); entry != m_senders.end(); ++entry)
104 {
105 entry->second.Close(context);
106 }
107 m_senders.clear();
108
109 // TODO: Close should also tear down every outstanding session and connection, but the
110 // functionality needed to tear those down isn't complete yet. Once it is, enable:
111 // for (auto& session : m_sessions)
112 // {
113 // session.second.Close(context);
114 // }
115 // for (auto& connection : m_connections)
116 // {
117 // connection.second.Close(context);
118 // }
119 }
120
/** @brief Create a new EventDataBatch to be sent to the Event Hub.
 *
 * @param options Options for the batch; defaults to a default-constructed EventDataBatchOptions.
 * @param context Context for the operation; defaults to a default-constructed context.
 *
 * @return The newly created EventDataBatch.
 */
128 EventDataBatch CreateBatch(
129 EventDataBatchOptions const& options = {},
130 Azure::Core::Context const& context = {});
131
/** @brief Send an EventDataBatch to the remote Event Hub.
 *
 * @param eventDataBatch Batch to send.
 * @param context Context for the operation; defaults to a default-constructed context.
 */
137 void Send(EventDataBatch const& eventDataBatch, Core::Context const& context = {});
138
/** @brief Overload of Send taking a single EventData.
 *
 * @param eventData Event to send.
 * @param context Context for the operation; defaults to a default-constructed context.
 */
147 void Send(Models::EventData const& eventData, Core::Context const& context = {});
148
/** @brief Overload of Send taking a vector of EventData.
 *
 * NOTE(review): whether the events are sent as one batch or individually is decided in the
 * out-of-line definition, which is not part of this header — confirm before relying on it.
 *
 * @param eventData Events to send.
 * @param context Context for the operation; defaults to a default-constructed context.
 */
157 void Send(std::vector<Models::EventData> const& eventData, Core::Context const& context = {});
158
/** @brief Get properties of the Event Hub.
 *
 * @param context Context for the operation; defaults to a default-constructed context.
 *
 * @return The properties as a Models::EventHubProperties value.
 */
164 Models::EventHubProperties GetEventHubProperties(Core::Context const& context = {});
165
/** @brief Get properties for a specific partition.
 *
 * @param partitionID Identifies the partition whose properties are requested.
 * @param context Context for the operation; defaults to a default-constructed context.
 *
 * @return The properties as a Models::EventHubPartitionProperties value.
 */
173 Models::EventHubPartitionProperties GetPartitionProperties(
174 std::string const& partitionID,
175 Core::Context const& context = {});
176
177 private:
179 std::string m_connectionString;
180
182 std::string m_fullyQualifiedNamespace;
183
185 std::string m_eventHub{};
186
188 std::string m_targetUrl{};
189
190 uint16_t m_targetPort = Azure::Core::Amqp::_internal::AmqpTlsPort;
191
193 std::shared_ptr<Core::Credentials::TokenCredential> m_credential{};
194
195 ProducerClientOptions m_producerClientOptions{};
196
197 // Protects m_senders and m_connections.
198 std::mutex m_sendersLock;
199 std::map<std::string, Azure::Core::Amqp::_internal::Connection> m_connections{};
200 std::map<std::string, Azure::Core::Amqp::_internal::MessageSender> m_senders{};
201
202 std::recursive_mutex m_sessionsLock;
203 std::map<std::string, Azure::Core::Amqp::_internal::Session> m_sessions{};
204
205 std::mutex m_propertiesClientLock;
206 std::shared_ptr<_detail::EventHubsPropertiesClient> m_propertiesClient;
207
// Factory helpers returning AMQP Connection / Session objects by value.
// NOTE(review): only the declarations are visible in this header; presumably the results are
// what gets cached in m_connections / m_sessions — confirm against producer_client.cpp.
208 Azure::Core::Amqp::_internal::Connection CreateConnection() const;
209 Azure::Core::Amqp::_internal::Session CreateSession(std::string const& partitionId);
210
211 // Ensure that the connection for this producer has been established.
212 void EnsureConnection(const std::string& partitionId);
213
214 // Ensure that a session for the specified partition ID has been established.
215 void EnsureSession(std::string const& partitionId);
216
217 // Ensure that a message sender for the specified partition has been created.
218 void EnsureSender(std::string const& partitionId, Azure::Core::Context const& context = {});
219
// Accessor for the shared EventHubsPropertiesClient.
// NOTE(review): presumably creates m_propertiesClient on first use under
// m_propertiesClientLock — confirm against producer_client.cpp.
220 std::shared_ptr<_detail::EventHubsPropertiesClient> GetPropertiesClient();
221
// Per-partition accessors returning a MessageSender / Session by value.
// NOTE(review): presumably these look up m_senders / m_sessions by partitionId; behavior for an
// unknown partition ID is not visible from this header — confirm against producer_client.cpp.
222 Azure::Core::Amqp::_internal::MessageSender GetSender(std::string const& partitionId);
223 Azure::Core::Amqp::_internal::Session GetSession(std::string const& partitionId);
224 };
225}}} // namespace Azure::Messaging::EventHubs
ProducerClient can be used to send events to an Event Hub.
Definition producer_client.hpp:47
Models::EventHubPartitionProperties GetPartitionProperties(std::string const &partitionID, Core::Context const &context={})
GetPartitionProperties gets properties for a specific partition. This includes data like the last enq...
Definition producer_client.cpp:226
Azure::Core::Http::Policies::RetryOptions const & GetRetryOptions() const
Definition producer_client.hpp:54
ProducerClient & operator=(ProducerClient const &other)=delete
ProducerClient(ProducerClient const &other)=delete
std::string const & GetEventHubName()
Definition producer_client.hpp:51
EventDataBatch CreateBatch(EventDataBatchOptions const &options={}, Azure::Core::Context const &context={})
Create a new EventDataBatch to be sent to the Event Hub.
Definition producer_client.cpp:58
void Send(EventDataBatch const &eventDataBatch, Core::Context const &context={})
Send an EventDataBatch to the remote Event Hub.
Definition producer_client.cpp:74
void Close(Azure::Core::Context const &context={})
Close all the connections and sessions.
Definition producer_client.hpp:101
Models::EventHubProperties GetEventHubProperties(Core::Context const &context={})
GetEventHubProperties gets properties of an eventHub. This includes data like name,...
Definition producer_client.cpp:221
Contains options for the ProducerClient creation.
Definition producer_client.hpp:26
Azure::Core::Http::Policies::RetryOptions RetryOptions
RetryOptions controls how often operations are retried from this client and any Receivers and Senders...
Definition producer_client.hpp:34
Azure::Nullable< std::uint64_t > MaxMessageSize
The maximum size of the message that can be sent.
Definition producer_client.hpp:42
std::string Name
The name of the producer client link, used in diagnostics.
Definition producer_client.hpp:38
std::string ApplicationID
Application ID that will be passed to the namespace.
Definition producer_client.hpp:29