# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import datetime
import sys
from typing import Any, AsyncIterable, Callable, Dict, IO, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse
from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
from azure.core.polling.async_base_polling import AsyncLROBasePolling
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.utils import case_insensitive_dict
from ...operations._operations import (
build_dev_boxes_create_dev_box_request,
build_dev_boxes_delay_upcoming_action_request,
build_dev_boxes_delete_dev_box_request,
build_dev_boxes_get_dev_box_by_user_request,
build_dev_boxes_get_pool_request,
build_dev_boxes_get_remote_connection_request,
build_dev_boxes_get_schedule_by_pool_request,
build_dev_boxes_get_upcoming_action_request,
build_dev_boxes_list_dev_boxes_by_user_request,
build_dev_boxes_list_pools_request,
build_dev_boxes_list_schedules_by_pool_request,
build_dev_boxes_list_upcoming_actions_request,
build_dev_boxes_skip_upcoming_action_request,
build_dev_boxes_start_dev_box_request,
build_dev_boxes_stop_dev_box_request,
build_dev_center_get_project_request,
build_dev_center_list_all_dev_boxes_by_user_request,
build_dev_center_list_all_dev_boxes_request,
build_dev_center_list_projects_request,
build_environments_create_or_update_environment_request,
build_environments_custom_environment_action_request,
build_environments_delete_environment_request,
build_environments_deploy_environment_action_request,
build_environments_get_catalog_item_request,
build_environments_get_catalog_item_version_request,
build_environments_get_environment_by_user_request,
build_environments_list_catalog_item_versions_request,
build_environments_list_catalog_items_request,
build_environments_list_environment_types_request,
build_environments_list_environments_by_user_request,
build_environments_list_environments_request,
build_environments_update_environment_request,
)
# Pick the MutableMapping flavour by interpreter version: ``typing`` before
# Python 3.9, ``collections.abc`` from 3.9 onwards.
if sys.version_info < (3, 9):
    from typing import MutableMapping  # type: ignore  # pylint: disable=ungrouped-imports
else:
    from collections.abc import MutableMapping

# Alias used for the loosely-typed JSON bodies returned by the operations in this module.
JSON = MutableMapping[str, Any]  # pylint: disable=unsubscriptable-object

# Generic placeholder for the deserialized payload type referenced by ``ClsType``.
T = TypeVar("T")

# Shape of the optional ``cls`` callback accepted by operations: it is handed the
# pipeline response, the deserialized payload and a dict, and may return anything.
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
class DevCenterOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.developer.devcenter.aio.DevCenterClient`'s
        :attr:`dev_center` attribute.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Each collaborator is consumed positionally while positional arguments
        # remain; after that it must be supplied under its keyword name.
        positional = list(args)
        for attribute, keyword in (
            ("_client", "client"),
            ("_config", "config"),
            ("_serialize", "serializer"),
            ("_deserialize", "deserializer"),
        ):
            setattr(self, attribute, positional.pop(0) if positional else kwargs.pop(keyword))

    @distributed_trace
    def list_projects(
        self, *, filter: Optional[str] = None, top: Optional[int] = None, **kwargs: Any
    ) -> AsyncIterable[JSON]:
        """Lists all projects.

        :keyword filter: An OData filter clause to apply to the operation. Default value is None.
        :paramtype filter: str
        :keyword top: The maximum number of resources to return from the operation. Example:
         'top=10'. Default value is None.
        :paramtype top: int
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    "description": "str",  # Optional. Description of the project.
                    "name": "str"  # Optional. Name of the project.
                }
        """
        cls: ClsType[JSON] = kwargs.pop("cls", None)
        _params = kwargs.pop("params", {}) or {}
        _headers = kwargs.pop("headers", {}) or {}

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            if next_link:
                # make call to next link with the client's api-version
                parsed_link = urllib.parse.urlparse(next_link)
                query = case_insensitive_dict(
                    {
                        name: [urllib.parse.quote(item) for item in values]
                        for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                    }
                )
                query["api-version"] = self._config.api_version
                request = HttpRequest("GET", urllib.parse.urljoin(next_link, parsed_link.path), params=query)
            else:
                # First page: build the request from the operation's own parameters.
                request = build_dev_center_list_projects_request(
                    filter=filter,
                    top=top,
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
            # Both kinds of request get the endpoint placeholder substituted the same way.
            endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
            request.url = self._client.format_url(request.url, endpoint=endpoint)
            return request

        async def extract_data(pipeline_response):
            payload = pipeline_response.http_response.json()
            items = payload["value"]
            if cls:
                items = cls(items)  # type: ignore
            return payload.get("nextLink") or None, AsyncList(items)

        async def get_next(next_link=None):
            pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
                prepare_request(next_link), stream=False, **kwargs
            )
            response = pipeline_response.http_response
            if response.status_code != 200:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)
            return pipeline_response

        return AsyncItemPaged(get_next, extract_data)

    @distributed_trace_async
    async def get_project(self, project_name: str, **kwargs: Any) -> JSON:
        """Gets a project.

        :param project_name: The DevCenter Project upon which to execute operations. Required.
        :type project_name: str
        :return: JSON object
        :rtype: JSON
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    "description": "str",  # Optional. Description of the project.
                    "name": "str"  # Optional. Name of the project.
                }
        """
        cls: ClsType[JSON] = kwargs.pop("cls", None)
        _params = kwargs.pop("params", {}) or {}
        _headers = kwargs.pop("headers", {}) or {}

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        request = build_dev_center_get_project_request(
            project_name=project_name,
            api_version=self._config.api_version,
            headers=_headers,
            params=_params,
        )
        endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
        request.url = self._client.format_url(request.url, endpoint=endpoint)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # An empty body yields None rather than an attempt to parse JSON.
        body = cast(JSON, response.json() if response.content else None)
        return cls(pipeline_response, body, {}) if cls else body

    @distributed_trace
    def list_all_dev_boxes(
        self, *, filter: Optional[str] = None, top: Optional[int] = None, **kwargs: Any
    ) -> AsyncIterable[JSON]:
        """Lists Dev Boxes that the caller has access to in the DevCenter.

        :keyword filter: An OData filter clause to apply to the operation. Default value is None.
        :paramtype filter: str
        :keyword top: The maximum number of resources to return from the operation. Example:
         'top=10'. Default value is None.
        :paramtype top: int
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    # Required. The name of the Dev Box pool this machine belongs to.
                    "poolName": "str",
                    # Optional. The current action state of the Dev Box. This state is
                    # based on the previous action performed by the user.
                    "actionState": "str",
                    # Optional. Creation time of this Dev Box.
                    "createdTime": "2020-02-20 00:00:00",
                    "errorDetails": {
                        "code": "str",  # Optional. The error code.
                        "message": "str"  # Optional. The error message.
                    },
                    "hardwareProfile": {
                        # Optional. The amount of memory available for the Dev Box.
                        "memoryGB": 0,
                        "skuName": "str",  # Optional. The name of the SKU.
                        # Optional. The number of vCPUs available for the Dev Box.
                        "vCPUs": 0
                    },
                    # Optional. Indicates whether hibernate is enabled/disabled or unknown.
                    # Known values are: "Disabled" and "Enabled".
                    "hibernateSupport": "str",
                    "imageReference": {
                        "name": "str",  # Optional. The name of the image used.
                        # Optional. The operating system of the image.
                        "operatingSystem": "str",
                        # Optional. The operating system build number of the image.
                        "osBuildNumber": "str",
                        # Optional. The datetime that the backing image version was published.
                        "publishedDate": "2020-02-20 00:00:00",
                        "version": "str"  # Optional. The version of the image.
                    },
                    # Optional. Indicates whether the owner of the Dev Box is a local
                    # administrator. Known values are: "Enabled" and "Disabled".
                    "localAdministrator": "str",
                    # Optional. Azure region where this Dev Box is located. This will be the
                    # same region as the Virtual Network it is attached to.
                    "location": "str",
                    "name": "str",  # Optional. Display name for the Dev Box.
                    # Optional. The operating system type of this Dev Box. "Windows"
                    "osType": "str",
                    # Optional. The current power state of the Dev Box. Known values are:
                    # "Unknown", "Deallocated", "PoweredOff", "Running", and "Hibernated".
                    "powerState": "str",
                    # Optional. Name of the project this Dev Box belongs to.
                    "projectName": "str",
                    # Optional. The current provisioning state of the Dev Box.
                    "provisioningState": "str",
                    "storageProfile": {
                        "osDisk": {
                            # Optional. The size of the OS Disk in gigabytes.
                            "diskSizeGB": 0
                        }
                    },
                    # Optional. A unique identifier for the Dev Box. This is a GUID-formatted
                    # string (e.g. 00000000-0000-0000-0000-000000000000).
                    "uniqueId": "str",
                    # Optional. The AAD object id of the user this Dev Box is assigned to.
                    "user": "str"
                }
        """
        cls: ClsType[JSON] = kwargs.pop("cls", None)
        _params = kwargs.pop("params", {}) or {}
        _headers = kwargs.pop("headers", {}) or {}

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            if next_link:
                # make call to next link with the client's api-version
                parsed_link = urllib.parse.urlparse(next_link)
                query = case_insensitive_dict(
                    {
                        name: [urllib.parse.quote(item) for item in values]
                        for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                    }
                )
                query["api-version"] = self._config.api_version
                request = HttpRequest("GET", urllib.parse.urljoin(next_link, parsed_link.path), params=query)
            else:
                # First page: build the request from the operation's own parameters.
                request = build_dev_center_list_all_dev_boxes_request(
                    filter=filter,
                    top=top,
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
            # Both kinds of request get the endpoint placeholder substituted the same way.
            endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
            request.url = self._client.format_url(request.url, endpoint=endpoint)
            return request

        async def extract_data(pipeline_response):
            payload = pipeline_response.http_response.json()
            items = payload["value"]
            if cls:
                items = cls(items)  # type: ignore
            return payload.get("nextLink") or None, AsyncList(items)

        async def get_next(next_link=None):
            pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
                prepare_request(next_link), stream=False, **kwargs
            )
            response = pipeline_response.http_response
            if response.status_code != 200:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)
            return pipeline_response

        return AsyncItemPaged(get_next, extract_data)

    @distributed_trace
    def list_all_dev_boxes_by_user(
        self, user_id: str = "me", *, filter: Optional[str] = None, top: Optional[int] = None, **kwargs: Any
    ) -> AsyncIterable[JSON]:
        """Lists Dev Boxes in the Dev Center for a particular user.

        :param user_id: The AAD object id of the user. If value is 'me', the identity is taken
         from the authentication context. Default value is "me".
        :type user_id: str
        :keyword filter: An OData filter clause to apply to the operation. Default value is None.
        :paramtype filter: str
        :keyword top: The maximum number of resources to return from the operation. Example:
         'top=10'. Default value is None.
        :paramtype top: int
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    # Required. The name of the Dev Box pool this machine belongs to.
                    "poolName": "str",
                    # Optional. The current action state of the Dev Box. This state is
                    # based on the previous action performed by the user.
                    "actionState": "str",
                    # Optional. Creation time of this Dev Box.
                    "createdTime": "2020-02-20 00:00:00",
                    "errorDetails": {
                        "code": "str",  # Optional. The error code.
                        "message": "str"  # Optional. The error message.
                    },
                    "hardwareProfile": {
                        # Optional. The amount of memory available for the Dev Box.
                        "memoryGB": 0,
                        "skuName": "str",  # Optional. The name of the SKU.
                        # Optional. The number of vCPUs available for the Dev Box.
                        "vCPUs": 0
                    },
                    # Optional. Indicates whether hibernate is enabled/disabled or unknown.
                    # Known values are: "Disabled" and "Enabled".
                    "hibernateSupport": "str",
                    "imageReference": {
                        "name": "str",  # Optional. The name of the image used.
                        # Optional. The operating system of the image.
                        "operatingSystem": "str",
                        # Optional. The operating system build number of the image.
                        "osBuildNumber": "str",
                        # Optional. The datetime that the backing image version was published.
                        "publishedDate": "2020-02-20 00:00:00",
                        "version": "str"  # Optional. The version of the image.
                    },
                    # Optional. Indicates whether the owner of the Dev Box is a local
                    # administrator. Known values are: "Enabled" and "Disabled".
                    "localAdministrator": "str",
                    # Optional. Azure region where this Dev Box is located. This will be the
                    # same region as the Virtual Network it is attached to.
                    "location": "str",
                    "name": "str",  # Optional. Display name for the Dev Box.
                    # Optional. The operating system type of this Dev Box. "Windows"
                    "osType": "str",
                    # Optional. The current power state of the Dev Box. Known values are:
                    # "Unknown", "Deallocated", "PoweredOff", "Running", and "Hibernated".
                    "powerState": "str",
                    # Optional. Name of the project this Dev Box belongs to.
                    "projectName": "str",
                    # Optional. The current provisioning state of the Dev Box.
                    "provisioningState": "str",
                    "storageProfile": {
                        "osDisk": {
                            # Optional. The size of the OS Disk in gigabytes.
                            "diskSizeGB": 0
                        }
                    },
                    # Optional. A unique identifier for the Dev Box. This is a GUID-formatted
                    # string (e.g. 00000000-0000-0000-0000-000000000000).
                    "uniqueId": "str",
                    # Optional. The AAD object id of the user this Dev Box is assigned to.
                    "user": "str"
                }
        """
        cls: ClsType[JSON] = kwargs.pop("cls", None)
        _params = kwargs.pop("params", {}) or {}
        _headers = kwargs.pop("headers", {}) or {}

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            if next_link:
                # make call to next link with the client's api-version
                parsed_link = urllib.parse.urlparse(next_link)
                query = case_insensitive_dict(
                    {
                        name: [urllib.parse.quote(item) for item in values]
                        for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                    }
                )
                query["api-version"] = self._config.api_version
                request = HttpRequest("GET", urllib.parse.urljoin(next_link, parsed_link.path), params=query)
            else:
                # First page: build the request from the operation's own parameters.
                request = build_dev_center_list_all_dev_boxes_by_user_request(
                    user_id=user_id,
                    filter=filter,
                    top=top,
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
            # Both kinds of request get the endpoint placeholder substituted the same way.
            endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
            request.url = self._client.format_url(request.url, endpoint=endpoint)
            return request

        async def extract_data(pipeline_response):
            payload = pipeline_response.http_response.json()
            items = payload["value"]
            if cls:
                items = cls(items)  # type: ignore
            return payload.get("nextLink") or None, AsyncList(items)

        async def get_next(next_link=None):
            pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
                prepare_request(next_link), stream=False, **kwargs
            )
            response = pipeline_response.http_response
            if response.status_code != 200:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)
            return pipeline_response

        return AsyncItemPaged(get_next, extract_data)
[docs]class DevBoxesOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.developer.devcenter.aio.DevCenterClient`'s
:attr:`dev_boxes` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
    # Each collaborator is consumed positionally while positional arguments
    # remain; after that it must be supplied under its keyword name.
    positional = list(args)
    for attribute, keyword in (
        ("_client", "client"),
        ("_config", "config"),
        ("_serialize", "serializer"),
        ("_deserialize", "deserializer"),
    ):
        setattr(self, attribute, positional.pop(0) if positional else kwargs.pop(keyword))
