# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import sys
from typing import Any, Callable, Dict, IO, Iterable, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._serialization import Serializer
from .._vendor import _convert_request, _format_url_section
if sys.version_info >= (3, 8):
from typing import Literal # pylint: disable=no-name-in-module, ungrouped-imports
else:
from typing_extensions import Literal # type: ignore # pylint: disable=ungrouped-imports
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_list_by_subscription_request(subscription_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url", "/subscriptions/{subscriptionId}/providers/Microsoft.DocumentDB/cassandraClusters"
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_by_resource_group_request(resource_group_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_request(resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_update_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)
def build_invoke_command_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/invokeCommand",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_backups_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_backup_request(
resource_group_name: str, cluster_name: str, backup_id: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups/{backupId}",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
"backupId": _SERIALIZER.url("backup_id", backup_id, "str", max_length=15, min_length=1, pattern=r"^[0-9]+$"),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_deallocate_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/deallocate",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_start_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/start",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_status_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", "2022-11-15-preview")
)
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/status",
) # pylint: disable=line-too-long
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", max_length=100, min_length=1, pattern=r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$"
),
}
_url: str = _format_url_section(_url, **path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
[docs]class CassandraClustersOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.cosmosdb.CosmosDBManagementClient`'s
:attr:`cassandra_clusters` attribute.
"""
models = _models
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace
def list_by_subscription(self, **kwargs: Any) -> Iterable["_models.ClusterResource"]:
"""List all managed Cassandra clusters in this subscription.
:keyword callable cls: A custom type or function that will be passed the direct response
:return: An iterator like instance of either ClusterResource or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[_models.ListClusters] = kwargs.pop("cls", None)
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
request = build_list_by_subscription_request(
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self.list_by_subscription.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
request.method = "GET"
return request
def extract_data(pipeline_response):
deserialized = self._deserialize("ListClusters", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
list_by_subscription.metadata = {
"url": "/subscriptions/{subscriptionId}/providers/Microsoft.DocumentDB/cassandraClusters"
}
[docs] @distributed_trace
def list_by_resource_group(self, resource_group_name: str, **kwargs: Any) -> Iterable["_models.ClusterResource"]:
"""List all managed Cassandra clusters in this resource group.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: An iterator like instance of either ClusterResource or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[_models.ListClusters] = kwargs.pop("cls", None)
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
request = build_list_by_resource_group_request(
resource_group_name=resource_group_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self.list_by_resource_group.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
request.method = "GET"
return request
def extract_data(pipeline_response):
deserialized = self._deserialize("ListClusters", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
list_by_resource_group.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters"
}
[docs] @distributed_trace
def get(self, resource_group_name: str, cluster_name: str, **kwargs: Any) -> _models.ClusterResource:
"""Get the properties of a managed Cassandra cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: ClusterResource or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.ClusterResource
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[_models.ClusterResource] = kwargs.pop("cls", None)
request = build_get_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self.get.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("ClusterResource", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
def _delete_initial( # pylint: disable=inconsistent-return-statements
self, resource_group_name: str, cluster_name: str, **kwargs: Any
) -> None:
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[None] = kwargs.pop("cls", None)
request = build_delete_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self._delete_initial.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
if cls:
return cls(pipeline_response, None, {})
_delete_initial.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
[docs] @distributed_trace
def begin_delete(self, resource_group_name: str, cluster_name: str, **kwargs: Any) -> LROPoller[None]:
"""Deletes a managed Cassandra cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_initial( # type: ignore
resource_group_name=resource_group_name,
cluster_name=cluster_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {})
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller.from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_delete.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
def _create_update_initial(
self, resource_group_name: str, cluster_name: str, body: Union[_models.ClusterResource, IO], **kwargs: Any
) -> _models.ClusterResource:
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ClusterResource] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(body, (IO, bytes)):
_content = body
else:
_json = self._serialize.body(body, "ClusterResource")
request = build_create_update_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
template_url=self._create_update_initial.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
if response.status_code == 200:
deserialized = self._deserialize("ClusterResource", pipeline_response)
if response.status_code == 201:
deserialized = self._deserialize("ClusterResource", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
_create_update_initial.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
@overload
def begin_create_update(
self,
resource_group_name: str,
cluster_name: str,
body: _models.ClusterResource,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ClusterResource]:
"""Create or update a managed Cassandra cluster. When updating, you must specify all writable
properties. To update only some properties, use PATCH.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: The properties specifying the desired state of the managed Cassandra cluster.
Required.
:type body: ~azure.mgmt.cosmosdb.models.ClusterResource
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either ClusterResource or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update(
self,
resource_group_name: str,
cluster_name: str,
body: IO,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ClusterResource]:
"""Create or update a managed Cassandra cluster. When updating, you must specify all writable
properties. To update only some properties, use PATCH.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: The properties specifying the desired state of the managed Cassandra cluster.
Required.
:type body: IO
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either ClusterResource or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs] @distributed_trace
def begin_create_update(
self, resource_group_name: str, cluster_name: str, body: Union[_models.ClusterResource, IO], **kwargs: Any
) -> LROPoller[_models.ClusterResource]:
"""Create or update a managed Cassandra cluster. When updating, you must specify all writable
properties. To update only some properties, use PATCH.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: The properties specifying the desired state of the managed Cassandra cluster. Is
either a ClusterResource type or a IO type. Required.
:type body: ~azure.mgmt.cosmosdb.models.ClusterResource or IO
:keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
Default value is None.
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either ClusterResource or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ClusterResource] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
body=body,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ClusterResource", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller.from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_create_update.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
def _update_initial(
self, resource_group_name: str, cluster_name: str, body: Union[_models.ClusterResource, IO], **kwargs: Any
) -> _models.ClusterResource:
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ClusterResource] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(body, (IO, bytes)):
_content = body
else:
_json = self._serialize.body(body, "ClusterResource")
request = build_update_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
template_url=self._update_initial.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
if response.status_code == 200:
deserialized = self._deserialize("ClusterResource", pipeline_response)
if response.status_code == 202:
deserialized = self._deserialize("ClusterResource", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
_update_initial.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
@overload
def begin_update(
self,
resource_group_name: str,
cluster_name: str,
body: _models.ClusterResource,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ClusterResource]:
"""Updates some of the properties of a managed Cassandra cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: Parameters to provide for specifying the managed Cassandra cluster. Required.
:type body: ~azure.mgmt.cosmosdb.models.ClusterResource
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either ClusterResource or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update(
self,
resource_group_name: str,
cluster_name: str,
body: IO,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ClusterResource]:
"""Updates some of the properties of a managed Cassandra cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: Parameters to provide for specifying the managed Cassandra cluster. Required.
:type body: IO
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either ClusterResource or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs] @distributed_trace
def begin_update(
self, resource_group_name: str, cluster_name: str, body: Union[_models.ClusterResource, IO], **kwargs: Any
) -> LROPoller[_models.ClusterResource]:
"""Updates some of the properties of a managed Cassandra cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: Parameters to provide for specifying the managed Cassandra cluster. Is either a
ClusterResource type or a IO type. Required.
:type body: ~azure.mgmt.cosmosdb.models.ClusterResource or IO
:keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
Default value is None.
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either ClusterResource or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ClusterResource] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._update_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
body=body,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ClusterResource", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller.from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_update.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"
}
def _invoke_command_initial(
self, resource_group_name: str, cluster_name: str, body: Union[_models.CommandPostBody, IO], **kwargs: Any
) -> _models.CommandOutput:
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.CommandOutput] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(body, (IO, bytes)):
_content = body
else:
_json = self._serialize.body(body, "CommandPostBody")
request = build_invoke_command_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
template_url=self._invoke_command_initial.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("CommandOutput", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
_invoke_command_initial.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/invokeCommand"
}
@overload
def begin_invoke_command(
self,
resource_group_name: str,
cluster_name: str,
body: _models.CommandPostBody,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.CommandOutput]:
"""Invoke a command like nodetool for cassandra maintenance.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: Specification which command to run where. Required.
:type body: ~azure.mgmt.cosmosdb.models.CommandPostBody
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either CommandOutput or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CommandOutput]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_invoke_command(
self,
resource_group_name: str,
cluster_name: str,
body: IO,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.CommandOutput]:
"""Invoke a command like nodetool for cassandra maintenance.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: Specification which command to run where. Required.
:type body: IO
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either CommandOutput or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CommandOutput]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs] @distributed_trace
def begin_invoke_command(
self, resource_group_name: str, cluster_name: str, body: Union[_models.CommandPostBody, IO], **kwargs: Any
) -> LROPoller[_models.CommandOutput]:
"""Invoke a command like nodetool for cassandra maintenance.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param body: Specification which command to run where. Is either a CommandPostBody type or a IO
type. Required.
:type body: ~azure.mgmt.cosmosdb.models.CommandPostBody or IO
:keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
Default value is None.
:paramtype content_type: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either CommandOutput or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CommandOutput]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.CommandOutput] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._invoke_command_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
body=body,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("CommandOutput", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller.from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_invoke_command.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/invokeCommand"
}
[docs] @distributed_trace
def list_backups(
self, resource_group_name: str, cluster_name: str, **kwargs: Any
) -> Iterable["_models.BackupResource"]:
"""List the backups of this cluster that are available to restore.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: An iterator like instance of either BackupResource or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.BackupResource]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[_models.ListBackups] = kwargs.pop("cls", None)
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
request = build_list_backups_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self.list_backups.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
request.method = "GET"
return request
def extract_data(pipeline_response):
deserialized = self._deserialize("ListBackups", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
list_backups.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups"
}
[docs] @distributed_trace
def get_backup(
self, resource_group_name: str, cluster_name: str, backup_id: str, **kwargs: Any
) -> _models.BackupResource:
"""Get the properties of an individual backup of this cluster that is available to restore.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:param backup_id: Id of a restorable backup of a Cassandra cluster. Required.
:type backup_id: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: BackupResource or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.BackupResource
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[_models.BackupResource] = kwargs.pop("cls", None)
request = build_get_backup_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
backup_id=backup_id,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self.get_backup.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("BackupResource", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get_backup.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups/{backupId}"
}
def _deallocate_initial( # pylint: disable=inconsistent-return-statements
self, resource_group_name: str, cluster_name: str, **kwargs: Any
) -> None:
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[None] = kwargs.pop("cls", None)
request = build_deallocate_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self._deallocate_initial.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
if cls:
return cls(pipeline_response, None, {})
_deallocate_initial.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/deallocate"
}
[docs] @distributed_trace
def begin_deallocate(self, resource_group_name: str, cluster_name: str, **kwargs: Any) -> LROPoller[None]:
"""Deallocate the Managed Cassandra Cluster and Associated Data Centers. Deallocation will
deallocate the host virtual machine of this cluster, and reserved the data disk. This won't do
anything on an already deallocated cluster. Use Start to restart the cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._deallocate_initial( # type: ignore
resource_group_name=resource_group_name,
cluster_name=cluster_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {})
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller.from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_deallocate.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/deallocate"
}
def _start_initial( # pylint: disable=inconsistent-return-statements
self, resource_group_name: str, cluster_name: str, **kwargs: Any
) -> None:
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[None] = kwargs.pop("cls", None)
request = build_start_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self._start_initial.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
if cls:
return cls(pipeline_response, None, {})
_start_initial.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/start"
}
[docs] @distributed_trace
def begin_start(self, resource_group_name: str, cluster_name: str, **kwargs: Any) -> LROPoller[None]:
"""Start the Managed Cassandra Cluster and Associated Data Centers. Start will start the host
virtual machine of this cluster with reserved data disk. This won't do anything on an already
running cluster. Use Deallocate to deallocate the cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:keyword str continuation_token: A continuation token to restart a poller from a saved state.
:keyword polling: By default, your polling method will be ARMPolling. Pass in False for this
operation to not poll, or pass in your own initialized polling object for a personal polling
strategy.
:paramtype polling: bool or ~azure.core.polling.PollingMethod
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._start_initial( # type: ignore
resource_group_name=resource_group_name,
cluster_name=cluster_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {})
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller.from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_start.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/start"
}
[docs] @distributed_trace
def status(
self, resource_group_name: str, cluster_name: str, **kwargs: Any
) -> _models.CassandraClusterPublicStatus:
"""Gets the CPU, memory, and disk usage statistics for each Cassandra node in a cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: Managed Cassandra cluster name. Required.
:type cluster_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: CassandraClusterPublicStatus or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.CassandraClusterPublicStatus
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: Literal["2022-11-15-preview"] = kwargs.pop(
"api_version", _params.pop("api-version", self._config.api_version)
)
cls: ClsType[_models.CassandraClusterPublicStatus] = kwargs.pop("cls", None)
request = build_status_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
template_url=self.status.metadata["url"],
headers=_headers,
params=_params,
)
request = _convert_request(request)
request.url = self._client.format_url(request.url)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
request, stream=False, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("CassandraClusterPublicStatus", pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
status.metadata = {
"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/status"
}