Source code for azure.mgmt.iothubprovisioningservices.operations._dps_certificate_operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import datetime
import sys
from typing import Any, Callable, Dict, IO, Optional, TypeVar, Union, overload

from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat

from .. import models as _models
from .._serialization import Serializer
from .._vendor import _convert_request, _format_url_section

if sys.version_info >= (3, 8):
    from typing import Literal  # pylint: disable=no-name-in-module, ungrouped-imports
else:
    from typing_extensions import Literal  # type: ignore  # pylint: disable=ungrouped-imports
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_get_request(
    certificate_name: str,
    resource_group_name: str,
    provisioning_service_name: str,
    subscription_id: str,
    *,
    if_match: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version = kwargs.pop("api_version", _params.pop("api-version", "2022-02-05"))  # type: Literal["2022-02-05"]
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "certificateName": _SERIALIZER.url("certificate_name", certificate_name, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "provisioningServiceName": _SERIALIZER.url("provisioning_service_name", provisioning_service_name, "str"),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if if_match is not None:
        _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_or_update_request(
    resource_group_name: str,
    provisioning_service_name: str,
    certificate_name: str,
    subscription_id: str,
    *,
    if_match: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version = kwargs.pop("api_version", _params.pop("api-version", "2022-02-05"))  # type: Literal["2022-02-05"]
    content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None))  # type: Optional[str]
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "provisioningServiceName": _SERIALIZER.url("provisioning_service_name", provisioning_service_name, "str"),
        "certificateName": _SERIALIZER.url("certificate_name", certificate_name, "str", max_length=256),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if if_match is not None:
        _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str")
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_request(
    resource_group_name: str,
    provisioning_service_name: str,
    certificate_name: str,
    subscription_id: str,
    *,
    if_match: str,
    certificate_name1: Optional[str] = None,
    certificate_raw_bytes: Optional[bytes] = None,
    certificate_is_verified: Optional[bool] = None,
    certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None,
    certificate_created: Optional[datetime.datetime] = None,
    certificate_last_updated: Optional[datetime.datetime] = None,
    certificate_has_private_key: Optional[bool] = None,
    certificate_nonce: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version = kwargs.pop("api_version", _params.pop("api-version", "2022-02-05"))  # type: Literal["2022-02-05"]
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "provisioningServiceName": _SERIALIZER.url("provisioning_service_name", provisioning_service_name, "str"),
        "certificateName": _SERIALIZER.url("certificate_name", certificate_name, "str"),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    if certificate_name1 is not None:
        _params["certificate.name"] = _SERIALIZER.query("certificate_name1", certificate_name1, "str")
    if certificate_raw_bytes is not None:
        _params["certificate.rawBytes"] = _SERIALIZER.query("certificate_raw_bytes", certificate_raw_bytes, "bytearray")
    if certificate_is_verified is not None:
        _params["certificate.isVerified"] = _SERIALIZER.query(
            "certificate_is_verified", certificate_is_verified, "bool"
        )
    if certificate_purpose is not None:
        _params["certificate.purpose"] = _SERIALIZER.query("certificate_purpose", certificate_purpose, "str")
    if certificate_created is not None:
        _params["certificate.created"] = _SERIALIZER.query("certificate_created", certificate_created, "iso-8601")
    if certificate_last_updated is not None:
        _params["certificate.lastUpdated"] = _SERIALIZER.query(
            "certificate_last_updated", certificate_last_updated, "iso-8601"
        )
    if certificate_has_private_key is not None:
        _params["certificate.hasPrivateKey"] = _SERIALIZER.query(
            "certificate_has_private_key", certificate_has_private_key, "bool"
        )
    if certificate_nonce is not None:
        _params["certificate.nonce"] = _SERIALIZER.query("certificate_nonce", certificate_nonce, "str")
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_request(
    resource_group_name: str, provisioning_service_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version = kwargs.pop("api_version", _params.pop("api-version", "2022-02-05"))  # type: Literal["2022-02-05"]
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "provisioningServiceName": _SERIALIZER.url("provisioning_service_name", provisioning_service_name, "str"),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_generate_verification_code_request(
    certificate_name: str,
    resource_group_name: str,
    provisioning_service_name: str,
    subscription_id: str,
    *,
    if_match: str,
    certificate_name1: Optional[str] = None,
    certificate_raw_bytes: Optional[bytes] = None,
    certificate_is_verified: Optional[bool] = None,
    certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None,
    certificate_created: Optional[datetime.datetime] = None,
    certificate_last_updated: Optional[datetime.datetime] = None,
    certificate_has_private_key: Optional[bool] = None,
    certificate_nonce: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version = kwargs.pop("api_version", _params.pop("api-version", "2022-02-05"))  # type: Literal["2022-02-05"]
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}/generateVerificationCode",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "certificateName": _SERIALIZER.url("certificate_name", certificate_name, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "provisioningServiceName": _SERIALIZER.url("provisioning_service_name", provisioning_service_name, "str"),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    if certificate_name1 is not None:
        _params["certificate.name"] = _SERIALIZER.query("certificate_name1", certificate_name1, "str")
    if certificate_raw_bytes is not None:
        _params["certificate.rawBytes"] = _SERIALIZER.query("certificate_raw_bytes", certificate_raw_bytes, "bytearray")
    if certificate_is_verified is not None:
        _params["certificate.isVerified"] = _SERIALIZER.query(
            "certificate_is_verified", certificate_is_verified, "bool"
        )
    if certificate_purpose is not None:
        _params["certificate.purpose"] = _SERIALIZER.query("certificate_purpose", certificate_purpose, "str")
    if certificate_created is not None:
        _params["certificate.created"] = _SERIALIZER.query("certificate_created", certificate_created, "iso-8601")
    if certificate_last_updated is not None:
        _params["certificate.lastUpdated"] = _SERIALIZER.query(
            "certificate_last_updated", certificate_last_updated, "iso-8601"
        )
    if certificate_has_private_key is not None:
        _params["certificate.hasPrivateKey"] = _SERIALIZER.query(
            "certificate_has_private_key", certificate_has_private_key, "bool"
        )
    if certificate_nonce is not None:
        _params["certificate.nonce"] = _SERIALIZER.query("certificate_nonce", certificate_nonce, "str")
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_verify_certificate_request(
    certificate_name: str,
    resource_group_name: str,
    provisioning_service_name: str,
    subscription_id: str,
    *,
    if_match: str,
    certificate_name1: Optional[str] = None,
    certificate_raw_bytes: Optional[bytes] = None,
    certificate_is_verified: Optional[bool] = None,
    certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None,
    certificate_created: Optional[datetime.datetime] = None,
    certificate_last_updated: Optional[datetime.datetime] = None,
    certificate_has_private_key: Optional[bool] = None,
    certificate_nonce: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version = kwargs.pop("api_version", _params.pop("api-version", "2022-02-05"))  # type: Literal["2022-02-05"]
    content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None))  # type: Optional[str]
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}/verify",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "certificateName": _SERIALIZER.url("certificate_name", certificate_name, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
        "provisioningServiceName": _SERIALIZER.url("provisioning_service_name", provisioning_service_name, "str"),
    }

    _url = _format_url_section(_url, **path_format_arguments)

    # Construct parameters
    if certificate_name1 is not None:
        _params["certificate.name"] = _SERIALIZER.query("certificate_name1", certificate_name1, "str")
    if certificate_raw_bytes is not None:
        _params["certificate.rawBytes"] = _SERIALIZER.query("certificate_raw_bytes", certificate_raw_bytes, "bytearray")
    if certificate_is_verified is not None:
        _params["certificate.isVerified"] = _SERIALIZER.query(
            "certificate_is_verified", certificate_is_verified, "bool"
        )
    if certificate_purpose is not None:
        _params["certificate.purpose"] = _SERIALIZER.query("certificate_purpose", certificate_purpose, "str")
    if certificate_created is not None:
        _params["certificate.created"] = _SERIALIZER.query("certificate_created", certificate_created, "iso-8601")
    if certificate_last_updated is not None:
        _params["certificate.lastUpdated"] = _SERIALIZER.query(
            "certificate_last_updated", certificate_last_updated, "iso-8601"
        )
    if certificate_has_private_key is not None:
        _params["certificate.hasPrivateKey"] = _SERIALIZER.query(
            "certificate_has_private_key", certificate_has_private_key, "bool"
        )
    if certificate_nonce is not None:
        _params["certificate.nonce"] = _SERIALIZER.query("certificate_nonce", certificate_nonce, "str")
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str")
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