@distributed_trace
def list_pools(
    self, project_name: str, *, top: Optional[int] = None, filter: Optional[str] = None, **kwargs: Any
) -> AsyncIterable[JSON]:
    """Lists available pools.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :keyword top: The maximum number of resources to return from the operation. Example:
     'top=10'. Default value is None.
    :paramtype top: int
    :keyword filter: An OData filter clause to apply to the operation. Default value is None.
    :paramtype filter: str
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "hardwareProfile": {
                    # Optional. The amount of memory available for the Dev Box.
                    "memoryGB": 0,
                    "skuName": "str",  # Optional. The name of the SKU.
                    # Optional. The number of vCPUs available for the Dev Box.
                    "vCPUs": 0
                },
                # Optional. Indicates whether hibernate is enabled/disabled or unknown.
                # Known values are: "Disabled" and "Enabled".
                "hibernateSupport": "str",
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    # Optional. The operating system of the image.
                    "operatingSystem": "str",
                    # Optional. The operating system build number of the image.
                    "osBuildNumber": "str",
                    # Optional. The datetime that the backing image version was published.
                    "publishedDate": "2020-02-20 00:00:00",
                    "version": "str"  # Optional. The version of the image.
                },
                # Optional. Indicates whether owners of Dev Boxes in this pool are local
                # administrators on the Dev Boxes. Known values are: "Enabled" and
                # "Disabled".
                "localAdministrator": "str",
                # Optional. Azure region where Dev Boxes in the pool are located.
                "location": "str",
                "name": "str",  # Optional. Pool name.
                # Optional. The operating system type of Dev Boxes in this pool. "Windows"
                "osType": "str",
                "storageProfile": {
                    "osDisk": {
                        # Optional. The size of the OS Disk in gigabytes.
                        "diskSizeGB": 0
                    }
                }
            }
    """
    cls: ClsType[JSON] = kwargs.pop("cls", None)
    _params = kwargs.pop("params", {}) or {}
    _headers = kwargs.pop("headers", {}) or {}

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            query = case_insensitive_dict(
                {
                    name: [urllib.parse.quote(item) for item in values]
                    for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                }
            )
            query["api-version"] = self._config.api_version
            request = HttpRequest("GET", urllib.parse.urljoin(next_link, parsed_link.path), params=query)
        else:
            # First page: build the request from the operation's own parameters.
            request = build_dev_boxes_list_pools_request(
                project_name=project_name,
                top=top,
                filter=filter,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        # Both kinds of request get the endpoint placeholder substituted the same way.
        endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
        request.url = self._client.format_url(request.url, endpoint=endpoint)
        return request

    async def extract_data(pipeline_response):
        payload = pipeline_response.http_response.json()
        items = payload["value"]
        if cls:
            items = cls(items)  # type: ignore
        return payload.get("nextLink") or None, AsyncList(items)

    async def get_next(next_link=None):
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            prepare_request(next_link), stream=False, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)
        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_pool(self, pool_name: str, project_name: str, **kwargs: Any) -> JSON:
    """Gets a pool.

    :param pool_name: The name of a pool of Dev Boxes. Required.
    :type pool_name: str
    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "hardwareProfile": {
                    # Optional. The amount of memory available for the Dev Box.
                    "memoryGB": 0,
                    "skuName": "str",  # Optional. The name of the SKU.
                    # Optional. The number of vCPUs available for the Dev Box.
                    "vCPUs": 0
                },
                # Optional. Indicates whether hibernate is enabled/disabled or unknown.
                # Known values are: "Disabled" and "Enabled".
                "hibernateSupport": "str",
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    # Optional. The operating system of the image.
                    "operatingSystem": "str",
                    # Optional. The operating system build number of the image.
                    "osBuildNumber": "str",
                    # Optional. The datetime that the backing image version was published.
                    "publishedDate": "2020-02-20 00:00:00",
                    "version": "str"  # Optional. The version of the image.
                },
                # Optional. Indicates whether owners of Dev Boxes in this pool are local
                # administrators on the Dev Boxes. Known values are: "Enabled" and
                # "Disabled".
                "localAdministrator": "str",
                # Optional. Azure region where Dev Boxes in the pool are located.
                "location": "str",
                "name": "str",  # Optional. Pool name.
                # Optional. The operating system type of Dev Boxes in this pool. "Windows"
                "osType": "str",
                "storageProfile": {
                    "osDisk": {
                        # Optional. The size of the OS Disk in gigabytes.
                        "diskSizeGB": 0
                    }
                }
            }
    """
    cls: ClsType[JSON] = kwargs.pop("cls", None)
    _params = kwargs.pop("params", {}) or {}
    _headers = kwargs.pop("headers", {}) or {}

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    request = build_dev_boxes_get_pool_request(
        pool_name=pool_name,
        project_name=project_name,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
    request.url = self._client.format_url(request.url, endpoint=endpoint)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response
    if response.status_code != 200:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than an attempt to parse JSON.
    body = cast(JSON, response.json() if response.content else None)
    return cls(pipeline_response, body, {}) if cls else body
@distributed_trace
def list_schedules_by_pool(
    self,
    project_name: str,
    pool_name: str,
    *,
    top: Optional[int] = None,
    filter: Optional[str] = None,
    **kwargs: Any
) -> AsyncIterable[JSON]:
    """Lists available schedules for a pool.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param pool_name: The name of a pool of Dev Boxes. Required.
    :type pool_name: str
    :keyword top: The maximum number of resources to return from the operation. Example:
     'top=10'. Default value is None.
    :paramtype top: int
    :keyword filter: An OData filter clause to apply to the operation. Default value is None.
    :paramtype filter: str
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                # Optional. The frequency of this scheduled task. "Daily"
                "frequency": "str",
                "name": "str",  # Optional. Display name for the Schedule.
                # Optional. The target time to trigger the action. The format is HH:MM.
                "time": "str",
                # Optional. The IANA timezone id at which the schedule should execute.
                "timeZone": "str",
                # Optional. Supported type this scheduled task represents. "StopDevBox"
                "type": "str"
            }
    """
    cls: ClsType[JSON] = kwargs.pop("cls", None)
    _params = kwargs.pop("params", {}) or {}
    _headers = kwargs.pop("headers", {}) or {}

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            query = case_insensitive_dict(
                {
                    name: [urllib.parse.quote(item) for item in values]
                    for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                }
            )
            query["api-version"] = self._config.api_version
            request = HttpRequest("GET", urllib.parse.urljoin(next_link, parsed_link.path), params=query)
        else:
            # First page: build the request from the operation's own parameters.
            request = build_dev_boxes_list_schedules_by_pool_request(
                project_name=project_name,
                pool_name=pool_name,
                top=top,
                filter=filter,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        # Both kinds of request get the endpoint placeholder substituted the same way.
        endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
        request.url = self._client.format_url(request.url, endpoint=endpoint)
        return request

    async def extract_data(pipeline_response):
        payload = pipeline_response.http_response.json()
        items = payload["value"]
        if cls:
            items = cls(items)  # type: ignore
        return payload.get("nextLink") or None, AsyncList(items)

    async def get_next(next_link=None):
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            prepare_request(next_link), stream=False, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)
        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_schedule_by_pool(self, project_name: str, pool_name: str, schedule_name: str, **kwargs: Any) -> JSON:
    """Gets a schedule.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param pool_name: The name of a pool of Dev Boxes. Required.
    :type pool_name: str
    :param schedule_name: The name of a schedule. Required.
    :type schedule_name: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                # Optional. The frequency of this scheduled task. "Daily"
                "frequency": "str",
                "name": "str",  # Optional. Display name for the Schedule.
                # Optional. The target time to trigger the action. The format is HH:MM.
                "time": "str",
                # Optional. The IANA timezone id at which the schedule should execute.
                "timeZone": "str",
                # Optional. Supported type this scheduled task represents. "StopDevBox"
                "type": "str"
            }
    """
    cls: ClsType[JSON] = kwargs.pop("cls", None)
    _params = kwargs.pop("params", {}) or {}
    _headers = kwargs.pop("headers", {}) or {}

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    request = build_dev_boxes_get_schedule_by_pool_request(
        project_name=project_name,
        pool_name=pool_name,
        schedule_name=schedule_name,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
    request.url = self._client.format_url(request.url, endpoint=endpoint)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response
    if response.status_code != 200:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than an attempt to parse JSON.
    body = cast(JSON, response.json() if response.content else None)
    return cls(pipeline_response, body, {}) if cls else body
@distributed_trace
def list_dev_boxes_by_user(
    self,
    project_name: str,
    user_id: str = "me",
    *,
    filter: Optional[str] = None,
    top: Optional[int] = None,
    **kwargs: Any
) -> AsyncIterable[JSON]:
    """Lists Dev Boxes in the project for a particular user.

    This call sends no request itself; it returns a pager built from the ``get_next`` and
    ``extract_data`` callbacks defined in the body. ``get_next`` runs one request through the
    client pipeline and ``extract_data`` reads the ``value`` and ``nextLink`` members of the
    JSON response.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword filter: An OData filter clause to apply to the operation. Default value is None.
    :paramtype filter: str
    :keyword top: The maximum number of resources to return from the operation. Example: 'top=10'.
     Default value is None.
    :paramtype top: int
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError: when a page request returns a status other
     than 200 (401, 404, 409 and 304 are first passed through ``map_error``).

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        if not next_link:
            # First page: build the request from the operation's own parameters.
            request = build_dev_boxes_list_dev_boxes_by_user_request(
                project_name=project_name,
                user_id=user_id,
                filter=filter,
                top=top,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            _next_request_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )

        # The same endpoint formatting is applied to first-page and next-link requests.
        path_format_arguments = {
            "endpoint": self._serialize.url(
                "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
            ),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        return request

    async def extract_data(pipeline_response):
        deserialized = pipeline_response.http_response.json()
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        # An absent or empty "nextLink" is reported as None.
        return deserialized.get("nextLink") or None, AsyncList(list_of_elem)

    async def get_next(next_link=None):
        request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_dev_box_by_user(
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> JSON:
    """Gets a Dev Box.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :return: JSON object (``None`` when the response has no content), or the result of the
     ``cls`` callback when one is passed.
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError: when the response status is not 200
     (401, 404, 409 and 304 are first passed through ``map_error``).

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_dev_boxes_get_dev_box_by_user_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body is reported as None rather than handed to the JSON parser.
    deserialized = response.json() if response.content else None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})
    return cast(JSON, deserialized)
