Source code for azure.mgmt.cosmosdb.operations._cassandra_clusters_operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, Callable, Dict, Iterable, Optional, TypeVar, Union

from msrest import Serializer

from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling

from .. import models as _models
from .._vendor import _convert_request, _format_url_section
T = TypeVar('T')
JSONType = Any
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False

def build_list_by_subscription_request(
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/providers/Microsoft.DocumentDB/cassandraClusters")
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_list_by_resource_group_request(
    subscription_id: str,
    resource_group_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_get_request(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_delete_request_initial(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="DELETE",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_create_update_request_initial(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    *,
    json: JSONType = None,
    content: Any = None,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        _header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PUT",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        json=json,
        content=content,
        **kwargs
    )


def build_update_request_initial(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    *,
    json: JSONType = None,
    content: Any = None,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        _header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="PATCH",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        json=json,
        content=content,
        **kwargs
    )


def build_invoke_command_request_initial(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    *,
    json: JSONType = None,
    content: Any = None,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str
    content_type = kwargs.pop('content_type', None)  # type: Optional[str]

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/invokeCommand")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    if content_type is not None:
        _header_parameters['Content-Type'] = _SERIALIZER.header("content_type", content_type, 'str')
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="POST",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        json=json,
        content=content,
        **kwargs
    )


def build_list_backups_request(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_get_backup_request(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    backup_id: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups/{backupId}")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
        "backupId": _SERIALIZER.url("backup_id", backup_id, 'str', max_length=15, min_length=1, pattern=r'^[0-9]+$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_deallocate_request_initial(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/deallocate")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="POST",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_start_request_initial(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/start")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="POST",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )


def build_status_request(
    subscription_id: str,
    resource_group_name: str,
    cluster_name: str,
    **kwargs: Any
) -> HttpRequest:
    api_version = kwargs.pop('api_version', "2022-02-15-preview")  # type: str

    accept = "application/json"
    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/status")  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, 'str', min_length=1),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, 'str', max_length=90, min_length=1),
        "clusterName": _SERIALIZER.url("cluster_name", cluster_name, 'str', max_length=100, min_length=1, pattern=r'^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$'),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _query_parameters = kwargs.pop("params", {})  # type: Dict[str, Any]
    _query_parameters['api-version'] = _SERIALIZER.query("api_version", api_version, 'str')

    # Construct headers
    _header_parameters = kwargs.pop("headers", {})  # type: Dict[str, Any]
    _header_parameters['Accept'] = _SERIALIZER.header("accept", accept, 'str')

    return HttpRequest(
        method="GET",
        url=_url,
        params=_query_parameters,
        headers=_header_parameters,
        **kwargs
    )

[docs]class CassandraClustersOperations(object): """CassandraClustersOperations operations. You should not instantiate this class directly. Instead, you should create a Client instance that instantiates it for you and attaches it as an attribute. :ivar models: Alias to model classes used in this operation group. :type models: ~azure.mgmt.cosmosdb.models :param client: Client for service requests. :param config: Configuration of service client. :param serializer: An object model serializer. :param deserializer: An object model deserializer. """ models = _models def __init__(self, client, config, serializer, deserializer): self._client = client self._serialize = serializer self._deserialize = deserializer self._config = config
[docs] @distributed_trace def list_by_subscription( self, **kwargs: Any ) -> Iterable["_models.ListClusters"]: """List all managed Cassandra clusters in this subscription. :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either ListClusters or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.ListClusters] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str cls = kwargs.pop('cls', None) # type: ClsType["_models.ListClusters"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_list_by_subscription_request( subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.list_by_subscription.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) else: request = build_list_by_subscription_request( subscription_id=self._config.subscription_id, api_version=api_version, template_url=next_link, ) request = _convert_request(request) request.url = self._client.format_url(request.url) request.method = "GET" return request def extract_data(pipeline_response): deserialized = self._deserialize("ListClusters", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) return None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged( get_next, extract_data )
list_by_subscription.metadata = {'url': "/subscriptions/{subscriptionId}/providers/Microsoft.DocumentDB/cassandraClusters"} # type: ignore
[docs] @distributed_trace def list_by_resource_group( self, resource_group_name: str, **kwargs: Any ) -> Iterable["_models.ListClusters"]: """List all managed Cassandra clusters in this resource group. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either ListClusters or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.ListClusters] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str cls = kwargs.pop('cls', None) # type: ClsType["_models.ListClusters"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_list_by_resource_group_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, api_version=api_version, template_url=self.list_by_resource_group.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) else: request = build_list_by_resource_group_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, api_version=api_version, template_url=next_link, ) request = _convert_request(request) request.url = self._client.format_url(request.url) request.method = "GET" return request def extract_data(pipeline_response): deserialized = self._deserialize("ListClusters", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) return None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged( get_next, extract_data )
list_by_resource_group.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters"} # type: ignore
[docs] @distributed_trace def get( self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> "_models.ClusterResource": """Get the properties of a managed Cassandra cluster. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: ClusterResource, or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.ClusterResource :raises: ~azure.core.exceptions.HttpResponseError """ cls = kwargs.pop('cls', None) # type: ClsType["_models.ClusterResource"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str request = build_get_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=self.get.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize('ClusterResource', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore def _delete_initial( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> None: cls = kwargs.pop('cls', None) # type: ClsType[None] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str request = build_delete_request_initial( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=self._delete_initial.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) _delete_initial.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore
[docs] @distributed_trace def begin_delete( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> LROPoller[None]: """Deletes a managed Cassandra cluster. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] cls = kwargs.pop('cls', None) # type: ClsType[None] lro_delay = kwargs.pop( 'polling_interval', self._config.polling_interval ) cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] if cont_token is None: raw_result = self._delete_initial( resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, cls=lambda x,y,z: x, **kwargs ) kwargs.pop('error_map', None) def get_long_running_output(pipeline_response): if cls: return cls(pipeline_response, None, {}) if polling is True: polling_method = ARMPolling(lro_delay, **kwargs) elif polling is False: polling_method = NoPolling() else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_delete.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore def _create_update_initial( self, resource_group_name: str, cluster_name: str, body: "_models.ClusterResource", **kwargs: Any ) -> "_models.ClusterResource": cls = kwargs.pop('cls', None) # type: ClsType["_models.ClusterResource"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] _json = self._serialize.body(body, 'ClusterResource') request = build_create_update_request_initial( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, content_type=content_type, json=_json, template_url=self._create_update_initial.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) if response.status_code == 200: deserialized = self._deserialize('ClusterResource', pipeline_response) if response.status_code == 201: deserialized = self._deserialize('ClusterResource', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _create_update_initial.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore
[docs] @distributed_trace def begin_create_update( self, resource_group_name: str, cluster_name: str, body: "_models.ClusterResource", **kwargs: Any ) -> LROPoller["_models.ClusterResource"]: """Create or update a managed Cassandra cluster. When updating, you must specify all writable properties. To update only some properties, use PATCH. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :param body: The properties specifying the desired state of the managed Cassandra cluster. :type body: ~azure.mgmt.cosmosdb.models.ClusterResource :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either ClusterResource or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] cls = kwargs.pop('cls', None) # type: ClsType["_models.ClusterResource"] lro_delay = kwargs.pop( 'polling_interval', self._config.polling_interval ) cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] if cont_token is None: raw_result = self._create_update_initial( resource_group_name=resource_group_name, cluster_name=cluster_name, body=body, api_version=api_version, content_type=content_type, cls=lambda x,y,z: x, **kwargs ) kwargs.pop('error_map', None) def get_long_running_output(pipeline_response): response = pipeline_response.http_response deserialized = self._deserialize('ClusterResource', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized if polling is True: polling_method = ARMPolling(lro_delay, **kwargs) elif polling is False: polling_method = NoPolling() else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_create_update.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore def _update_initial( self, resource_group_name: str, cluster_name: str, body: "_models.ClusterResource", **kwargs: Any ) -> "_models.ClusterResource": cls = kwargs.pop('cls', None) # type: ClsType["_models.ClusterResource"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] _json = self._serialize.body(body, 'ClusterResource') request = build_update_request_initial( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, content_type=content_type, json=_json, template_url=self._update_initial.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) if response.status_code == 200: deserialized = self._deserialize('ClusterResource', pipeline_response) if response.status_code == 202: deserialized = self._deserialize('ClusterResource', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _update_initial.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore
[docs] @distributed_trace def begin_update( self, resource_group_name: str, cluster_name: str, body: "_models.ClusterResource", **kwargs: Any ) -> LROPoller["_models.ClusterResource"]: """Updates some of the properties of a managed Cassandra cluster. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :param body: Parameters to provide for specifying the managed Cassandra cluster. :type body: ~azure.mgmt.cosmosdb.models.ClusterResource :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either ClusterResource or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClusterResource] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] cls = kwargs.pop('cls', None) # type: ClsType["_models.ClusterResource"] lro_delay = kwargs.pop( 'polling_interval', self._config.polling_interval ) cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] if cont_token is None: raw_result = self._update_initial( resource_group_name=resource_group_name, cluster_name=cluster_name, body=body, api_version=api_version, content_type=content_type, cls=lambda x,y,z: x, **kwargs ) kwargs.pop('error_map', None) def get_long_running_output(pipeline_response): response = pipeline_response.http_response deserialized = self._deserialize('ClusterResource', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized if polling is True: polling_method = ARMPolling(lro_delay, **kwargs) elif polling is False: polling_method = NoPolling() else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_update.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}"} # type: ignore def _invoke_command_initial( self, resource_group_name: str, cluster_name: str, body: "_models.CommandPostBody", **kwargs: Any ) -> "_models.CommandOutput": cls = kwargs.pop('cls', None) # type: ClsType["_models.CommandOutput"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] _json = self._serialize.body(body, 'CommandPostBody') request = build_invoke_command_request_initial( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, content_type=content_type, json=_json, template_url=self._invoke_command_initial.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize('CommandOutput', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _invoke_command_initial.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/invokeCommand"} # type: ignore
[docs] @distributed_trace def begin_invoke_command( self, resource_group_name: str, cluster_name: str, body: "_models.CommandPostBody", **kwargs: Any ) -> LROPoller["_models.CommandOutput"]: """Invoke a command like nodetool for cassandra maintenance. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :param body: Specification which command to run where. :type body: ~azure.mgmt.cosmosdb.models.CommandPostBody :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either CommandOutput or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CommandOutput] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str content_type = kwargs.pop('content_type', "application/json") # type: Optional[str] polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] cls = kwargs.pop('cls', None) # type: ClsType["_models.CommandOutput"] lro_delay = kwargs.pop( 'polling_interval', self._config.polling_interval ) cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] if cont_token is None: raw_result = self._invoke_command_initial( resource_group_name=resource_group_name, cluster_name=cluster_name, body=body, api_version=api_version, content_type=content_type, cls=lambda x,y,z: x, **kwargs ) kwargs.pop('error_map', None) def get_long_running_output(pipeline_response): response = pipeline_response.http_response deserialized = self._deserialize('CommandOutput', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized if polling is True: polling_method = ARMPolling(lro_delay, **kwargs) elif polling is False: polling_method = NoPolling() else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_invoke_command.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/invokeCommand"} # type: ignore
[docs] @distributed_trace def list_backups( self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> Iterable["_models.ListBackups"]: """List the backups of this cluster that are available to restore. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either ListBackups or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.ListBackups] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str cls = kwargs.pop('cls', None) # type: ClsType["_models.ListBackups"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) def prepare_request(next_link=None): if not next_link: request = build_list_backups_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=self.list_backups.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) else: request = build_list_backups_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=next_link, ) request = _convert_request(request) request.url = self._client.format_url(request.url) request.method = "GET" return request def extract_data(pipeline_response): deserialized = self._deserialize("ListBackups", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) return None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged( get_next, extract_data )
list_backups.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups"} # type: ignore
[docs] @distributed_trace def get_backup( self, resource_group_name: str, cluster_name: str, backup_id: str, **kwargs: Any ) -> "_models.BackupResource": """Get the properties of an individual backup of this cluster that is available to restore. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :param backup_id: Id of a restorable backup of a Cassandra cluster. :type backup_id: str :keyword callable cls: A custom type or function that will be passed the direct response :return: BackupResource, or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.BackupResource :raises: ~azure.core.exceptions.HttpResponseError """ cls = kwargs.pop('cls', None) # type: ClsType["_models.BackupResource"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str request = build_get_backup_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, backup_id=backup_id, api_version=api_version, template_url=self.get_backup.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize('BackupResource', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get_backup.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/backups/{backupId}"} # type: ignore def _deallocate_initial( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> None: cls = kwargs.pop('cls', None) # type: ClsType[None] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str request = build_deallocate_request_initial( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=self._deallocate_initial.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) _deallocate_initial.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/deallocate"} # type: ignore
[docs] @distributed_trace def begin_deallocate( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> LROPoller[None]: """Deallocate the Managed Cassandra Cluster and Associated Data Centers. Deallocation will deallocate the host virtual machine of this cluster, and reserved the data disk. This won't do anything on an already deallocated cluster. Use Start to restart the cluster. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] cls = kwargs.pop('cls', None) # type: ClsType[None] lro_delay = kwargs.pop( 'polling_interval', self._config.polling_interval ) cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] if cont_token is None: raw_result = self._deallocate_initial( resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, cls=lambda x,y,z: x, **kwargs ) kwargs.pop('error_map', None) def get_long_running_output(pipeline_response): if cls: return cls(pipeline_response, None, {}) if polling is True: polling_method = ARMPolling(lro_delay, **kwargs) elif polling is False: polling_method = NoPolling() else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_deallocate.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/deallocate"} # type: ignore def _start_initial( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> None: cls = kwargs.pop('cls', None) # type: ClsType[None] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str request = build_start_request_initial( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=self._start_initial.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) _start_initial.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/start"} # type: ignore
[docs] @distributed_trace def begin_start( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> LROPoller[None]: """Start the Managed Cassandra Cluster and Associated Data Centers. Start will start the host virtual machine of this cluster with reserved data disk. This won't do anything on an already running cluster. Use Deallocate to deallocate the cluster. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises: ~azure.core.exceptions.HttpResponseError """ api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] cls = kwargs.pop('cls', None) # type: ClsType[None] lro_delay = kwargs.pop( 'polling_interval', self._config.polling_interval ) cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] if cont_token is None: raw_result = self._start_initial( resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, cls=lambda x,y,z: x, **kwargs ) kwargs.pop('error_map', None) def get_long_running_output(pipeline_response): if cls: return cls(pipeline_response, None, {}) if polling is True: polling_method = ARMPolling(lro_delay, **kwargs) elif polling is False: polling_method = NoPolling() else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
begin_start.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/start"} # type: ignore
[docs] @distributed_trace def status( self, resource_group_name: str, cluster_name: str, **kwargs: Any ) -> "_models.CassandraClusterPublicStatus": """Gets the CPU, memory, and disk usage statistics for each Cassandra node in a cluster. :param resource_group_name: The name of the resource group. The name is case insensitive. :type resource_group_name: str :param cluster_name: Managed Cassandra cluster name. :type cluster_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CassandraClusterPublicStatus, or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.CassandraClusterPublicStatus :raises: ~azure.core.exceptions.HttpResponseError """ cls = kwargs.pop('cls', None) # type: ClsType["_models.CassandraClusterPublicStatus"] error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError } error_map.update(kwargs.pop('error_map', {})) api_version = kwargs.pop('api_version', "2022-02-15-preview") # type: str request = build_status_request( subscription_id=self._config.subscription_id, resource_group_name=resource_group_name, cluster_name=cluster_name, api_version=api_version, template_url=self.status.metadata['url'], ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize('CassandraClusterPublicStatus', pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
status.metadata = {'url': "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/cassandraClusters/{clusterName}/status"} # type: ignore