azure-storage-blobs
Public Member Functions | Static Public Member Functions | Protected Attributes | List of all members
Azure::Storage::Blobs::BlobBatchClient Class Reference

The BlobBatchClient allows you to batch multiple Azure Storage operations in a single request. More...

#include <blob_batch_client.hpp>

Public Member Functions

 BlobBatchClient (const std::string &serviceUri, std::shared_ptr< SharedKeyCredential > credential, const BlobBatchClientOptions &options=BlobBatchClientOptions())
 Initialize a new instance of BlobBatchClient. More...
 
 BlobBatchClient (const std::string &serviceUri, std::shared_ptr< Core::Credentials::TokenCredential > credential, const BlobBatchClientOptions &options=BlobBatchClientOptions())
 Initialize a new instance of BlobBatchClient. More...
 
 BlobBatchClient (const std::string &serviceUri, const BlobBatchClientOptions &options=BlobBatchClientOptions())
 Initialize a new instance of BlobBatchClient. More...
 
Azure::Core::Response< SubmitBlobBatchResultSubmitBatch (const BlobBatch &batch, const SubmitBlobBatchOptions &options=SubmitBlobBatchOptions()) const
 Submit a BlobBatch of sub-operations. More...
 

Static Public Member Functions

static BlobBatchClient CreateFromConnectionString (const std::string &connectionString, const BlobBatchClientOptions &options=BlobBatchClientOptions())
 Initialize a new instance of BlobBatchClient. More...
 
static BlobBatch CreateBatch ()
 Creates a new BlobBatch to collect sub-operations that can be submitted together via SubmitBatch. More...
 

Protected Attributes

Azure::Core::Http::Url m_serviceUrl
 
std::shared_ptr< Azure::Core::Http::HttpPipeline > m_pipeline
 
std::shared_ptr< Azure::Core::Http::HttpPipeline > m_subRequestPipeline
 

Detailed Description

The BlobBatchClient allows you to batch multiple Azure Storage operations in a single request.

Constructor & Destructor Documentation

◆ BlobBatchClient() [1/3]

Azure::Storage::Blobs::BlobBatchClient::BlobBatchClient ( const std::string &  serviceUri,
std::shared_ptr< SharedKeyCredential >  credential,
const BlobBatchClientOptions options = BlobBatchClientOptions() 
)
explicit

Initialize a new instance of BlobBatchClient.