async def _create_dev_box_initial(
    self, project_name: str, dev_box_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> JSON:
    """Send the initial create-or-update request for a Dev Box.

    ``begin_create_dev_box`` calls this with a ``cls`` callback that returns the raw pipeline
    response, which it then hands to the poller.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param body: Request payload. File-like objects and ``bytes`` are sent as raw content;
     anything else is sent as JSON. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. Default value is "me".
    :type user_id: str
    :keyword content_type: Body content type; falls back to a ``Content-Type`` entry in
     ``headers`` and then to "application/json".
    :paramtype content_type: str
    :return: The deserialized JSON body (``None`` when the response has no content), or the
     result of the ``cls`` callback when one is passed.
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError: when the response status is not 200 or 201
     (401, 404, 409 and 304 are first passed through ``map_error``).
    """
    # Function-scope import so this block does not depend on a module-level import change.
    from io import IOBase

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # Real file objects (io.BytesIO, files returned by open()) derive from io.IOBase, not from
    # typing.IO, so checking typing.IO alone would route streams into the JSON branch.
    # typing.IO stays in the tuple so anything previously accepted still is.
    if isinstance(body, (IOBase, IO, bytes)):
        _content = body
    else:
        _json = body

    request = build_dev_boxes_create_dev_box_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        user_id=user_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in [200, 201]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # Only 200 and 201 reach this point and both are deserialized the same way.
    deserialized = response.json() if response.content else None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})  # type: ignore
    return cast(JSON, deserialized)  # type: ignore
