Source code for azure.mgmt.datamigration.operations._services_operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import sys
from typing import Any, Callable, Dict, IO, Iterable, Optional, TypeVar, Union, cast, overload
import urllib.parse

from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling

from .. import models as _models
from .._serialization import Serializer
from .._vendor import _convert_request, _format_url_section

if sys.version_info >= (3, 8):
    from typing import Literal  # pylint: disable=no-name-in-module, ungrouped-imports
else:
    from typing_extensions import Literal  # type: ignore  # pylint: disable=ungrouped-imports
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_create_or_update_request(
    group_name: str, service_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_request(group_name: str, service_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_request(
    group_name: str,
    service_name: str,
    subscription_id: str,
    *,
    delete_running_tasks: Optional[bool] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if delete_running_tasks is not None:
        _params["deleteRunningTasks"] = _SERIALIZER.query("delete_running_tasks", delete_running_tasks, "bool")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_update_request(group_name: str, service_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)


def build_check_status_request(group_name: str, service_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/checkStatus",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_start_request(group_name: str, service_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/start",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_stop_request(group_name: str, service_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/stop",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_skus_request(group_name: str, service_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/skus",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_check_children_name_availability_request(
    group_name: str, service_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/checkNameAvailability",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
        "serviceName": _SERIALIZER.url("service_name", service_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_by_resource_group_request(group_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "groupName": _SERIALIZER.url("group_name", group_name, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_request(subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/providers/Microsoft.DataMigration/services")
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_check_name_availability_request(location: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: Literal["2022-03-30-preview"] = kwargs.pop(
        "api_version", _params.pop("api-version", "2022-03-30-preview")
    )
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/providers/Microsoft.DataMigration/locations/{location}/checkNameAvailability",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "location": _SERIALIZER.url("location", location, "str"),
    }

    _url: str = _format_url_section(_url, **path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


[docs]class ServicesOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.mgmt.datamigration.DataMigrationManagementClient`'s :attr:`services` attribute. """ models = _models def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer") def _create_or_update_initial( self, group_name: str, service_name: str, parameters: Union[_models.DataMigrationService, IO], **kwargs: Any ) -> Optional[_models.DataMigrationService]: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Optional[_models.DataMigrationService]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IO, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "DataMigrationService") request = build_create_or_update_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, template_url=self._create_or_update_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 201, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = None if response.status_code == 200: deserialized = self._deserialize("DataMigrationService", pipeline_response) if response.status_code == 201: deserialized = self._deserialize("DataMigrationService", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _create_or_update_initial.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" } @overload def begin_create_or_update( self, group_name: str, service_name: str, parameters: _models.DataMigrationService, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.DataMigrationService]: """Create or update DMS Instance. The services resource is the top-level resource that represents the Database Migration Service. The PUT method creates a new service or updates an existing one. When a service is updated, existing child resources (i.e. tasks) are unaffected. Services currently support a single kind, "vm", which refers to a VM-based service, although other kinds may be added in the future. This method can change the kind, SKU, and network of the service, but if tasks are currently running (i.e. the service is busy), this will fail with 400 Bad Request ("ServiceIsBusy"). The provider will reply when successful with 200 OK or 201 Created. Long-running operations use the provisioningState property. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Information about the service. Required. :type parameters: ~azure.mgmt.datamigration.models.DataMigrationService :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either DataMigrationService or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_create_or_update( self, group_name: str, service_name: str, parameters: IO, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.DataMigrationService]: """Create or update DMS Instance. The services resource is the top-level resource that represents the Database Migration Service. The PUT method creates a new service or updates an existing one. When a service is updated, existing child resources (i.e. tasks) are unaffected. Services currently support a single kind, "vm", which refers to a VM-based service, although other kinds may be added in the future. This method can change the kind, SKU, and network of the service, but if tasks are currently running (i.e. the service is busy), this will fail with 400 Bad Request ("ServiceIsBusy"). The provider will reply when successful with 200 OK or 201 Created. Long-running operations use the provisioningState property. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Information about the service. Required. :type parameters: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either DataMigrationService or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_create_or_update( self, group_name: str, service_name: str, parameters: Union[_models.DataMigrationService, IO], **kwargs: Any ) -> LROPoller[_models.DataMigrationService]: """Create or update DMS Instance. The services resource is the top-level resource that represents the Database Migration Service. The PUT method creates a new service or updates an existing one. When a service is updated, existing child resources (i.e. tasks) are unaffected. Services currently support a single kind, "vm", which refers to a VM-based service, although other kinds may be added in the future. This method can change the kind, SKU, and network of the service, but if tasks are currently running (i.e. the service is busy), this will fail with 400 Bad Request ("ServiceIsBusy"). The provider will reply when successful with 200 OK or 201 Created. Long-running operations use the provisioningState property. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Information about the service. Is either a model type or a IO type. Required. :type parameters: ~azure.mgmt.datamigration.models.DataMigrationService or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either DataMigrationService or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.DataMigrationService] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._create_or_update_initial( group_name=group_name, service_name=service_name, parameters=parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("DataMigrationService", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_create_or_update.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" }
[docs] @distributed_trace def get(self, group_name: str, service_name: str, **kwargs: Any) -> _models.DataMigrationService: """Get DMS Service Instance. The services resource is the top-level resource that represents the Database Migration Service. The GET method retrieves information about a service instance. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: DataMigrationService or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.DataMigrationService :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[_models.DataMigrationService] = kwargs.pop("cls", None) request = build_get_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.get.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("DataMigrationService", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" } def _delete_initial( # pylint: disable=inconsistent-return-statements self, group_name: str, service_name: str, delete_running_tasks: Optional[bool] = None, **kwargs: Any ) -> None: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[None] = kwargs.pop("cls", None) request = build_delete_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, delete_running_tasks=delete_running_tasks, api_version=api_version, template_url=self._delete_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) _delete_initial.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" }
[docs] @distributed_trace def begin_delete( self, group_name: str, service_name: str, delete_running_tasks: Optional[bool] = None, **kwargs: Any ) -> LROPoller[None]: """Delete DMS Service Instance. The services resource is the top-level resource that represents the Database Migration Service. The DELETE method deletes a service. Any running tasks will be canceled. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param delete_running_tasks: Delete the resource even if it contains running tasks. Default value is None. :type delete_running_tasks: bool :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._delete_initial( # type: ignore group_name=group_name, service_name=service_name, delete_running_tasks=delete_running_tasks, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_delete.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" } def _update_initial( self, group_name: str, service_name: str, parameters: Union[_models.DataMigrationService, IO], **kwargs: Any ) -> Optional[_models.DataMigrationService]: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Optional[_models.DataMigrationService]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IO, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "DataMigrationService") request = build_update_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, template_url=self._update_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = None if response.status_code == 200: deserialized = self._deserialize("DataMigrationService", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized _update_initial.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" } @overload def begin_update( self, group_name: str, service_name: str, parameters: _models.DataMigrationService, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.DataMigrationService]: """Create or update DMS Service Instance. The services resource is the top-level resource that represents the Database Migration Service. The PATCH method updates an existing service. This method can change the kind, SKU, and network of the service, but if tasks are currently running (i.e. the service is busy), this will fail with 400 Bad Request ("ServiceIsBusy"). :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Information about the service. Required. :type parameters: ~azure.mgmt.datamigration.models.DataMigrationService :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either DataMigrationService or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_update( self, group_name: str, service_name: str, parameters: IO, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.DataMigrationService]: """Create or update DMS Service Instance. The services resource is the top-level resource that represents the Database Migration Service. The PATCH method updates an existing service. This method can change the kind, SKU, and network of the service, but if tasks are currently running (i.e. the service is busy), this will fail with 400 Bad Request ("ServiceIsBusy"). :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Information about the service. Required. :type parameters: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either DataMigrationService or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_update( self, group_name: str, service_name: str, parameters: Union[_models.DataMigrationService, IO], **kwargs: Any ) -> LROPoller[_models.DataMigrationService]: """Create or update DMS Service Instance. The services resource is the top-level resource that represents the Database Migration Service. The PATCH method updates an existing service. This method can change the kind, SKU, and network of the service, but if tasks are currently running (i.e. the service is busy), this will fail with 400 Bad Request ("ServiceIsBusy"). :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Information about the service. Is either a model type or a IO type. Required. :type parameters: ~azure.mgmt.datamigration.models.DataMigrationService or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either DataMigrationService or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.DataMigrationService] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._update_initial( group_name=group_name, service_name=service_name, parameters=parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("DataMigrationService", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_update.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}" }
[docs] @distributed_trace def check_status( self, group_name: str, service_name: str, **kwargs: Any ) -> _models.DataMigrationServiceStatusResponse: """Check service health status. The services resource is the top-level resource that represents the Database Migration Service. This action performs a health check and returns the status of the service and virtual machine size. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: DataMigrationServiceStatusResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.DataMigrationServiceStatusResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[_models.DataMigrationServiceStatusResponse] = kwargs.pop("cls", None) request = build_check_status_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.check_status.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("DataMigrationServiceStatusResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
check_status.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/checkStatus" } def _start_initial( # pylint: disable=inconsistent-return-statements self, group_name: str, service_name: str, **kwargs: Any ) -> None: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[None] = kwargs.pop("cls", None) request = build_start_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self._start_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) _start_initial.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/start" }
[docs] @distributed_trace def begin_start(self, group_name: str, service_name: str, **kwargs: Any) -> LROPoller[None]: """Start service. The services resource is the top-level resource that represents the Database Migration Service. This action starts the service and the service can be used for data migration. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._start_initial( # type: ignore group_name=group_name, service_name=service_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_start.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/start" } def _stop_initial( # pylint: disable=inconsistent-return-statements self, group_name: str, service_name: str, **kwargs: Any ) -> None: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[None] = kwargs.pop("cls", None) request = build_stop_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self._stop_initial.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) _stop_initial.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/stop" }
[docs] @distributed_trace def begin_stop(self, group_name: str, service_name: str, **kwargs: Any) -> LROPoller[None]: """Stop service. The services resource is the top-level resource that represents the Database Migration Service. This action stops the service and the service cannot be used for data migration. The service owner won't be billed when the service is stopped. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :keyword callable cls: A custom type or function that will be passed the direct response :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be ARMPolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.PollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._stop_initial( # type: ignore group_name=group_name, service_name=service_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller(self._client, raw_result, get_long_running_output, polling_method) # type: ignore
begin_stop.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/stop" }
[docs] @distributed_trace def list_skus(self, group_name: str, service_name: str, **kwargs: Any) -> Iterable["_models.AvailableServiceSku"]: """Get compatible SKUs. The services resource is the top-level resource that represents the Database Migration Service. The skus action returns the list of SKUs that a service resource can be updated to. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either AvailableServiceSku or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.datamigration.models.AvailableServiceSku] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[_models.ServiceSkuList] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_list_skus_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.list_skus.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) request = _convert_request(request) request.url = self._client.format_url(request.url) request.method = "GET" return request def extract_data(pipeline_response): deserialized = self._deserialize("ServiceSkuList", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.next_link or None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
list_skus.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/skus" } @overload def check_children_name_availability( self, group_name: str, service_name: str, parameters: _models.NameAvailabilityRequest, *, content_type: str = "application/json", **kwargs: Any ) -> _models.NameAvailabilityResponse: """Check nested resource name validity and availability. This method checks whether a proposed nested resource name is valid and available. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Requested name to validate. Required. :type parameters: ~azure.mgmt.datamigration.models.NameAvailabilityRequest :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: NameAvailabilityResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.NameAvailabilityResponse :raises ~azure.core.exceptions.HttpResponseError: """ @overload def check_children_name_availability( self, group_name: str, service_name: str, parameters: IO, *, content_type: str = "application/json", **kwargs: Any ) -> _models.NameAvailabilityResponse: """Check nested resource name validity and availability. This method checks whether a proposed nested resource name is valid and available. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Requested name to validate. Required. :type parameters: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: NameAvailabilityResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.NameAvailabilityResponse :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def check_children_name_availability( self, group_name: str, service_name: str, parameters: Union[_models.NameAvailabilityRequest, IO], **kwargs: Any ) -> _models.NameAvailabilityResponse: """Check nested resource name validity and availability. This method checks whether a proposed nested resource name is valid and available. :param group_name: Name of the resource group. Required. :type group_name: str :param service_name: Name of the service. Required. :type service_name: str :param parameters: Requested name to validate. Is either a model type or a IO type. Required. :type parameters: ~azure.mgmt.datamigration.models.NameAvailabilityRequest or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: NameAvailabilityResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.NameAvailabilityResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.NameAvailabilityResponse] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IO, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "NameAvailabilityRequest") request = build_check_children_name_availability_request( group_name=group_name, service_name=service_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, template_url=self.check_children_name_availability.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("NameAvailabilityResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
check_children_name_availability.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services/{serviceName}/checkNameAvailability" }
[docs] @distributed_trace def list_by_resource_group(self, group_name: str, **kwargs: Any) -> Iterable["_models.DataMigrationService"]: """Get services in resource group. The Services resource is the top-level resource that represents the Database Migration Service. This method returns a list of service resources in a resource group. :param group_name: Name of the resource group. Required. :type group_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either DataMigrationService or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[_models.DataMigrationServiceList] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_list_by_resource_group_request( group_name=group_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.list_by_resource_group.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) request = _convert_request(request) request.url = self._client.format_url(request.url) request.method = "GET" return request def extract_data(pipeline_response): deserialized = self._deserialize("DataMigrationServiceList", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.next_link or None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
list_by_resource_group.metadata = { "url": "/subscriptions/{subscriptionId}/resourceGroups/{groupName}/providers/Microsoft.DataMigration/services" }
[docs] @distributed_trace def list(self, **kwargs: Any) -> Iterable["_models.DataMigrationService"]: """Get services in subscription. The services resource is the top-level resource that represents the Database Migration Service. This method returns a list of service resources in a subscription. :keyword callable cls: A custom type or function that will be passed the direct response :return: An iterator like instance of either DataMigrationService or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.datamigration.models.DataMigrationService] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) cls: ClsType[_models.DataMigrationServiceList] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_list_request( subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.list.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) request = _convert_request(request) request.url = self._client.format_url(request.url) request.method = "GET" return request def extract_data(pipeline_response): deserialized = self._deserialize("DataMigrationServiceList", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.next_link or None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
list.metadata = {"url": "/subscriptions/{subscriptionId}/providers/Microsoft.DataMigration/services"} @overload def check_name_availability( self, location: str, parameters: _models.NameAvailabilityRequest, *, content_type: str = "application/json", **kwargs: Any ) -> _models.NameAvailabilityResponse: """Check name validity and availability. This method checks whether a proposed top-level resource name is valid and available. :param location: The Azure region of the operation. Required. :type location: str :param parameters: Requested name to validate. Required. :type parameters: ~azure.mgmt.datamigration.models.NameAvailabilityRequest :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: NameAvailabilityResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.NameAvailabilityResponse :raises ~azure.core.exceptions.HttpResponseError: """ @overload def check_name_availability( self, location: str, parameters: IO, *, content_type: str = "application/json", **kwargs: Any ) -> _models.NameAvailabilityResponse: """Check name validity and availability. This method checks whether a proposed top-level resource name is valid and available. :param location: The Azure region of the operation. Required. :type location: str :param parameters: Requested name to validate. Required. :type parameters: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: NameAvailabilityResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.NameAvailabilityResponse :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def check_name_availability( self, location: str, parameters: Union[_models.NameAvailabilityRequest, IO], **kwargs: Any ) -> _models.NameAvailabilityResponse: """Check name validity and availability. This method checks whether a proposed top-level resource name is valid and available. :param location: The Azure region of the operation. Required. :type location: str :param parameters: Requested name to validate. Is either a model type or a IO type. Required. :type parameters: ~azure.mgmt.datamigration.models.NameAvailabilityRequest or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: NameAvailabilityResponse or the result of cls(response) :rtype: ~azure.mgmt.datamigration.models.NameAvailabilityResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: Literal["2022-03-30-preview"] = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.NameAvailabilityResponse] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IO, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "NameAvailabilityRequest") request = build_check_name_availability_request( location=location, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, template_url=self.check_name_availability.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ApiError, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("NameAvailabilityResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
check_name_availability.metadata = { "url": "/subscriptions/{subscriptionId}/providers/Microsoft.DataMigration/locations/{location}/checkNameAvailability" }