Parameters
serviceUriA uri referencing the blob that includes the name of the account.
credentialThe shared key credential used to sign requests.
optionsOptional client options that define the transport pipeline policies for authentication, retries, etc., that are applied to every request and subrequest.
90  : m_serviceUrl(serviceUri)
91  {
92  std::vector<std::unique_ptr<Azure::Core::Http::HttpPolicy>> policies;
93  policies.emplace_back(std::make_unique<Azure::Core::Http::TelemetryPolicy>(
94  Details::c_BlobServicePackageName, BlobServiceVersion));
95  policies.emplace_back(std::make_unique<Azure::Core::Http::RequestIdPolicy>());
96  for (const auto& p : options.PerOperationPolicies)
97  {
98  policies.emplace_back(p->Clone());
99  }
100  policies.emplace_back(
101  std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
102  for (const auto& p : options.PerRetryPolicies)
103  {
104  policies.emplace_back(p->Clone());
105  }
106  policies.emplace_back(std::make_unique<StoragePerRetryPolicy>());
107  policies.emplace_back(std::make_unique<SharedKeyPolicy>(credential));
108  policies.emplace_back(std::make_unique<Azure::Core::Http::TransportPolicy>(
109  std::make_shared<Azure::Core::Http::CurlTransport>()));
110  m_pipeline = std::make_shared<Azure::Core::Http::HttpPipeline>(policies);
111 
112  policies.clear();
113  for (const auto& p : options.PerOperationPolicies)
114  {
115  policies.emplace_back(p->Clone());
116  }
117  for (const auto& p : options.PerRetryPolicies)
118  {
119  policies.emplace_back(p->Clone());
120  }
121  policies.emplace_back(std::make_unique<StoragePerRetryPolicy>());
122  policies.emplace_back(std::make_unique<SharedKeyPolicy>(credential));
123  policies.emplace_back(std::make_unique<NoopTransportPolicy>());
124  m_subRequestPipeline = std::make_shared<Azure::Core::Http::HttpPipeline>(policies);
125  }

◆ BlobBatchClient() [2/3]

Azure::Storage::Blobs::BlobBatchClient::BlobBatchClient ( const std::string &  serviceUri,
std::shared_ptr< Core::Credentials::TokenCredential >  credential,
const BlobBatchClientOptions options = BlobBatchClientOptions() 
)
explicit

Initialize a new instance of BlobBatchClient.

Parameters
serviceUriA uri referencing the blob that includes the name of the account.
credentialThe token credential used to sign requests.
optionsOptional client options that define the transport pipeline policies for authentication, retries, etc., that are applied to every request and subrequest.
131  : m_serviceUrl(serviceUri)
132  {
133  std::vector<std::unique_ptr<Azure::Core::Http::HttpPolicy>> policies;
134  policies.emplace_back(std::make_unique<Azure::Core::Http::TelemetryPolicy>(
135  Details::c_BlobServicePackageName, BlobServiceVersion));
136  policies.emplace_back(std::make_unique<Azure::Core::Http::RequestIdPolicy>());
137  for (const auto& p : options.PerOperationPolicies)
138  {
139  policies.emplace_back(p->Clone());
140  }
141  policies.emplace_back(
142  std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
143  for (const auto& p : options.PerRetryPolicies)
144  {
145  policies.emplace_back(p->Clone());
146  }
147  policies.emplace_back(std::make_unique<StoragePerRetryPolicy>());
148  policies.emplace_back(
149  std::make_unique<Core::Credentials::Policy::BearerTokenAuthenticationPolicy>(
150  credential, Details::c_StorageScope));
151  policies.emplace_back(std::make_unique<Azure::Core::Http::TransportPolicy>(
152  std::make_shared<Azure::Core::Http::CurlTransport>()));
153  m_pipeline = std::make_shared<Azure::Core::Http::HttpPipeline>(policies);
154 
155  policies.clear();
156  for (const auto& p : options.PerOperationPolicies)
157  {
158  policies.emplace_back(p->Clone());
159  }
160  for (const auto& p : options.PerRetryPolicies)
161  {
162  policies.emplace_back(p->Clone());
163  }
164  policies.emplace_back(std::make_unique<StoragePerRetryPolicy>());
165  policies.emplace_back(
166  std::make_unique<Core::Credentials::Policy::BearerTokenAuthenticationPolicy>(
167  credential, Details::c_StorageScope));
168  policies.emplace_back(std::make_unique<NoopTransportPolicy>());
169  m_subRequestPipeline = std::make_shared<Azure::Core::Http::HttpPipeline>(policies);
170  }

◆ BlobBatchClient() [3/3]

Azure::Storage::Blobs::BlobBatchClient::BlobBatchClient ( const std::string &  serviceUri,
const BlobBatchClientOptions options = BlobBatchClientOptions() 
)
explicit

Initialize a new instance of BlobBatchClient.

Parameters
serviceUriA uri referencing the blob that includes the name of the account, and possibly also a SAS token.
optionsOptional client options that define the transport pipeline policies for authentication, retries, etc., that are applied to every request and subrequest.
175  : m_serviceUrl(serviceUri)
176  {
177  std::vector<std::unique_ptr<Azure::Core::Http::HttpPolicy>> policies;
178  policies.emplace_back(std::make_unique<Azure::Core::Http::TelemetryPolicy>(
179  Details::c_BlobServicePackageName, BlobServiceVersion));
180  policies.emplace_back(std::make_unique<Azure::Core::Http::RequestIdPolicy>());
181  for (const auto& p : options.PerOperationPolicies)
182  {
183  policies.emplace_back(p->Clone());
184  }
185  policies.emplace_back(
186  std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
187  for (const auto& p : options.PerRetryPolicies)
188  {
189  policies.emplace_back(p->Clone());
190  }
191  policies.emplace_back(std::make_unique<StoragePerRetryPolicy>());
192  policies.emplace_back(std::make_unique<Azure::Core::Http::TransportPolicy>(
193  std::make_shared<Azure::Core::Http::CurlTransport>()));
194  m_pipeline = std::make_shared<Azure::Core::Http::HttpPipeline>(policies);
195 
196  policies.clear();
197  for (const auto& p : options.PerOperationPolicies)
198  {
199  policies.emplace_back(p->Clone());
200  }
201  for (const auto& p : options.PerRetryPolicies)
202  {
203  policies.emplace_back(p->Clone());
204  }
205  policies.emplace_back(std::make_unique<StoragePerRetryPolicy>());
206  policies.emplace_back(std::make_unique<NoopTransportPolicy>());
207  m_subRequestPipeline = std::make_shared<Azure::Core::Http::HttpPipeline>(policies);
208  }

Member Function Documentation

◆ CreateBatch()

static BlobBatch Azure::Storage::Blobs::BlobBatchClient::CreateBatch ( )
inlinestatic

Creates a new BlobBatch to collect sub-operations that can be submitted together via SubmitBatch.

Returns
A new instance of BlobBatch.
145 { return BlobBatch(); }

◆ CreateFromConnectionString()

BlobBatchClient Azure::Storage::Blobs::BlobBatchClient::CreateFromConnectionString ( const std::string &  connectionString,
const BlobBatchClientOptions options = BlobBatchClientOptions() 
)
static

Initialize a new instance of BlobBatchClient.

Parameters
connectionStringA connection string includes the authentication information required for your application to access data in an Azure Storage account at runtime.
optionsOptional client options that define the transport pipeline policies for authentication, retries, etc., that are applied to every request and subrequest.
Returns
A new BlobBatchClient instance.
71  {
72  auto parsedConnectionString = Details::ParseConnectionString(connectionString);
73  auto serviceUri = std::move(parsedConnectionString.BlobServiceUri);
74 
75  if (parsedConnectionString.KeyCredential)
76  {
77  return BlobBatchClient(
78  serviceUri.GetAbsoluteUrl(), parsedConnectionString.KeyCredential, options);
79  }
80  else
81  {
82  return BlobBatchClient(serviceUri.GetAbsoluteUrl(), options);
83  }
84  }

◆ SubmitBatch()

Azure::Core::Response< SubmitBlobBatchResult > Azure::Storage::Blobs::BlobBatchClient::SubmitBatch ( const BlobBatch batch,
const SubmitBlobBatchOptions options = SubmitBlobBatchOptions() 
) const

Submit a BlobBatch of sub-operations.

Parameters
batchA BlobBatch of sub-operations.
optionsOptional parameters to execute this function.
Returns
A SubmitBlobBatchResult on successful submitting.
213  {
214  const std::string c_lineEnding = "\r\n";
215  const std::string c_contentTypePrefix = "multipart/mixed; boundary=";
216 
217  std::string boundary = "batch_" + Azure::Core::Uuid::CreateUuid().GetUuidString();
218 
219  enum class RequestType
220  {
221  DeleteBlob,
222  SetBlobAccessTier,
223  };
224 
225  std::vector<RequestType> requestTypes;
226 
227  std::string requestBody;
228  {
229  auto getBatchBoundary = [&c_lineEnding, &boundary, subRequestCounter = 0]() mutable {
230  std::string ret;
231  ret += "--" + boundary + c_lineEnding;
232  ret += "Content-Type: application/http" + c_lineEnding + "Content-Transfer-Encoding: binary"
233  + c_lineEnding + "Content-ID: " + std::to_string(subRequestCounter++) + c_lineEnding
234  + c_lineEnding;
235  return ret;
236  };
237  for (const auto& subrequest : batch.m_deleteBlobSubRequests)
238  {
239  requestTypes.emplace_back(RequestType::DeleteBlob);
240 
241  requestBody += getBatchBoundary();
242 
243  auto blobUrl = m_serviceUrl;
244  blobUrl.AppendPath(subrequest.ContainerName);
245  blobUrl.AppendPath(subrequest.BlobName);
246  BlobRestClient::Blob::DeleteBlobOptions protocolLayerOptions;
247  protocolLayerOptions.DeleteSnapshots = subrequest.Options.DeleteSnapshots;
248  protocolLayerOptions.IfModifiedSince = subrequest.Options.AccessConditions.IfModifiedSince;
249  protocolLayerOptions.IfUnmodifiedSince
250  = subrequest.Options.AccessConditions.IfUnmodifiedSince;
251  protocolLayerOptions.IfMatch = subrequest.Options.AccessConditions.IfMatch;
252  protocolLayerOptions.IfNoneMatch = subrequest.Options.AccessConditions.IfNoneMatch;
253  protocolLayerOptions.LeaseId = subrequest.Options.AccessConditions.LeaseId;
254  auto message = BlobRestClient::Blob::DeleteCreateMessage(blobUrl, protocolLayerOptions);
255  message.RemoveHeader(Details::c_HttpHeaderXMsVersion);
256  m_subRequestPipeline->Send(options.Context, message);
257  requestBody += message.GetHTTPMessagePreBody();
258  }
259  for (const auto& subrequest : batch.m_setBlobAccessTierSubRequests)
260  {
261  requestTypes.emplace_back(RequestType::SetBlobAccessTier);
262 
263  requestBody += getBatchBoundary();
264 
265  auto blobUrl = m_serviceUrl;
266  blobUrl.AppendPath(subrequest.ContainerName);
267  blobUrl.AppendPath(subrequest.BlobName);
268  BlobRestClient::Blob::SetBlobAccessTierOptions protocolLayerOptions;
269  protocolLayerOptions.Tier = subrequest.Tier;
270  protocolLayerOptions.RehydratePriority = subrequest.Options.RehydratePriority;
271  auto message
272  = BlobRestClient::Blob::SetAccessTierCreateMessage(blobUrl, protocolLayerOptions);
273  message.RemoveHeader(Details::c_HttpHeaderXMsVersion);
274  m_subRequestPipeline->Send(options.Context, message);
275  requestBody += message.GetHTTPMessagePreBody();
276  }
277  requestBody += "--" + boundary + "--" + c_lineEnding;
278  }
279 
280  BlobRestClient::BlobBatch::SubmitBlobBatchOptions protocolLayerOptions;
281  protocolLayerOptions.ContentType = c_contentTypePrefix + boundary;
282 
283  Azure::Core::Http::MemoryBodyStream requestBodyStream(
284  reinterpret_cast<const uint8_t*>(requestBody.data()), requestBody.length());
285 
286  auto rawResponse = BlobRestClient::BlobBatch::SubmitBatch(
287  options.Context, *m_pipeline, m_serviceUrl, &requestBodyStream, protocolLayerOptions);
288 
289  if (rawResponse->ContentType.substr(0, c_contentTypePrefix.length()) == c_contentTypePrefix)
290  {
291  boundary = rawResponse->ContentType.substr(c_contentTypePrefix.length());
292  }
293  else
294  {
295  throw std::runtime_error("failed to parse Content-Type response header");
296  }
297 
298  SubmitBlobBatchResult batchResult;
299  {
300  const std::vector<uint8_t>& responseBody = rawResponse.GetRawResponse().GetBody();
301 
302  const char* const startPos = reinterpret_cast<const char*>(responseBody.data());
303  const char* currPos = startPos;
304  const char* const endPos = currPos + responseBody.size();
305 
306  auto parseLookAhead = [&currPos, endPos](const std::string& expect) -> bool {
307  // This doesn't move currPos
308  for (std::size_t i = 0; i < expect.length(); ++i)
309  {
310  if (currPos + i < endPos && currPos[i] == expect[i])
311  {
312  continue;
313  }
314  return false;
315  }
316  return true;
317  };
318 
319  auto parseConsume = [&currPos, startPos, &parseLookAhead](const std::string& expect) -> void {
320  // This moves currPos
321  if (parseLookAhead(expect))
322  {
323  currPos += expect.length();
324  }
325  else
326  {
327  throw std::runtime_error(
328  "failed to parse response body at " + std::to_string(currPos - startPos));
329  }
330  };
331 
332  auto parseFindNext = [&currPos, endPos](const std::string& expect) -> const char* {
333  // This doesn't move currPos
334  return std::search(currPos, endPos, expect.begin(), expect.end());
335  };
336 
337  auto parseFindNextAfter = [endPos, &parseFindNext](const std::string& expect) -> const char* {
338  // This doesn't move currPos
339  return std::min(endPos, parseFindNext(expect) + expect.length());
340  };
341 
342  auto parseGetUntilAfter
343  = [&currPos, endPos, &parseFindNext](const std::string& expect) -> std::string {
344  // This moves currPos
345  auto ePos = parseFindNext(expect);
346  std::string ret(currPos, ePos);
347  currPos = std::min(endPos, ePos + expect.length());
348  return ret;
349  };
350 
351  int subRequestCounter = 0;
352  while (true)
353  {
354  parseConsume("--" + boundary);
355 
356  if (parseLookAhead("--"))
357  {
358  parseConsume("--");
359  }
360 
361  if (currPos == endPos)
362  {
363  break;
364  }
365 
366  currPos = parseFindNextAfter(c_lineEnding + c_lineEnding);
367  auto boundaryPos = parseFindNext("--" + boundary);
368 
369  // now (currPos, boundaryPos) is a subresponse body
370  parseConsume("HTTP/");
371  int32_t httpMajorVersion = std::stoi(parseGetUntilAfter("."));
372  int32_t httpMinorVersion = std::stoi(parseGetUntilAfter(" "));
373  int32_t httpStatusCode = std::stoi(parseGetUntilAfter(" "));
374  std::string httpReasonPhrase = parseGetUntilAfter(c_lineEnding);
375 
376  auto rawSubresponse = std::make_unique<Azure::Core::Http::RawResponse>(
377  httpMajorVersion,
378  httpMinorVersion,
379  static_cast<Azure::Core::Http::HttpStatusCode>(httpStatusCode),
380  httpReasonPhrase);
381 
382  while (currPos < boundaryPos)
383  {
384  if (parseLookAhead(c_lineEnding))
385  {
386  break;
387  }
388 
389  std::string headerName = parseGetUntilAfter(": ");
390  std::string headerValue = parseGetUntilAfter(c_lineEnding);
391  rawSubresponse->AddHeader(headerName, headerValue);
392  }
393 
394  parseConsume(c_lineEnding);
395 
396  rawSubresponse->SetBody(std::vector<uint8_t>(currPos, boundaryPos));
397  currPos = boundaryPos;
398 
399  RequestType requestType = requestTypes[subRequestCounter++];
400  if (requestType == RequestType::DeleteBlob)
401  {
402  try
403  {
404  batchResult.DeleteBlobResults.emplace_back(BlobRestClient::Blob::DeleteCreateResponse(
405  options.Context, std::move(rawSubresponse)));
406  }
407  catch (StorageError& e)
408  {
409  batchResult.DeleteBlobResults.emplace_back(Azure::Core::Response<DeleteBlobResult>(
410  DeleteBlobResult{}, std::move(e.RawResponse)));
411  }
412  }
413  else if (requestType == RequestType::SetBlobAccessTier)
414  {
415  try
416  {
417  batchResult.SetBlobAccessTierResults.emplace_back(
418  BlobRestClient::Blob::SetAccessTierCreateResponse(
419  options.Context, std::move(rawSubresponse)));
420  }
421  catch (StorageError& e)
422  {
423  batchResult.SetBlobAccessTierResults.emplace_back(
424  Azure::Core::Response<SetBlobAccessTierResult>(
425  SetBlobAccessTierResult{}, std::move(e.RawResponse)));
426  }
427  }
428  }
429  }
430 
431  return Azure::Core::Response<SubmitBlobBatchResult>(
432  std::move(batchResult), rawResponse.ExtractRawResponse());
433  }

The documentation for this class was generated from the following files:
Azure::Storage::Blobs::BlobBatchClient::BlobBatchClient
BlobBatchClient(const std::string &serviceUri, std::shared_ptr< SharedKeyCredential > credential, const BlobBatchClientOptions &options=BlobBatchClientOptions())
Initialize a new instance of BlobBatchClient.
Definition: blob_batch_client.cpp:86