@overload
async def begin_create_dev_box(
    self,
    project_name: str,
    dev_box_name: str,
    body: JSON,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[JSON]:
    """Creates or updates a Dev Box.

    Typing overload for a JSON ``body``.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param body: The Dev Box to create or update, as a JSON object (see the input template
     below). Required.
    :type body: JSON
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns JSON object
    :rtype: ~azure.core.polling.AsyncLROPoller[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }

            # response body for status code(s): 200, 201
            response == {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }
    """
@overload
async def begin_create_dev_box(
    self,
    project_name: str,
    dev_box_name: str,
    body: IO,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[JSON]:
    """Creates or updates a Dev Box.

    Typing overload for a binary (``IO``) ``body``.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param body: The Dev Box to create or update, supplied as a binary stream. Required.
    :type body: IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns JSON object
    :rtype: ~azure.core.polling.AsyncLROPoller[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200, 201
            response == {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }
    """
@distributed_trace_async
async def begin_create_dev_box(
    self, project_name: str, dev_box_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[JSON]:
    """Creates or updates a Dev Box.

    Unless a ``continuation_token`` is given, the initial request is sent through
    ``_create_dev_box_initial`` and its raw pipeline response seeds the returned poller.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param body: The Dev Box to create or update. Is either a JSON type or a IO type. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Default value is None.
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns JSON object
    :rtype: ~azure.core.polling.AsyncLROPoller[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }

            # response body for status code(s): 200, 201
            response == {
                "poolName": "str",  # The name of the Dev Box pool this machine belongs to. Required.
                "actionState": "str",  # Optional. The current action state of the Dev Box.
                  This state is based on the previous action performed by the user.
                "createdTime": "2020-02-20 00:00:00",  # Optional. Creation time of this Dev Box.
                "errorDetails": {
                    "code": "str",  # Optional. The error code.
                    "message": "str"  # Optional. The error message.
                },
                "hardwareProfile": {
                    "memoryGB": 0,  # Optional. The amount of memory available for the Dev Box.
                    "skuName": "str",  # Optional. The name of the SKU.
                    "vCPUs": 0  # Optional. The number of vCPUs available for the Dev Box.
                },
                "hibernateSupport": "str",  # Optional. Indicates whether hibernate is
                  enabled/disabled or unknown. Known values are: "Disabled" and "Enabled".
                "imageReference": {
                    "name": "str",  # Optional. The name of the image used.
                    "operatingSystem": "str",  # Optional. The operating system of the image.
                    "osBuildNumber": "str",  # Optional. The operating system build number of the image.
                    "publishedDate": "2020-02-20 00:00:00",  # Optional. The datetime that the
                      backing image version was published.
                    "version": "str"  # Optional. The version of the image.
                },
                "localAdministrator": "str",  # Optional. Indicates whether the owner of the
                  Dev Box is a local administrator. Known values are: "Enabled" and "Disabled".
                "location": "str",  # Optional. Azure region where this Dev Box is located.
                  This will be the same region as the Virtual Network it is attached to.
                "name": "str",  # Optional. Display name for the Dev Box.
                "osType": "str",  # Optional. The operating system type of this Dev Box. "Windows"
                "powerState": "str",  # Optional. The current power state of the Dev Box.
                  Known values are: "Unknown", "Deallocated", "PoweredOff", "Running", and
                  "Hibernated".
                "projectName": "str",  # Optional. Name of the project this Dev Box belongs to.
                "provisioningState": "str",  # Optional. The current provisioning state of the Dev Box.
                "storageProfile": {
                    "osDisk": {
                        "diskSizeGB": 0  # Optional. The size of the OS Disk in gigabytes.
                    }
                },
                "uniqueId": "str",  # Optional. A unique identifier for the Dev Box. This is
                  a GUID-formatted string (e.g. 00000000-0000-0000-0000-000000000000).
                "user": "str"  # Optional. The AAD object id of the user this Dev Box is assigned to.
            }
    """
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    if cont_token is None:
        # Fresh operation: send the initial request and keep the raw pipeline response
        # (the identity-style ``cls`` returns it untouched) for the poller.
        raw_result = await self._create_dev_box_initial(
            project_name=project_name,
            dev_box_name=dev_box_name,
            body=body,
            user_id=user_id,
            content_type=content_type,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # error_map is only meaningful for the initial request; keep it out of the polling kwargs.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):
        response = pipeline_response.http_response
        deserialized = response.json() if response.content else None
        if cls:
            return cls(pipeline_response, deserialized, {})  # type: ignore
        return deserialized

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(
                lro_delay,
                lro_options={"final-state-via": "original-uri"},
                path_format_arguments=path_format_arguments,
                **kwargs
            ),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller supplied an already-initialized polling object.
        polling_method = polling
    # Same `is None` test as the initial-call guard above, so `raw_result` is never read
    # unbound (a truthiness test let an empty-string token skip both branches).
    if cont_token is not None:
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
async def _delete_dev_box_initial(  # pylint: disable=inconsistent-return-statements
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> None:
    """Send the initial delete request for a Dev Box.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. Default value is "me".
    :type user_id: str
    :return: None, or the result of the ``cls`` callback when one is passed. The callback
     receives the pipeline response, ``None`` and the response headers collected here.
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError: when the response status is not 202 or 204
     (401, 404, 409 and 304 are first passed through ``map_error``).
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)

    request = build_dev_boxes_delete_dev_box_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
    request.url = self._client.format_url(request.url, endpoint=endpoint)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response
    status = response.status_code

    if status not in (202, 204):
        map_error(status_code=status, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # Only a 202 response contributes the Operation-Location header.
    if status == 202:
        response_headers = {
            "Operation-Location": self._deserialize("str", response.headers.get("Operation-Location")),
        }
    else:
        response_headers = {}

    if cls:
        return cls(pipeline_response, None, response_headers)
@distributed_trace_async
async def begin_delete_dev_box(
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[None]:
    """Deletes a Dev Box.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # No saved state to resume from: send the initial request. The pass-through
        # ``cls`` makes the helper hand back the raw pipeline response for the poller.
        raw_result = await self._delete_dev_box_initial(  # type: ignore
            project_name=project_name,
            dev_box_name=dev_box_name,
            user_id=user_id,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # error_map is consumed by the initial call only; keep it away from the polling method.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):  # pylint: disable=inconsistent-return-statements
        # The operation has no body to deserialize; only a caller-supplied ``cls`` yields a value.
        if cls:
            return cls(pipeline_response, None, {})

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller supplied an already-initialized polling object.
        polling_method = polling

    if cont_token:
        # Rebuild the poller from saved state instead of from a fresh initial response.
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
async def _start_dev_box_initial(  # pylint: disable=inconsistent-return-statements
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> None:
    """Send the first request of the start-dev-box long-running operation.

    :param project_name: Forwarded to the request builder as ``project_name``.
    :type project_name: str
    :param dev_box_name: Forwarded to the request builder as ``dev_box_name``.
    :type dev_box_name: str
    :param user_id: Forwarded to the request builder as ``user_id``. Default value is "me".
    :type user_id: str
    :keyword error_map: Optional status-code -> exception overrides merged over the defaults.
    :keyword headers: Optional headers handed to the request builder.
    :keyword params: Optional query parameters handed to the request builder.
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, None, response_headers)``;
     its result is returned. Without it the coroutine returns None.
    :raises ~azure.core.exceptions.HttpResponseError: if the status code is not 202.
    """
    # Per-call options are consumed here; whatever remains in kwargs goes to the pipeline.
    error_overrides = kwargs.pop("error_map", {}) or {}
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(error_overrides)

    _request = build_dev_boxes_start_dev_box_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    # Substitute the configured endpoint into the URL template produced by the builder.
    _request.url = self._client.format_url(
        _request.url,
        endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    )

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=False, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 202:
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # The header is deserialized whether or not a ``cls`` callback consumes it.
    captured_headers = {
        "Operation-Location": self._deserialize("str", http_response.headers.get("Operation-Location")),
    }

    if not cls:
        return None
    return cls(pipeline_response, None, captured_headers)
@distributed_trace_async
async def begin_start_dev_box(
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[None]:
    """Starts a Dev Box.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # No saved state to resume from: send the initial request. The pass-through
        # ``cls`` makes the helper hand back the raw pipeline response for the poller.
        raw_result = await self._start_dev_box_initial(  # type: ignore
            project_name=project_name,
            dev_box_name=dev_box_name,
            user_id=user_id,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # error_map is consumed by the initial call only; keep it away from the polling method.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):  # pylint: disable=inconsistent-return-statements
        # The operation has no body to deserialize; only a caller-supplied ``cls`` yields a value.
        if cls:
            return cls(pipeline_response, None, {})

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller supplied an already-initialized polling object.
        polling_method = polling

    if cont_token:
        # Rebuild the poller from saved state instead of from a fresh initial response.
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
async def _stop_dev_box_initial(  # pylint: disable=inconsistent-return-statements
    self,
    project_name: str,
    dev_box_name: str,
    user_id: str = "me",
    *,
    hibernate: Optional[bool] = None,
    **kwargs: Any
) -> None:
    """Send the first request of the stop-dev-box long-running operation.

    :param project_name: Forwarded to the request builder as ``project_name``.
    :type project_name: str
    :param dev_box_name: Forwarded to the request builder as ``dev_box_name``.
    :type dev_box_name: str
    :param user_id: Forwarded to the request builder as ``user_id``. Default value is "me".
    :type user_id: str
    :keyword hibernate: Forwarded to the request builder as ``hibernate``. Default value is None.
    :paramtype hibernate: bool
    :keyword error_map: Optional status-code -> exception overrides merged over the defaults.
    :keyword headers: Optional headers handed to the request builder.
    :keyword params: Optional query parameters handed to the request builder.
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, None, response_headers)``;
     its result is returned. Without it the coroutine returns None.
    :raises ~azure.core.exceptions.HttpResponseError: if the status code is not 202.
    """
    # Per-call options are consumed here; whatever remains in kwargs goes to the pipeline.
    error_overrides = kwargs.pop("error_map", {}) or {}
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(error_overrides)

    _request = build_dev_boxes_stop_dev_box_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        user_id=user_id,
        hibernate=hibernate,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    # Substitute the configured endpoint into the URL template produced by the builder.
    _request.url = self._client.format_url(
        _request.url,
        endpoint=self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    )

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=False, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 202:
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # The header is deserialized whether or not a ``cls`` callback consumes it.
    captured_headers = {
        "Operation-Location": self._deserialize("str", http_response.headers.get("Operation-Location")),
    }

    if not cls:
        return None
    return cls(pipeline_response, None, captured_headers)
@distributed_trace_async
async def begin_stop_dev_box(
    self,
    project_name: str,
    dev_box_name: str,
    user_id: str = "me",
    *,
    hibernate: Optional[bool] = None,
    **kwargs: Any
) -> AsyncLROPoller[None]:
    """Stops a Dev Box.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword hibernate: Optional parameter to hibernate the dev box. Default value is None.
    :paramtype hibernate: bool
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # No saved state to resume from: send the initial request. The pass-through
        # ``cls`` makes the helper hand back the raw pipeline response for the poller.
        raw_result = await self._stop_dev_box_initial(  # type: ignore
            project_name=project_name,
            dev_box_name=dev_box_name,
            user_id=user_id,
            hibernate=hibernate,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # error_map is consumed by the initial call only; keep it away from the polling method.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):  # pylint: disable=inconsistent-return-statements
        # The operation has no body to deserialize; only a caller-supplied ``cls`` yields a value.
        if cls:
            return cls(pipeline_response, None, {})

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller supplied an already-initialized polling object.
        polling_method = polling

    if cont_token:
        # Rebuild the poller from saved state instead of from a fresh initial response.
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
@distributed_trace_async
async def get_remote_connection(
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> JSON:
    """Gets RDP Connection info.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "rdpConnectionUrl": "str",  # Optional. Link to open a Remote Desktop session.
                "webUrl": "str"  # Optional. URL to open a browser based RDP session.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_dev_boxes_get_remote_connection_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than a JSON parse attempt.
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})
    return cast(JSON, deserialized)
@distributed_trace
def list_upcoming_actions(
    self, project_name: str, dev_box_name: str, user_id: str = "me", **kwargs: Any
) -> AsyncIterable[JSON]:
    """Lists upcoming actions on a Dev Box.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actionType": "str",  # Optional. The action that will be taken. "Stop"
                "id": "str",  # Optional. Uniquely identifies the action.
                "originalScheduledTime": "2020-02-20 00:00:00",  # Optional. The original
                  scheduled time for the action (UTC).
                "reason": "str",  # Optional. The reason for this action. "Schedule"
                "scheduledTime": "2020-02-20 00:00:00",  # Optional. The target time the
                  action will be triggered (UTC).
                "sourceId": "str"  # Optional. The id of the resource which triggered this
                  action.
            }
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when one is given."""
        if not next_link:
            request = build_dev_boxes_list_upcoming_actions_request(
                project_name=project_name,
                dev_box_name=dev_box_name,
                user_id=user_id,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            _next_request_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )
        # Both kinds of request get the endpoint substituted the same way.
        path_format_arguments = {
            "endpoint": self._serialize.url(
                "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
            ),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        return request

    async def extract_data(pipeline_response):
        """Return ``(next_link_or_None, items)`` for one page of results."""
        deserialized = pipeline_response.http_response.json()
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.get("nextLink") or None, AsyncList(list_of_elem)

    async def get_next(next_link=None):
        """Fetch one page and raise unless the service answers 200."""
        request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_upcoming_action(
    self, project_name: str, dev_box_name: str, upcoming_action_id: str, user_id: str = "me", **kwargs: Any
) -> JSON:
    """Gets an Upcoming Action.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param upcoming_action_id: The upcoming action id. Required.
    :type upcoming_action_id: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actionType": "str",  # Optional. The action that will be taken. "Stop"
                "id": "str",  # Optional. Uniquely identifies the action.
                "originalScheduledTime": "2020-02-20 00:00:00",  # Optional. The original
                  scheduled time for the action (UTC).
                "reason": "str",  # Optional. The reason for this action. "Schedule"
                "scheduledTime": "2020-02-20 00:00:00",  # Optional. The target time the
                  action will be triggered (UTC).
                "sourceId": "str"  # Optional. The id of the resource which triggered this
                  action.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_dev_boxes_get_upcoming_action_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        upcoming_action_id=upcoming_action_id,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than a JSON parse attempt.
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})
    return cast(JSON, deserialized)
@distributed_trace_async
async def skip_upcoming_action(  # pylint: disable=inconsistent-return-statements
    self, project_name: str, dev_box_name: str, upcoming_action_id: str, user_id: str = "me", **kwargs: Any
) -> None:
    """Skips an Upcoming Action.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param upcoming_action_id: The upcoming action id. Required.
    :type upcoming_action_id: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :return: None
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)

    request = build_dev_boxes_skip_upcoming_action_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        upcoming_action_id=upcoming_action_id,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    # 204 is the only status treated as success; there is no body to deserialize.
    if response.status_code not in [204]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if cls:
        return cls(pipeline_response, None, {})
@distributed_trace_async
async def delay_upcoming_action(
    self,
    project_name: str,
    dev_box_name: str,
    upcoming_action_id: str,
    user_id: str = "me",
    *,
    delay_until: datetime.datetime,
    **kwargs: Any
) -> JSON:
    """Delays an Upcoming Action.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param dev_box_name: The name of a Dev Box. Required.
    :type dev_box_name: str
    :param upcoming_action_id: The upcoming action id. Required.
    :type upcoming_action_id: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword delay_until: The delayed action time (UTC). Required.
    :paramtype delay_until: ~datetime.datetime
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actionType": "str",  # Optional. The action that will be taken. "Stop"
                "id": "str",  # Optional. Uniquely identifies the action.
                "originalScheduledTime": "2020-02-20 00:00:00",  # Optional. The original
                  scheduled time for the action (UTC).
                "reason": "str",  # Optional. The reason for this action. "Schedule"
                "scheduledTime": "2020-02-20 00:00:00",  # Optional. The target time the
                  action will be triggered (UTC).
                "sourceId": "str"  # Optional. The id of the resource which triggered this
                  action.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_dev_boxes_delay_upcoming_action_request(
        project_name=project_name,
        dev_box_name=dev_box_name,
        upcoming_action_id=upcoming_action_id,
        user_id=user_id,
        delay_until=delay_until,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than a JSON parse attempt.
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})
    return cast(JSON, deserialized)
[docs]class EnvironmentsOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.developer.devcenter.aio.DevCenterClient`'s
:attr:`environments` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
    """Store the client, config, serializer and deserializer on the instance.

    The four values are read in that order from the positional arguments; once the
    positional arguments are exhausted, the remaining ones are popped from the
    ``client``, ``config``, ``serializer`` and ``deserializer`` keywords.

    :raises KeyError: if a value is supplied neither positionally nor by keyword.
    """
    remaining = list(args)

    def _take(keyword):
        # Positional values win; the keyword is consulted (and consumed) only once they run out.
        if remaining:
            return remaining.pop(0)
        return kwargs.pop(keyword)

    self._client = _take("client")
    self._config = _take("config")
    self._serialize = _take("serializer")
    self._deserialize = _take("deserializer")
@distributed_trace
def list_environments(self, project_name: str, *, top: Optional[int] = None, **kwargs: Any) -> AsyncIterable[JSON]:
    """Lists the environments for a project.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :keyword top: The maximum number of resources to return from the operation. Example: 'top=10'.
     Default value is None.
    :paramtype top: int
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                  environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                  containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                          environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                          represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                          scheduled task is enabled. Known values are: "Enabled" and "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                      resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                  Environment.
            }
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when one is given."""
        if not next_link:
            request = build_environments_list_environments_request(
                project_name=project_name,
                top=top,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            _next_request_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )
        # Both kinds of request get the endpoint substituted the same way.
        path_format_arguments = {
            "endpoint": self._serialize.url(
                "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
            ),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        return request

    async def extract_data(pipeline_response):
        """Return ``(next_link_or_None, items)`` for one page of results."""
        deserialized = pipeline_response.http_response.json()
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.get("nextLink") or None, AsyncList(list_of_elem)

    async def get_next(next_link=None):
        """Fetch one page and raise unless the service answers 200."""
        request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace
def list_environments_by_user(
    self, project_name: str, user_id: str = "me", *, top: Optional[int] = None, **kwargs: Any
) -> AsyncIterable[JSON]:
    """Lists the environments for a project and user.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword top: The maximum number of resources to return from the operation. Example: 'top=10'.
     Default value is None.
    :paramtype top: int
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                  environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                  containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                          environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                          represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                          scheduled task is enabled. Known values are: "Enabled" and "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                      resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                  Environment.
            }
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings take precedence over the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when one is given."""
        if not next_link:
            request = build_environments_list_environments_by_user_request(
                project_name=project_name,
                user_id=user_id,
                top=top,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            _next_request_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )
        # Both kinds of request get the endpoint substituted the same way.
        path_format_arguments = {
            "endpoint": self._serialize.url(
                "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
            ),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        return request

    async def extract_data(pipeline_response):
        """Return ``(next_link_or_None, items)`` for one page of results."""
        deserialized = pipeline_response.http_response.json()
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.get("nextLink") or None, AsyncList(list_of_elem)

    async def get_next(next_link=None):
        """Fetch one page and raise unless the service answers 200."""
        request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_environment_by_user(
    self, project_name: str, environment_name: str, user_id: str = "me", **kwargs: Any
) -> JSON:
    """Gets an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # A caller-supplied ``error_map`` extends/overrides the default status -> exception table.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    # Optional callback that receives (pipeline_response, deserialized, headers) and
    # whose return value replaces the plain JSON result.
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_environments_get_environment_by_user_request(
        project_name=project_name,
        environment_name=environment_name,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    # The built URL still carries an ``endpoint`` placeholder; fill it from the client config.
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        # map_error gets the first chance to raise a mapped exception type;
        # anything it does not handle surfaces as a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than a JSON decoding attempt.
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
async def _create_or_update_environment_initial(
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> JSON:
    """Send the initial request of the create-or-update environment operation.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: The environment payload. A stream or ``bytes`` value is sent as raw content;
     anything else is sent as JSON. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. Default value is "me".
    :type user_id: str
    :keyword content_type: Body content type. Falls back to a ``Content-Type`` entry in ``headers``
     and then to "application/json".
    :paramtype content_type: str
    :return: The deserialized response body (``None`` when the response has no content), or the
     result of ``cls(pipeline_response, deserialized, response_headers)`` when ``cls`` is given.
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError: If the status code is neither 200 nor 201.
    """
    # Local import keeps this block self-contained; IOBase is not among the module-level imports.
    from io import IOBase  # pylint: disable=import-outside-toplevel

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # ``typing.IO`` is only a static-typing construct: real streams (open(), io.BytesIO, ...)
    # are not instances of it, so they must be recognised through io.IOBase. ``IO`` is kept in
    # the tuple so objects that explicitly subclass typing.IO are still treated as streams.
    if isinstance(body, (IOBase, IO, bytes)):
        _content = body
    else:
        _json = body

    request = build_environments_create_or_update_environment_request(
        project_name=project_name,
        environment_name=environment_name,
        user_id=user_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200, 201]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    response_headers = {}
    # Only the 201 response exposes the Operation-Location header to ``cls``.
    if response.status_code == 201:
        response_headers["Operation-Location"] = self._deserialize(
            "str", response.headers.get("Operation-Location")
        )

    # Both accepted status codes (200 and 201) deserialize the body the same way.
    deserialized = response.json() if response.content else None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), response_headers)  # type: ignore

    return cast(JSON, deserialized)  # type: ignore
@overload
async def begin_create_or_update_environment(
    self,
    project_name: str,
    environment_name: str,
    body: JSON,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[JSON]:
    """Creates or updates an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Represents an environment. Required.
    :type body: JSON
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns JSON object
    :rtype: ~azure.core.polling.AsyncLROPoller[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }

            # response body for status code(s): 200, 201
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
@overload
async def begin_create_or_update_environment(
    self,
    project_name: str,
    environment_name: str,
    body: IO,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[JSON]:
    """Creates or updates an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Represents an environment. Required.
    :type body: IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns JSON object
    :rtype: ~azure.core.polling.AsyncLROPoller[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200, 201
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
@distributed_trace_async
async def begin_create_or_update_environment(
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[JSON]:
    """Creates or updates an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Represents an environment. Is either a JSON type or an IO type. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Default value is None.
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns JSON object
    :rtype: ~azure.core.polling.AsyncLROPoller[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }

            # response body for status code(s): 200, 201
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    # The initial request is only sent for a fresh operation; with a continuation
    # token the poller is rebuilt from saved state further below.
    if cont_token is None:
        raw_result = await self._create_or_update_environment_initial(
            project_name=project_name,
            environment_name=environment_name,
            body=body,
            user_id=user_id,
            content_type=content_type,
            # Return the raw pipeline response; the poller needs it, not the JSON body.
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # error_map is consumed by the initial call only and must not leak into the polling kwargs.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):
        # Final-result callback for the poller: decode the body, then apply ``cls`` if given.
        response = pipeline_response.http_response
        if response.content:
            deserialized = response.json()
        else:
            deserialized = None
        if cls:
            return cls(pipeline_response, deserialized, {})  # type: ignore
        return deserialized

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(
                lro_delay,
                lro_options={"final-state-via": "original-uri"},
                path_format_arguments=path_format_arguments,
                **kwargs
            ),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller supplied an already initialized polling object.
        polling_method = polling
    if cont_token:
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
@overload
async def update_environment(
    self,
    project_name: str,
    environment_name: str,
    body: JSON,
    user_id: str = "me",
    *,
    content_type: str = "application/merge-patch+json",
    **kwargs: Any
) -> JSON:
    """Partially updates an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Updatable environment properties. Required.
    :type body: JSON
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/merge-patch+json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                }
            }

            # response body for status code(s): 200
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
@overload
async def update_environment(
    self,
    project_name: str,
    environment_name: str,
    body: IO,
    user_id: str = "me",
    *,
    content_type: str = "application/merge-patch+json",
    **kwargs: Any
) -> JSON:
    """Partially updates an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Updatable environment properties. Required.
    :type body: IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/merge-patch+json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
@distributed_trace_async
async def update_environment(
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> JSON:
    """Partially updates an environment.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Updatable environment properties. Is either a JSON type or an IO type. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Known values are:
     'application/merge-patch+json'. Default value is None.
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                }
            }

            # response body for status code(s): 200
            response == {
                "environmentType": "str",  # Environment type. Required.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. Description of the Environment.
                "name": "str",  # Optional. Environment name.
                "parameters": {},  # Optional. Parameters object for the deploy action.
                "provisioningState": "str",  # Optional. The provisioning state of the
                #   environment.
                "resourceGroupId": "str",  # Optional. The identifier of the resource group
                #   containing the environment's resources.
                "scheduledTasks": {
                    "str": {
                        "startTime": "2020-02-20 00:00:00",  # Date/time by which the
                        #   environment should expire. Required.
                        "type": "str",  # Supported type this scheduled task
                        #   represents. Required. "AutoExpire"
                        "enabled": "str"  # Optional. Indicates whether or not this
                        #   scheduled task is enabled. Known values are: "Enabled" and
                        #   "Disabled".
                    }
                },
                "tags": {
                    "str": "str"  # Optional. Key value pairs that will be applied to
                    #   resources deployed in this environment as tags.
                },
                "user": "str"  # Optional. The AAD object id of the owner of this
                #   Environment.
            }
    """
    # Local import keeps this block self-contained; IOBase is not among the module-level imports.
    from io import IOBase  # pylint: disable=import-outside-toplevel

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    # An explicit ``content_type`` keyword wins over a Content-Type entry in ``headers``.
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    content_type = content_type or "application/merge-patch+json"
    _json = None
    _content = None
    # ``typing.IO`` is only a static-typing construct: real streams (open(), io.BytesIO, ...)
    # are not instances of it, so they must be recognised through io.IOBase. ``IO`` is kept in
    # the tuple so objects that explicitly subclass typing.IO are still treated as streams.
    if isinstance(body, (IOBase, IO, bytes)):
        _content = body
    else:
        _json = body

    request = build_environments_update_environment_request(
        project_name=project_name,
        environment_name=environment_name,
        user_id=user_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than a JSON decoding attempt.
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
async def _delete_environment_initial(  # pylint: disable=inconsistent-return-statements
    self, project_name: str, environment_name: str, user_id: str = "me", **kwargs: Any
) -> None:
    """Issue the initial request of the delete-environment operation.

    Status codes 200, 202 and 204 are accepted. For 202 the ``Operation-Location`` response
    header is collected and handed to ``cls``. Nothing is returned unless ``cls`` is supplied,
    in which case the result of ``cls(pipeline_response, None, response_headers)`` is returned.

    :raises ~azure.core.exceptions.HttpResponseError: For any other status code.
    """
    status_to_error = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    cls: ClsType[None] = kwargs.pop("cls", None)

    delete_request = build_environments_delete_environment_request(
        project_name=project_name,
        environment_name=environment_name,
        user_id=user_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )
    # Fill the ``endpoint`` placeholder of the built URL from the client configuration.
    endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
    delete_request.url = self._client.format_url(delete_request.url, endpoint=endpoint)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        delete_request, stream=False, **kwargs
    )
    http_response = pipeline_response.http_response
    status = http_response.status_code

    if status not in (200, 202, 204):
        map_error(status_code=status, response=http_response, error_map=status_to_error)
        raise HttpResponseError(response=http_response)

    lro_headers = {}
    if status == 202:
        operation_location = http_response.headers.get("Operation-Location")
        lro_headers["Operation-Location"] = self._deserialize("str", operation_location)

    if cls:
        return cls(pipeline_response, None, lro_headers)
@distributed_trace_async
async def begin_delete_environment(
    self, project_name: str, environment_name: str, user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[None]:
    """Deletes an environment and all its associated resources.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from the
     authentication context. Default value is "me".
    :type user_id: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a personal
     polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if no
     Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    # The initial request is only sent for a fresh operation; with a continuation
    # token the poller is rebuilt from saved state further below.
    if cont_token is None:
        raw_result = await self._delete_environment_initial(  # type: ignore
            project_name=project_name,
            environment_name=environment_name,
            user_id=user_id,
            # Return the raw pipeline response; the poller needs it as its starting point.
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # error_map is consumed by the initial call only and must not leak into the polling kwargs.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):  # pylint: disable=inconsistent-return-statements
        # The operation has no result body; only a caller-supplied ``cls`` produces a value.
        if cls:
            return cls(pipeline_response, None, {})

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller supplied an already initialized polling object.
        polling_method = polling
    if cont_token:
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
async def _deploy_environment_action_initial(  # pylint: disable=inconsistent-return-statements
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> None:
    """Send the initial request of the deploy-action long-running operation.

    Builds the request with ``build_environments_deploy_environment_action_request``, runs it
    through the client pipeline without streaming and validates the status code. No polling is
    performed here.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Request payload. Stream objects and ``bytes`` are sent as raw content; any
     other value is sent as JSON. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. Default value is "me".
    :type user_id: str
    :keyword content_type: Body content type. Read from the keyword or, failing that, from a
     ``Content-Type`` entry in ``headers``; falls back to "application/json".
    :paramtype content_type: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, None, response_headers)``;
     its result is returned to the caller.
    :keyword error_map: Optional mapping merged over the default status-code-to-exception map.
    :return: The result of ``cls`` when one is supplied, otherwise None.
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError: If the status code is neither 200 nor 202.
    """
    # Function-scope import: the module header does not import ``io``.
    from io import IOBase  # pylint: disable=import-outside-toplevel

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[None] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # ``typing.IO`` alone never matches real file objects (io.BytesIO, open(...)), which derive
    # from ``io.IOBase``; check both so streams are sent as raw content instead of as JSON.
    if isinstance(body, (IOBase, IO, bytes)):
        _content = body
    else:
        _json = body

    request = build_environments_deploy_environment_action_request(
        project_name=project_name,
        environment_name=environment_name,
        user_id=user_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200, 202]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    response_headers = {}
    if response.status_code == 202:
        # Only a 202 response is inspected for the Operation-Location header.
        response_headers["Operation-Location"] = self._deserialize(
            "str", response.headers.get("Operation-Location")
        )

    if cls:
        return cls(pipeline_response, None, response_headers)
@overload
async def begin_deploy_environment_action(
    self,
    project_name: str,
    environment_name: str,
    body: JSON,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[None]:
    """Executes a deploy action (overload taking a JSON body).

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Action properties overriding the environment's default values. Required.
    :type body: JSON
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from
     the authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Content type of the JSON body. Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a
     personal polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if
     no Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "actionId": "str",  # The Catalog Item action id to execute. Required.
                "parameters": {}  # Optional. Parameters object for the Action.
            }
    """
@overload
async def begin_deploy_environment_action(
    self,
    project_name: str,
    environment_name: str,
    body: IO,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[None]:
    """Executes a deploy action (overload taking a binary/stream body).

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Action properties overriding the environment's default values. Required.
    :type body: IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from
     the authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Content type of the binary body. Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a
     personal polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if
     no Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def begin_deploy_environment_action(
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[None]:
    """Executes a deploy action.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Action properties overriding the environment's default values. Is either a JSON
     type or a IO type. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from
     the authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Default value is None, in which case the initial request falls back to "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a
     personal polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if
     no Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "actionId": "str",  # The Catalog Item action id to execute. Required.
                "parameters": {}  # Optional. Parameters object for the Action.
            }
    """
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    if cont_token is None:
        # Nothing to resume from: issue the initial request. The identity ``cls`` makes the
        # helper hand back the raw pipeline response, which the poller needs.
        raw_result = await self._deploy_environment_action_initial(  # type: ignore
            project_name=project_name,
            environment_name=environment_name,
            body=body,
            user_id=user_id,
            content_type=content_type,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # Drop error_map so it is not forwarded to the polling method below.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):  # pylint: disable=inconsistent-return-statements
        # The operation has no payload; only a caller-supplied ``cls`` produces a value.
        if cls:
            return cls(pipeline_response, None, {})

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(
                lro_delay,
                lro_options={"final-state-via": "original-uri"},
                path_format_arguments=path_format_arguments,
                **kwargs
            ),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller provided an initialized polling object.
        polling_method = polling
    if cont_token:
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
async def _custom_environment_action_initial(  # pylint: disable=inconsistent-return-statements
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> None:
    """Send the initial request of the custom-action long-running operation.

    Builds the request with ``build_environments_custom_environment_action_request``, runs it
    through the client pipeline without streaming and validates the status code. No polling is
    performed here.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Request payload. Stream objects and ``bytes`` are sent as raw content; any
     other value is sent as JSON. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. Default value is "me".
    :type user_id: str
    :keyword content_type: Body content type. Read from the keyword or, failing that, from a
     ``Content-Type`` entry in ``headers``; falls back to "application/json".
    :paramtype content_type: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, None, response_headers)``;
     its result is returned to the caller.
    :keyword error_map: Optional mapping merged over the default status-code-to-exception map.
    :return: The result of ``cls`` when one is supplied, otherwise None.
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError: If the status code is neither 200 nor 202.
    """
    # Function-scope import: the module header does not import ``io``.
    from io import IOBase  # pylint: disable=import-outside-toplevel

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[None] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # ``typing.IO`` alone never matches real file objects (io.BytesIO, open(...)), which derive
    # from ``io.IOBase``; check both so streams are sent as raw content instead of as JSON.
    if isinstance(body, (IOBase, IO, bytes)):
        _content = body
    else:
        _json = body

    request = build_environments_custom_environment_action_request(
        project_name=project_name,
        environment_name=environment_name,
        user_id=user_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200, 202]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    response_headers = {}
    if response.status_code == 202:
        # Only a 202 response is inspected for the Operation-Location header.
        response_headers["Operation-Location"] = self._deserialize(
            "str", response.headers.get("Operation-Location")
        )

    if cls:
        return cls(pipeline_response, None, response_headers)
@overload
async def begin_custom_environment_action(
    self,
    project_name: str,
    environment_name: str,
    body: JSON,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[None]:
    """Executes a custom action (overload taking a JSON body).

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Action properties overriding the environment's default values. Required.
    :type body: JSON
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from
     the authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Content type of the JSON body. Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a
     personal polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if
     no Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "actionId": "str",  # The Catalog Item action id to execute. Required.
                "parameters": {}  # Optional. Parameters object for the Action.
            }
    """
@overload
async def begin_custom_environment_action(
    self,
    project_name: str,
    environment_name: str,
    body: IO,
    user_id: str = "me",
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> AsyncLROPoller[None]:
    """Executes a custom action (overload taking a binary/stream body).

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Action properties overriding the environment's default values. Required.
    :type body: IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from
     the authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Content type of the binary body. Default value is "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a
     personal polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if
     no Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def begin_custom_environment_action(
    self, project_name: str, environment_name: str, body: Union[JSON, IO], user_id: str = "me", **kwargs: Any
) -> AsyncLROPoller[None]:
    """Executes a custom action.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param environment_name: The name of the environment. Required.
    :type environment_name: str
    :param body: Action properties overriding the environment's default values. Is either a JSON
     type or a IO type. Required.
    :type body: JSON or IO
    :param user_id: The AAD object id of the user. If value is 'me', the identity is taken from
     the authentication context. Default value is "me".
    :type user_id: str
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Default value is None, in which case the initial request falls back to "application/json".
    :paramtype content_type: str
    :keyword str continuation_token: A continuation token to restart a poller from a saved state.
    :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False
     for this operation to not poll, or pass in your own initialized polling object for a
     personal polling strategy.
    :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
    :keyword int polling_interval: Default waiting time between two polls for LRO operations if
     no Retry-After header is present.
    :return: An instance of AsyncLROPoller that returns None
    :rtype: ~azure.core.polling.AsyncLROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            body = {
                "actionId": "str",  # The Catalog Item action id to execute. Required.
                "parameters": {}  # Optional. Parameters object for the Action.
            }
    """
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, AsyncPollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    if cont_token is None:
        # Nothing to resume from: issue the initial request. The identity ``cls`` makes the
        # helper hand back the raw pipeline response, which the poller needs.
        raw_result = await self._custom_environment_action_initial(  # type: ignore
            project_name=project_name,
            environment_name=environment_name,
            body=body,
            user_id=user_id,
            content_type=content_type,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # Drop error_map so it is not forwarded to the polling method below.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):  # pylint: disable=inconsistent-return-statements
        # The operation has no payload; only a caller-supplied ``cls`` produces a value.
        if cls:
            return cls(pipeline_response, None, {})

    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }

    if polling is True:
        polling_method: AsyncPollingMethod = cast(
            AsyncPollingMethod,
            AsyncLROBasePolling(
                lro_delay,
                lro_options={"final-state-via": "original-uri"},
                path_format_arguments=path_format_arguments,
                **kwargs
            ),
        )
    elif polling is False:
        polling_method = cast(AsyncPollingMethod, AsyncNoPolling())
    else:
        # Caller provided an initialized polling object.
        polling_method = polling
    if cont_token:
        return AsyncLROPoller.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)  # type: ignore
@distributed_trace
def list_catalog_items(self, project_name: str, *, top: Optional[int] = None, **kwargs: Any) -> AsyncIterable[JSON]:
    """Lists latest version of all catalog items available for a project.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :keyword top: The maximum number of resources to return from the operation. Example: 'top=10'.
     Default value is None.
    :paramtype top: int
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "catalogName": "str",  # Optional. Name of the catalog.
                "id": "str",  # Optional. Unique identifier of the catalog item.
                "name": "str"  # Optional. Name of the catalog item.
            }
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        # First page: use the generated request builder. Later pages: follow next_link.
        if not next_link:
            request = build_environments_list_catalog_items_request(
                project_name=project_name,
                top=top,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            _next_request_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )

        # Endpoint substitution is the same for both kinds of request.
        path_format_arguments = {
            "endpoint": self._serialize.url(
                "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
            ),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        return request

    async def extract_data(pipeline_response):
        # Returns (next link or None, items of this page) for the pager.
        deserialized = pipeline_response.http_response.json()
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.get("nextLink") or None, AsyncList(list_of_elem)

    async def get_next(next_link=None):
        request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_catalog_item(self, project_name: str, catalog_item_id: str, **kwargs: Any) -> JSON:
    """Get a catalog item from a project.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param catalog_item_id: The unique id of the catalog item. Required.
    :type catalog_item_id: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "catalogName": "str",  # Optional. Name of the catalog.
                "id": "str",  # Optional. Unique identifier of the catalog item.
                "name": "str"  # Optional. Name of the catalog item.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_environments_get_catalog_item_request(
        project_name=project_name,
        catalog_item_id=catalog_item_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None instead of being passed to response.json().
    deserialized = response.json() if response.content else None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
@distributed_trace
def list_catalog_item_versions(
    self, project_name: str, catalog_item_id: str, *, top: Optional[int] = None, **kwargs: Any
) -> AsyncIterable[JSON]:
    """List all versions of a catalog item from a project.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param catalog_item_id: The unique id of the catalog item. Required.
    :type catalog_item_id: str
    :keyword top: The maximum number of resources to return from the operation. Example: 'top=10'.
     Default value is None.
    :paramtype top: int
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actions": [
                    {
                        "description": "str",  # Optional. Description of the action.
                        "id": "str",  # Optional. Unique identifier of the action.
                        "name": "str",  # Optional. Display name of the action.
                        "parameters": [
                            {
                                "allowed": [
                                    {}  # Optional. An array of allowed
                                      values.
                                ],
                                "default": {},  # Optional. Default value of
                                  the parameter.
                                "description": "str",  # Optional.
                                  Description of the parameter.
                                "id": "str",  # Optional. Unique ID of the
                                  parameter.
                                "name": "str",  # Optional. Display name of
                                  the parameter.
                                "readOnly": bool,  # Optional. Whether or not
                                  this parameter is read-only.  If true, default should have a
                                  value.
                                "required": bool,  # Optional. Whether or not
                                  this parameter is required.
                                "type": "str"  # Optional. A string of one of
                                  the basic JSON types (number, integer, null, array, object,
                                  boolean, string). Known values are: "array", "boolean",
                                  "integer", "null", "number", "object", and "string".
                            }
                        ],
                        "parametersSchema": "str",  # Optional. JSON schema defining
                          the parameters specific to the custom action.
                        "runner": "str",  # Optional. The container image to use to
                          execute the action.
                        "type": "str",  # Optional. The action type. Known values
                          are: "Custom", "Deploy", and "Delete".
                        "typeName": "str"  # Optional. Name of the custom action
                          type.
                    }
                ],
                "catalogItemId": "str",  # Optional. Unique identifier of the catalog item.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. A long description of the catalog item.
                "eligibleForLatestVersion": bool,  # Optional. Whether the version is
                  eligible to be the latest version.
                "parameters": [
                    {
                        "allowed": [
                            {}  # Optional. An array of allowed values.
                        ],
                        "default": {},  # Optional. Default value of the parameter.
                        "description": "str",  # Optional. Description of the
                          parameter.
                        "id": "str",  # Optional. Unique ID of the parameter.
                        "name": "str",  # Optional. Display name of the parameter.
                        "readOnly": bool,  # Optional. Whether or not this parameter
                          is read-only.  If true, default should have a value.
                        "required": bool,  # Optional. Whether or not this parameter
                          is required.
                        "type": "str"  # Optional. A string of one of the basic JSON
                          types (number, integer, null, array, object, boolean, string). Known
                          values are: "array", "boolean", "integer", "null", "number", "object",
                          and "string".
                    }
                ],
                "parametersSchema": "str",  # Optional. JSON schema defining the parameters
                  object passed to actions.
                "runner": "str",  # Optional. The default container image to use to execute
                  actions.
                "status": "str",  # Optional. Defines whether the specific catalog item
                  version can be used. Known values are: "Enabled" and "Disabled".
                "summary": "str",  # Optional. A short summary of the catalog item.
                "templatePath": "str",  # Optional. Path to the catalog item entrypoint file.
                "version": "str"  # Optional. The version of the catalog item.
            }
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        # First page: use the generated request builder. Later pages: follow next_link.
        if not next_link:
            request = build_environments_list_catalog_item_versions_request(
                project_name=project_name,
                catalog_item_id=catalog_item_id,
                top=top,
                api_version=self._config.api_version,
                headers=_headers,
                params=_params,
            )
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            _next_request_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )

        # Endpoint substitution is the same for both kinds of request.
        path_format_arguments = {
            "endpoint": self._serialize.url(
                "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
            ),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)
        return request

    async def extract_data(pipeline_response):
        # Returns (next link or None, items of this page) for the pager.
        deserialized = pipeline_response.http_response.json()
        list_of_elem = deserialized["value"]
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.get("nextLink") or None, AsyncList(list_of_elem)

    async def get_next(next_link=None):
        request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def get_catalog_item_version(
    self, project_name: str, catalog_item_id: str, version: str, **kwargs: Any
) -> JSON:
    """Get a specific catalog item version from a project.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :param catalog_item_id: The unique id of the catalog item. Required.
    :type catalog_item_id: str
    :param version: The version of the catalog item. Required.
    :type version: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actions": [
                    {
                        "description": "str",  # Optional. Description of the action.
                        "id": "str",  # Optional. Unique identifier of the action.
                        "name": "str",  # Optional. Display name of the action.
                        "parameters": [
                            {
                                "allowed": [
                                    {}  # Optional. An array of allowed
                                      values.
                                ],
                                "default": {},  # Optional. Default value of
                                  the parameter.
                                "description": "str",  # Optional.
                                  Description of the parameter.
                                "id": "str",  # Optional. Unique ID of the
                                  parameter.
                                "name": "str",  # Optional. Display name of
                                  the parameter.
                                "readOnly": bool,  # Optional. Whether or not
                                  this parameter is read-only.  If true, default should have a
                                  value.
                                "required": bool,  # Optional. Whether or not
                                  this parameter is required.
                                "type": "str"  # Optional. A string of one of
                                  the basic JSON types (number, integer, null, array, object,
                                  boolean, string). Known values are: "array", "boolean",
                                  "integer", "null", "number", "object", and "string".
                            }
                        ],
                        "parametersSchema": "str",  # Optional. JSON schema defining
                          the parameters specific to the custom action.
                        "runner": "str",  # Optional. The container image to use to
                          execute the action.
                        "type": "str",  # Optional. The action type. Known values
                          are: "Custom", "Deploy", and "Delete".
                        "typeName": "str"  # Optional. Name of the custom action
                          type.
                    }
                ],
                "catalogItemId": "str",  # Optional. Unique identifier of the catalog item.
                "catalogItemName": "str",  # Optional. Name of the catalog item.
                "catalogName": "str",  # Optional. Name of the catalog.
                "description": "str",  # Optional. A long description of the catalog item.
                "eligibleForLatestVersion": bool,  # Optional. Whether the version is
                  eligible to be the latest version.
                "parameters": [
                    {
                        "allowed": [
                            {}  # Optional. An array of allowed values.
                        ],
                        "default": {},  # Optional. Default value of the parameter.
                        "description": "str",  # Optional. Description of the
                          parameter.
                        "id": "str",  # Optional. Unique ID of the parameter.
                        "name": "str",  # Optional. Display name of the parameter.
                        "readOnly": bool,  # Optional. Whether or not this parameter
                          is read-only.  If true, default should have a value.
                        "required": bool,  # Optional. Whether or not this parameter
                          is required.
                        "type": "str"  # Optional. A string of one of the basic JSON
                          types (number, integer, null, array, object, boolean, string). Known
                          values are: "array", "boolean", "integer", "null", "number", "object",
                          and "string".
                    }
                ],
                "parametersSchema": "str",  # Optional. JSON schema defining the parameters
                  object passed to actions.
                "runner": "str",  # Optional. The default container image to use to execute
                  actions.
                "status": "str",  # Optional. Defines whether the specific catalog item
                  version can be used. Known values are: "Enabled" and "Disabled".
                "summary": "str",  # Optional. A short summary of the catalog item.
                "templatePath": "str",  # Optional. Path to the catalog item entrypoint file.
                "version": "str"  # Optional. The version of the catalog item.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_environments_get_catalog_item_version_request(
        project_name=project_name,
        catalog_item_id=catalog_item_id,
        version=version,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None instead of being passed to response.json().
    deserialized = response.json() if response.content else None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
@distributed_trace
def list_environment_types(
    self, project_name: str, *, top: Optional[int] = None, **kwargs: Any
) -> AsyncIterable[JSON]:
    """Lists all environment types configured for a project.

    This call only builds the pager: the request/response callbacks defined
    below are handed to ``AsyncItemPaged`` and no request is sent from this
    method body itself.

    :param project_name: The DevCenter Project upon which to execute operations. Required.
    :type project_name: str
    :keyword top: The maximum number of resources to return from the operation. Example: 'top=10'.
     Default value is None.
    :paramtype top: int
    :return: An iterator like instance of JSON object
    :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON]
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "deploymentTargetId": "str",  # Optional. Id of a subscription or management
                  group that the environment type will be mapped to. The environment's resources
                  will be deployed into this subscription or management group.
                "name": "str",  # Optional. Name of the environment type.
                "status": "str"  # Optional. Defines whether this Environment Type can be
                  used in this Project. Known values are: "Enabled" and "Disabled".
            }
    """
    request_headers = kwargs.pop("headers", {}) or {}
    request_params = kwargs.pop("params", {}) or {}
    page_hook: ClsType[JSON] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; a caller-supplied "error_map"
    # is merged on top and therefore wins on conflicting status codes.
    status_errors = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when one is given."""
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            link_query = urllib.parse.parse_qs(parsed_link.query)
            next_params = case_insensitive_dict(
                {name: [urllib.parse.quote(item) for item in items] for name, items in link_query.items()}
            )
            next_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, parsed_link.path), params=next_params
            )
        else:
            request = build_environments_list_environment_types_request(
                project_name=project_name,
                top=top,
                api_version=self._config.api_version,
                headers=request_headers,
                params=request_params,
            )

        # Both kinds of request get the same endpoint substitution applied to their URL.
        endpoint = self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True)
        request.url = self._client.format_url(request.url, endpoint=endpoint)
        return request

    async def extract_data(pipeline_response):
        """Split one page's JSON body into ``(next link or None, items)``."""
        payload = pipeline_response.http_response.json()
        items = payload["value"]
        if page_hook:
            items = page_hook(items)  # type: ignore
        continuation = payload.get("nextLink") or None
        return continuation, AsyncList(items)

    async def get_next(next_link=None):
        """Run the pipeline for one page; any status other than 200 is raised as an error."""
        request = prepare_request(next_link)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code in [200]:
            return pipeline_response

        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    return AsyncItemPaged(get_next, extract_data)