[docs]class DpsCertificateOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.mgmt.iothubprovisioningservices.IotDpsClient`'s :attr:`dps_certificate` attribute. """ models = _models def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def get( self, certificate_name: str, resource_group_name: str, provisioning_service_name: str, if_match: Optional[str] = None, **kwargs: Any ) -> _models.CertificateResponse: """Get the certificate from the provisioning service. :param certificate_name: Name of the certificate to retrieve. Required. :type certificate_name: str :param resource_group_name: Resource group identifier. Required. :type resource_group_name: str :param provisioning_service_name: Name of the provisioning service the certificate is associated with. Required. :type provisioning_service_name: str :param if_match: ETag of the certificate. Default value is None. :type if_match: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) # type: Literal["2022-02-05"] cls = kwargs.pop("cls", None) # type: ClsType[_models.CertificateResponse] request = build_get_request( certificate_name=certificate_name, resource_group_name=resource_group_name, provisioning_service_name=provisioning_service_name, subscription_id=self._config.subscription_id, if_match=if_match, api_version=api_version, template_url=self.get.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) # type: ignore pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorDetails, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("CertificateResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
get.metadata = {"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}"} # type: ignore @overload def create_or_update( self, resource_group_name: str, provisioning_service_name: str, certificate_name: str, certificate_description: _models.CertificateResponse, if_match: Optional[str] = None, *, content_type: str = "application/json", **kwargs: Any ) -> _models.CertificateResponse: """Upload the certificate to the provisioning service. Add new certificate or update an existing certificate. :param resource_group_name: Resource group identifier. Required. :type resource_group_name: str :param provisioning_service_name: The name of the provisioning service. Required. :type provisioning_service_name: str :param certificate_name: The name of the certificate create or update. Required. :type certificate_name: str :param certificate_description: The certificate body. Required. :type certificate_description: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :param if_match: ETag of the certificate. This is required to update an existing certificate, and ignored while creating a brand new certificate. Default value is None. :type if_match: str :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """ @overload def create_or_update( self, resource_group_name: str, provisioning_service_name: str, certificate_name: str, certificate_description: IO, if_match: Optional[str] = None, *, content_type: str = "application/json", **kwargs: Any ) -> _models.CertificateResponse: """Upload the certificate to the provisioning service. Add new certificate or update an existing certificate. :param resource_group_name: Resource group identifier. Required. :type resource_group_name: str :param provisioning_service_name: The name of the provisioning service. Required. :type provisioning_service_name: str :param certificate_name: The name of the certificate create or update. Required. :type certificate_name: str :param certificate_description: The certificate body. Required. :type certificate_description: IO :param if_match: ETag of the certificate. This is required to update an existing certificate, and ignored while creating a brand new certificate. Default value is None. :type if_match: str :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def create_or_update( self, resource_group_name: str, provisioning_service_name: str, certificate_name: str, certificate_description: Union[_models.CertificateResponse, IO], if_match: Optional[str] = None, **kwargs: Any ) -> _models.CertificateResponse: """Upload the certificate to the provisioning service. Add new certificate or update an existing certificate. :param resource_group_name: Resource group identifier. Required. :type resource_group_name: str :param provisioning_service_name: The name of the provisioning service. Required. :type provisioning_service_name: str :param certificate_name: The name of the certificate create or update. Required. :type certificate_name: str :param certificate_description: The certificate body. Is either a model type or a IO type. Required. :type certificate_description: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse or IO :param if_match: ETag of the certificate. This is required to update an existing certificate, and ignored while creating a brand new certificate. Default value is None. :type if_match: str :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) # type: Literal["2022-02-05"] content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[_models.CertificateResponse] content_type = content_type or "application/json" _json = None _content = None if isinstance(certificate_description, (IO, bytes)): _content = certificate_description else: _json = self._serialize.body(certificate_description, "CertificateResponse") request = build_create_or_update_request( resource_group_name=resource_group_name, provisioning_service_name=provisioning_service_name, certificate_name=certificate_name, subscription_id=self._config.subscription_id, if_match=if_match, api_version=api_version, content_type=content_type, json=_json, content=_content, template_url=self.create_or_update.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) # type: ignore pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorDetails, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("CertificateResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
create_or_update.metadata = {"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}"} # type: ignore
[docs] @distributed_trace def delete( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, if_match: str, provisioning_service_name: str, certificate_name: str, certificate_name1: Optional[str] = None, certificate_raw_bytes: Optional[bytes] = None, certificate_is_verified: Optional[bool] = None, certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None, certificate_created: Optional[datetime.datetime] = None, certificate_last_updated: Optional[datetime.datetime] = None, certificate_has_private_key: Optional[bool] = None, certificate_nonce: Optional[str] = None, **kwargs: Any ) -> None: """Delete the Provisioning Service Certificate. Deletes the specified certificate associated with the Provisioning Service. :param resource_group_name: Resource group identifier. Required. :type resource_group_name: str :param if_match: ETag of the certificate. Required. :type if_match: str :param provisioning_service_name: The name of the provisioning service. Required. :type provisioning_service_name: str :param certificate_name: This is a mandatory field, and is the logical name of the certificate that the provisioning service will access by. Required. :type certificate_name: str :param certificate_name1: This is optional, and it is the Common Name of the certificate. Default value is None. :type certificate_name1: str :param certificate_raw_bytes: Raw data within the certificate. Default value is None. :type certificate_raw_bytes: bytes :param certificate_is_verified: Indicates if certificate has been verified by owner of the private key. Default value is None. :type certificate_is_verified: bool :param certificate_purpose: A description that mentions the purpose of the certificate. Known values are: "clientAuthentication" and "serverAuthentication". Default value is None. :type certificate_purpose: str or ~azure.mgmt.iothubprovisioningservices.models.CertificatePurpose :param certificate_created: Time the certificate is created. Default value is None. :type certificate_created: ~datetime.datetime :param certificate_last_updated: Time the certificate is last updated. Default value is None. :type certificate_last_updated: ~datetime.datetime :param certificate_has_private_key: Indicates if the certificate contains a private key. Default value is None. :type certificate_has_private_key: bool :param certificate_nonce: Random number generated to indicate Proof of Possession. Default value is None. :type certificate_nonce: str :keyword callable cls: A custom type or function that will be passed the direct response :return: None or the result of cls(response) :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) # type: Literal["2022-02-05"] cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_delete_request( resource_group_name=resource_group_name, provisioning_service_name=provisioning_service_name, certificate_name=certificate_name, subscription_id=self._config.subscription_id, if_match=if_match, certificate_name1=certificate_name1, certificate_raw_bytes=certificate_raw_bytes, certificate_is_verified=certificate_is_verified, certificate_purpose=certificate_purpose, certificate_created=certificate_created, certificate_last_updated=certificate_last_updated, certificate_has_private_key=certificate_has_private_key, certificate_nonce=certificate_nonce, api_version=api_version, template_url=self.delete.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) # type: ignore pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorDetails, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {})
delete.metadata = {"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}"} # type: ignore
[docs] @distributed_trace def list( self, resource_group_name: str, provisioning_service_name: str, **kwargs: Any ) -> _models.CertificateListDescription: """Get all the certificates tied to the provisioning service. :param resource_group_name: Name of resource group. Required. :type resource_group_name: str :param provisioning_service_name: Name of provisioning service to retrieve certificates for. Required. :type provisioning_service_name: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateListDescription or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateListDescription :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) # type: Literal["2022-02-05"] cls = kwargs.pop("cls", None) # type: ClsType[_models.CertificateListDescription] request = build_list_request( resource_group_name=resource_group_name, provisioning_service_name=provisioning_service_name, subscription_id=self._config.subscription_id, api_version=api_version, template_url=self.list.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) # type: ignore pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorDetails, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("CertificateListDescription", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
list.metadata = {"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates"} # type: ignore
[docs] @distributed_trace def generate_verification_code( self, certificate_name: str, if_match: str, resource_group_name: str, provisioning_service_name: str, certificate_name1: Optional[str] = None, certificate_raw_bytes: Optional[bytes] = None, certificate_is_verified: Optional[bool] = None, certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None, certificate_created: Optional[datetime.datetime] = None, certificate_last_updated: Optional[datetime.datetime] = None, certificate_has_private_key: Optional[bool] = None, certificate_nonce: Optional[str] = None, **kwargs: Any ) -> _models.VerificationCodeResponse: """Generate verification code for Proof of Possession. :param certificate_name: The mandatory logical name of the certificate, that the provisioning service uses to access. Required. :type certificate_name: str :param if_match: ETag of the certificate. This is required to update an existing certificate, and ignored while creating a brand new certificate. Required. :type if_match: str :param resource_group_name: name of resource group. Required. :type resource_group_name: str :param provisioning_service_name: Name of provisioning service. Required. :type provisioning_service_name: str :param certificate_name1: Common Name for the certificate. Default value is None. :type certificate_name1: str :param certificate_raw_bytes: Raw data of certificate. Default value is None. :type certificate_raw_bytes: bytes :param certificate_is_verified: Indicates if the certificate has been verified by owner of the private key. Default value is None. :type certificate_is_verified: bool :param certificate_purpose: Description mentioning the purpose of the certificate. Known values are: "clientAuthentication" and "serverAuthentication". Default value is None. :type certificate_purpose: str or ~azure.mgmt.iothubprovisioningservices.models.CertificatePurpose :param certificate_created: Certificate creation time. Default value is None. :type certificate_created: ~datetime.datetime :param certificate_last_updated: Certificate last updated time. Default value is None. :type certificate_last_updated: ~datetime.datetime :param certificate_has_private_key: Indicates if the certificate contains private key. Default value is None. :type certificate_has_private_key: bool :param certificate_nonce: Random number generated to indicate Proof of Possession. Default value is None. :type certificate_nonce: str :keyword callable cls: A custom type or function that will be passed the direct response :return: VerificationCodeResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.VerificationCodeResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) # type: Literal["2022-02-05"] cls = kwargs.pop("cls", None) # type: ClsType[_models.VerificationCodeResponse] request = build_generate_verification_code_request( certificate_name=certificate_name, resource_group_name=resource_group_name, provisioning_service_name=provisioning_service_name, subscription_id=self._config.subscription_id, if_match=if_match, certificate_name1=certificate_name1, certificate_raw_bytes=certificate_raw_bytes, certificate_is_verified=certificate_is_verified, certificate_purpose=certificate_purpose, certificate_created=certificate_created, certificate_last_updated=certificate_last_updated, certificate_has_private_key=certificate_has_private_key, certificate_nonce=certificate_nonce, api_version=api_version, template_url=self.generate_verification_code.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) # type: ignore pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorDetails, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("VerificationCodeResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
generate_verification_code.metadata = {"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}/generateVerificationCode"} # type: ignore @overload def verify_certificate( self, certificate_name: str, if_match: str, resource_group_name: str, provisioning_service_name: str, request: _models.VerificationCodeRequest, certificate_name1: Optional[str] = None, certificate_raw_bytes: Optional[bytes] = None, certificate_is_verified: Optional[bool] = None, certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None, certificate_created: Optional[datetime.datetime] = None, certificate_last_updated: Optional[datetime.datetime] = None, certificate_has_private_key: Optional[bool] = None, certificate_nonce: Optional[str] = None, *, content_type: str = "application/json", **kwargs: Any ) -> _models.CertificateResponse: """Verify certificate's private key possession. Verifies the certificate's private key possession by providing the leaf cert issued by the verifying pre uploaded certificate. :param certificate_name: The mandatory logical name of the certificate, that the provisioning service uses to access. Required. :type certificate_name: str :param if_match: ETag of the certificate. Required. :type if_match: str :param resource_group_name: Resource group name. Required. :type resource_group_name: str :param provisioning_service_name: Provisioning service name. Required. :type provisioning_service_name: str :param request: The name of the certificate. Required. :type request: ~azure.mgmt.iothubprovisioningservices.models.VerificationCodeRequest :param certificate_name1: Common Name for the certificate. Default value is None. :type certificate_name1: str :param certificate_raw_bytes: Raw data of certificate. Default value is None. :type certificate_raw_bytes: bytes :param certificate_is_verified: Indicates if the certificate has been verified by owner of the private key. Default value is None. :type certificate_is_verified: bool :param certificate_purpose: Describe the purpose of the certificate. Known values are: "clientAuthentication" and "serverAuthentication". Default value is None. :type certificate_purpose: str or ~azure.mgmt.iothubprovisioningservices.models.CertificatePurpose :param certificate_created: Certificate creation time. Default value is None. :type certificate_created: ~datetime.datetime :param certificate_last_updated: Certificate last updated time. Default value is None. :type certificate_last_updated: ~datetime.datetime :param certificate_has_private_key: Indicates if the certificate contains private key. Default value is None. :type certificate_has_private_key: bool :param certificate_nonce: Random number generated to indicate Proof of Possession. Default value is None. :type certificate_nonce: str :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """ @overload def verify_certificate( self, certificate_name: str, if_match: str, resource_group_name: str, provisioning_service_name: str, request: IO, certificate_name1: Optional[str] = None, certificate_raw_bytes: Optional[bytes] = None, certificate_is_verified: Optional[bool] = None, certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None, certificate_created: Optional[datetime.datetime] = None, certificate_last_updated: Optional[datetime.datetime] = None, certificate_has_private_key: Optional[bool] = None, certificate_nonce: Optional[str] = None, *, content_type: str = "application/json", **kwargs: Any ) -> _models.CertificateResponse: """Verify certificate's private key possession. Verifies the certificate's private key possession by providing the leaf cert issued by the verifying pre uploaded certificate. :param certificate_name: The mandatory logical name of the certificate, that the provisioning service uses to access. Required. :type certificate_name: str :param if_match: ETag of the certificate. Required. :type if_match: str :param resource_group_name: Resource group name. Required. :type resource_group_name: str :param provisioning_service_name: Provisioning service name. Required. :type provisioning_service_name: str :param request: The name of the certificate. Required. :type request: IO :param certificate_name1: Common Name for the certificate. Default value is None. :type certificate_name1: str :param certificate_raw_bytes: Raw data of certificate. Default value is None. :type certificate_raw_bytes: bytes :param certificate_is_verified: Indicates if the certificate has been verified by owner of the private key. Default value is None. :type certificate_is_verified: bool :param certificate_purpose: Describe the purpose of the certificate. Known values are: "clientAuthentication" and "serverAuthentication". Default value is None. :type certificate_purpose: str or ~azure.mgmt.iothubprovisioningservices.models.CertificatePurpose :param certificate_created: Certificate creation time. Default value is None. :type certificate_created: ~datetime.datetime :param certificate_last_updated: Certificate last updated time. Default value is None. :type certificate_last_updated: ~datetime.datetime :param certificate_has_private_key: Indicates if the certificate contains private key. Default value is None. :type certificate_has_private_key: bool :param certificate_nonce: Random number generated to indicate Proof of Possession. Default value is None. :type certificate_nonce: str :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def verify_certificate( self, certificate_name: str, if_match: str, resource_group_name: str, provisioning_service_name: str, request: Union[_models.VerificationCodeRequest, IO], certificate_name1: Optional[str] = None, certificate_raw_bytes: Optional[bytes] = None, certificate_is_verified: Optional[bool] = None, certificate_purpose: Optional[Union[str, _models.CertificatePurpose]] = None, certificate_created: Optional[datetime.datetime] = None, certificate_last_updated: Optional[datetime.datetime] = None, certificate_has_private_key: Optional[bool] = None, certificate_nonce: Optional[str] = None, **kwargs: Any ) -> _models.CertificateResponse: """Verify certificate's private key possession. Verifies the certificate's private key possession by providing the leaf cert issued by the verifying pre uploaded certificate. :param certificate_name: The mandatory logical name of the certificate, that the provisioning service uses to access. Required. :type certificate_name: str :param if_match: ETag of the certificate. Required. :type if_match: str :param resource_group_name: Resource group name. Required. :type resource_group_name: str :param provisioning_service_name: Provisioning service name. Required. :type provisioning_service_name: str :param request: The name of the certificate. Is either a model type or a IO type. Required. :type request: ~azure.mgmt.iothubprovisioningservices.models.VerificationCodeRequest or IO :param certificate_name1: Common Name for the certificate. Default value is None. :type certificate_name1: str :param certificate_raw_bytes: Raw data of certificate. Default value is None. :type certificate_raw_bytes: bytes :param certificate_is_verified: Indicates if the certificate has been verified by owner of the private key. Default value is None. :type certificate_is_verified: bool :param certificate_purpose: Describe the purpose of the certificate. Known values are: "clientAuthentication" and "serverAuthentication". Default value is None. :type certificate_purpose: str or ~azure.mgmt.iothubprovisioningservices.models.CertificatePurpose :param certificate_created: Certificate creation time. Default value is None. :type certificate_created: ~datetime.datetime :param certificate_last_updated: Certificate last updated time. Default value is None. :type certificate_last_updated: ~datetime.datetime :param certificate_has_private_key: Indicates if the certificate contains private key. Default value is None. :type certificate_has_private_key: bool :param certificate_nonce: Random number generated to indicate Proof of Possession. Default value is None. :type certificate_nonce: str :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword callable cls: A custom type or function that will be passed the direct response :return: CertificateResponse or the result of cls(response) :rtype: ~azure.mgmt.iothubprovisioningservices.models.CertificateResponse :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version = kwargs.pop( "api_version", _params.pop("api-version", self._config.api_version) ) # type: Literal["2022-02-05"] content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[_models.CertificateResponse] content_type = content_type or "application/json" _json = None _content = None if isinstance(request, (IO, bytes)): _content = request else: _json = self._serialize.body(request, "VerificationCodeRequest") request = build_verify_certificate_request( certificate_name=certificate_name, resource_group_name=resource_group_name, provisioning_service_name=provisioning_service_name, subscription_id=self._config.subscription_id, if_match=if_match, certificate_name1=certificate_name1, certificate_raw_bytes=certificate_raw_bytes, certificate_is_verified=certificate_is_verified, certificate_purpose=certificate_purpose, certificate_created=certificate_created, certificate_last_updated=certificate_last_updated, certificate_has_private_key=certificate_has_private_key, certificate_nonce=certificate_nonce, api_version=api_version, content_type=content_type, json=_json, content=_content, template_url=self.verify_certificate.metadata["url"], headers=_headers, params=_params, ) request = _convert_request(request) request.url = self._client.format_url(request.url) # type: ignore pipeline_response = self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorDetails, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("CertificateResponse", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) return deserialized
verify_certificate.metadata = {"url": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Devices/provisioningServices/{provisioningServiceName}/certificates/{certificateName}/verify"} # type: ignore