# Source code for azure.ai.ml.entities._deployment.online_deployment

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

from abc import abstractmethod
import logging
from typing import Any, Dict, Union, Optional

from azure.ai.ml._restclient.v2022_02_01_preview.models import (
    CodeConfiguration as RestCodeConfiguration,
    OnlineDeploymentDetails as RestOnlineDeploymentDetails,
    OnlineDeploymentData as RestOnlineDeploymentData,
    KubernetesOnlineDeployment as RestKubernetesOnlineDeployment,
    ManagedOnlineDeployment as RestManagedOnlineDeployment,
    Sku as RestSku,
    EndpointComputeType,
)
from azure.ai.ml._utils.utils import camel_to_snake, load_yaml
from azure.ai.ml.entities import CodeConfiguration, Environment, Model
from azure.ai.ml.entities._assets import Code
from azure.ai.ml.entities._deployment.deployment_settings import ProbeSettings, OnlineRequestSettings

from azure.ai.ml.entities._deployment.scale_settings import (
    OnlineScaleSettings,
    TargetUtilizationScaleSettings,
    DefaultScaleSettings,
)
from azure.ai.ml.entities._deployment.resource_requirements_settings import ResourceRequirementsSettings
from azure.ai.ml._utils._arm_id_utils import _parse_endpoint_name_from_deployment_id
from azure.ai.ml.entities._endpoint._endpoint_helpers import validate_endpoint_or_deployment_name
from azure.ai.ml.constants import (
    BASE_PATH_CONTEXT_KEY,
    PARAMS_OVERRIDE_KEY,
    ArmConstants,
)
from os import PathLike
from pathlib import Path
from azure.ai.ml.entities._util import load_from_dict
from marshmallow.exceptions import ValidationError

from azure.ai.ml._schema._deployment.online.online_deployment import (
    KubernetesOnlineDeploymentSchema,
    ManagedOnlineDeploymentSchema,
)
from .deployment import Deployment
from azure.ai.ml._ml_exceptions import DeploymentException, ErrorCategory, ErrorTarget, ValidationException

# Module-level logger, named after this module via ``__name__``.
module_logger = logging.getLogger(__name__)


class OnlineDeployment(Deployment):
    """Online endpoint deployment entity

    Base class shared by the Kubernetes and managed online deployment entities defined in this module.

    :param name: Name of the resource.
    :type name: str
    :param endpoint_name: Name of the endpoint; forwarded unchanged to the base ``Deployment`` initializer.
    :type endpoint_name: str, optional
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: Dict[str, Any], optional
    :param properties: The asset property dictionary.
    :type properties: Dict[str, Any], optional
    :param description: Description of the resource.
    :type description: str, optional
    :param model: Model entity for the endpoint deployment, defaults to None
    :type model: Union[str, Model], optional
    :param code_configuration: defaults to None
    :type code_configuration: CodeConfiguration, optional
    :param environment: Environment entity for the endpoint deployment, defaults to None
    :type environment: Union[str, Environment], optional
    :param app_insights_enabled: defaults to False
    :type app_insights_enabled: bool, optional
    :param scale_settings: How the online deployment will scale.
    :type scale_settings: OnlineScaleSettings, optional
    :param request_settings: defaults to None
    :type request_settings: OnlineRequestSettings, optional
    :param liveness_probe: Liveness probe settings.
    :type liveness_probe: ProbeSettings, optional
    :param readiness_probe: Readiness probe settings.
    :type readiness_probe: ProbeSettings, optional
    :param environment_variables: Environment variables that will be set in deployment.
    :type environment_variables: dict, optional
    :param instance_count: The instance count used for this deployment.
    :type instance_count: int
    :param instance_type: Azure compute sku.
    :type instance_type: str
    :param model_mount_path: The path to mount the model in custom container.
    :type model_mount_path: str
    :param code_path: Equivalent to code_configuration.code, will be ignored if code_configuration is present.
    :type code_path: Union[str, PathLike], optional
    :param scoring_script: Equivalent to code_configuration.scoring_script,
        will be ignored if code_configuration is present.
    :type scoring_script: Union[str, PathLike], optional
    """

    def __init__(
        self,
        name: str,
        endpoint_name: str = None,
        tags: Dict[str, Any] = None,
        properties: Dict[str, Any] = None,
        description: str = None,
        model: Union[str, "Model"] = None,
        code_configuration: CodeConfiguration = None,
        environment: Union[str, "Environment"] = None,
        app_insights_enabled: bool = False,
        scale_settings: OnlineScaleSettings = None,
        request_settings: OnlineRequestSettings = None,
        liveness_probe: ProbeSettings = None,
        readiness_probe: ProbeSettings = None,
        environment_variables: Dict[str, str] = None,
        instance_count: int = None,
        instance_type: str = None,
        model_mount_path: str = None,
        code_path: Union[str, PathLike] = None,  # promoted property from code_configuration.code
        scoring_script: Union[str, PathLike] = None,  # promoted property from code_configuration.scoring_script
        **kwargs,
    ):
        # Pop the read-only state before forwarding kwargs so the base class never sees it.
        self._provisioning_state = kwargs.pop("provisioning_state", None)

        super(OnlineDeployment, self).__init__(
            name=name,
            endpoint_name=endpoint_name,
            tags=tags,
            properties=properties,
            description=description,
            model=model,
            code_configuration=code_configuration,
            environment=environment,
            environment_variables=environment_variables,
            code_path=code_path,
            scoring_script=scoring_script,
            **kwargs,
        )

        self.app_insights_enabled = app_insights_enabled
        self.scale_settings = scale_settings
        self.request_settings = request_settings
        self.liveness_probe = liveness_probe
        self.readiness_probe = readiness_probe
        self.instance_count = instance_count
        self._arm_type = ArmConstants.ONLINE_DEPLOYMENT_TYPE
        self.model_mount_path = model_mount_path
        self.instance_type = instance_type

    @property
    def provisioning_state(self) -> Optional[str]:
        """Deployment provisioning state, readonly

        :return: Deployment provisioning state.
        :rtype: Optional[str]
        """
        return self._provisioning_state

    def _generate_dependencies(self) -> Any:
        """Convert dependencies into ARM id or REST wrapper

        :return: A ``(code, environment_id, model_id)`` tuple. ``code`` is a ``RestCodeConfiguration`` or None;
            the two ids are the string given by the caller, or the ``id`` attribute of the entity object.
        """
        code = None

        if self.code_configuration:
            self.code_configuration._validate()
            code_id = (
                self.code_configuration.code
                if isinstance(self.code_configuration.code, str)
                else self.code_configuration.code.id
            )
            code = RestCodeConfiguration(code_id=code_id, scoring_script=self.code_configuration.scoring_script)

        model_id = None
        if self.model:
            model_id = self.model if isinstance(self.model, str) else self.model.id

        environment_id = None
        if self.environment:
            environment_id = self.environment if isinstance(self.environment, str) else self.environment.id

        return code, environment_id, model_id

    def _to_dict(self) -> Dict:
        """Placeholder on the base class; the concrete deployment classes in this module override it."""
        pass

    @abstractmethod
    def _to_rest_object(self) -> RestOnlineDeploymentData:
        """Build the REST representation; implemented by the concrete deployment classes."""
        pass

    @classmethod
    def _from_rest_object(cls, deployment: RestOnlineDeploymentData) -> "OnlineDeployment":
        """Build the matching deployment entity from a REST object.

        Dispatches on ``deployment.properties.endpoint_compute_type``.

        :param deployment: REST deployment resource.
        :type deployment: RestOnlineDeploymentData
        :return: A KubernetesOnlineDeployment or ManagedOnlineDeployment.
        :raises DeploymentException: If the compute type is neither Kubernetes nor managed.
        """
        compute_type = deployment.properties.endpoint_compute_type

        if compute_type == EndpointComputeType.KUBERNETES:
            return KubernetesOnlineDeployment._from_rest_object(deployment)
        if compute_type == EndpointComputeType.MANAGED:
            return ManagedOnlineDeployment._from_rest_object(deployment)

        # Report the same attribute the dispatch is based on, so building the message cannot itself fail.
        msg = f"Unsupported online endpoint type {compute_type}."
        raise DeploymentException(
            message=msg,
            target=ErrorTarget.ONLINE_DEPLOYMENT,
            no_personal_data_message=msg,
            error_category=ErrorCategory.SYSTEM_ERROR,
        )

    def _get_arm_resource(self, **kwargs):
        """Return the base ARM resource with a depends-on list for entity-typed dependencies.

        Only dependencies given as entity objects (Environment, Code, Model) are listed; string references are not.
        """
        resource = super(OnlineDeployment, self)._get_arm_resource(**kwargs)
        depends_on = []
        if self.environment and isinstance(self.environment, Environment):
            depends_on.append(f"{self.environment._arm_type}Deployment")
        if self.code_configuration and self.code_configuration.code and isinstance(self.code_configuration.code, Code):
            depends_on.append(f"{self.code_configuration.code._arm_type}Deployment")
        if self.model and isinstance(self.model, Model):
            depends_on.append(f"{self.model._arm_type}Deployment")
        resource[ArmConstants.DEPENDSON_PARAMETER_NAME] = depends_on
        return resource

    def _get_arm_resource_and_params(self, **kwargs):
        """Return ``(resource, params)`` tuples for this deployment followed by its entity-typed dependencies."""
        resource_param_tuple_list = [(self._get_arm_resource(**kwargs), self._to_arm_resource_param(**kwargs))]
        if self.environment and isinstance(self.environment, Environment):
            resource_param_tuple_list.extend(self.environment._get_arm_resource_and_params())
        if self.code_configuration and self.code_configuration.code and isinstance(self.code_configuration.code, Code):
            resource_param_tuple_list.extend(self.code_configuration.code._get_arm_resource_and_params())
        if self.model and isinstance(self.model, Model):
            resource_param_tuple_list.extend(self.model._get_arm_resource_and_params())
        return resource_param_tuple_list

    def _validate_name(self) -> None:
        """Validate the deployment name, when one is set."""
        if self.name:
            validate_endpoint_or_deployment_name(self.name, is_deployment=True)

    def _merge_with(self, other: "OnlineDeployment") -> None:
        """Merge ``other`` into this deployment in place; truthy values from ``other`` take precedence.

        :param other: Deployment to merge from. A falsy value makes this a no-op.
        :type other: OnlineDeployment
        :raises ValidationException: If the two deployments have different names.
        """
        if not other:
            return

        if self.name != other.name:
            msg = "The deployment name: {} and {} are not matched when merging."
            raise ValidationException(
                message=msg.format(self.name, other.name),
                target=ErrorTarget.ONLINE_DEPLOYMENT,
                no_personal_data_message=msg.format("[name1]", "[name2]"),
                error_category=ErrorCategory.USER_ERROR,
            )

        super()._merge_with(other)
        self.app_insights_enabled = other.app_insights_enabled or self.app_insights_enabled

        # Adding noqa: Fix E721 do not compare types, use 'isinstance()'
        # isinstance will include checking for subclasses, which is explicitly undesired by a logic.
        if self.scale_settings and type(self.scale_settings) == type(other.scale_settings):  # noqa
            self.scale_settings._merge_with(other.scale_settings)
        else:
            self.scale_settings = other.scale_settings

        if self.request_settings:
            self.request_settings._merge_with(other.request_settings)
        else:
            self.request_settings = other.request_settings

        if self.liveness_probe:
            self.liveness_probe._merge_with(other.liveness_probe)
        else:
            self.liveness_probe = other.liveness_probe

        if self.readiness_probe:
            self.readiness_probe._merge_with(other.readiness_probe)
        else:
            self.readiness_probe = other.readiness_probe

        self.instance_count = other.instance_count or self.instance_count
        self.instance_type = other.instance_type or self.instance_type

    @classmethod
    def _load(
        cls,
        data: dict,
        yaml_path: Union[PathLike, str] = None,
        params_override: list = None,
        **kwargs,
    ) -> "OnlineDeployment":
        """Load a deployment from a dictionary, choosing the schema from its ``type`` key.

        The Kubernetes schema is used when ``type`` matches the snake-cased Kubernetes compute type; any other
        value, including a missing ``type``, loads with the managed schema.

        :param data: Deployment definition.
        :type data: dict
        :param yaml_path: Path of the source YAML file; its parent directory becomes the base path.
            The current working directory is used when omitted.
        :type yaml_path: Union[PathLike, str], optional
        :param params_override: Parameter overrides placed in the schema context.
        :type params_override: list, optional
        """
        params_override = params_override or []
        context = {
            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path.cwd(),
            PARAMS_OVERRIDE_KEY: params_override,
        }

        deployment_type = data.get("type", None)

        if deployment_type == camel_to_snake(EndpointComputeType.KUBERNETES.value):
            return load_from_dict(KubernetesOnlineDeploymentSchema, data, context, **kwargs)

        return load_from_dict(ManagedOnlineDeploymentSchema, data, context, **kwargs)


class KubernetesOnlineDeployment(OnlineDeployment):
    """Kubernetes Online endpoint deployment entity

    :param name: Name of the resource.
    :type name: str
    :param endpoint_name: Name of the endpoint; forwarded unchanged to the parent initializer.
    :type endpoint_name: str, optional
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: Dict[str, Any], optional
    :param properties: The asset property dictionary.
    :type properties: Dict[str, Any], optional
    :param description: Description of the resource.
    :type description: str, optional
    :param model: Model entity for the endpoint deployment, defaults to None
    :type model: Union[str, Model], optional
    :param code_configuration: defaults to None
    :type code_configuration: CodeConfiguration, optional
    :param environment: Environment entity for the endpoint deployment, defaults to None
    :type environment: Union[str, Environment], optional
    :param app_insights_enabled: defaults to False
    :type app_insights_enabled: bool, optional
    :param scale_settings: How the online deployment will scale.
    :type scale_settings: OnlineScaleSettings, optional
    :param request_settings: defaults to None
    :type request_settings: OnlineRequestSettings, optional
    :param liveness_probe: Liveness probe settings.
    :type liveness_probe: ProbeSettings, optional
    :param readiness_probe: Readiness probe settings.
    :type readiness_probe: ProbeSettings, optional
    :param environment_variables: Environment variables that will be set in deployment.
    :type environment_variables: dict, optional
    :param resources: defaults to None
    :type resources: ResourceRequirementsSettings, optional
    :param instance_type: The instance type defined by K8S cluster admin.
    :type instance_type: str
    :param instance_count: The instance count used for this deployment.
    :type instance_count: int
    :param code_path: Folder path to local code assets. Equivalent to code_configuration.code.
    :type code_path: Union[str, PathLike], optional
    :param scoring_script: Scoring script name. Equivalent to code_configuration.scoring_script.
    :type scoring_script: Union[str, PathLike], optional
    """

    def __init__(
        self,
        *,
        name: str,
        endpoint_name: str = None,
        tags: Dict[str, Any] = None,
        properties: Dict[str, Any] = None,
        description: str = None,
        model: Union[str, "Model"] = None,
        code_configuration: CodeConfiguration = None,
        environment: Union[str, "Environment"] = None,
        app_insights_enabled: bool = False,
        scale_settings: OnlineScaleSettings = None,
        request_settings: OnlineRequestSettings = None,
        liveness_probe: ProbeSettings = None,
        readiness_probe: ProbeSettings = None,
        environment_variables: Dict[str, str] = None,
        resources: ResourceRequirementsSettings = None,
        instance_count: int = None,
        instance_type: str = None,
        code_path: Union[str, PathLike] = None,  # promoted property from code_configuration.code
        scoring_script: Union[str, PathLike] = None,  # promoted property from code_configuration.scoring_script
        **kwargs,
    ):
        # The compute type is fixed for this class; any caller-supplied "type" is overwritten.
        kwargs["type"] = EndpointComputeType.KUBERNETES.value

        super().__init__(
            name=name,
            endpoint_name=endpoint_name,
            tags=tags,
            properties=properties,
            description=description,
            model=model,
            code_configuration=code_configuration,
            environment=environment,
            environment_variables=environment_variables,
            instance_count=instance_count,
            instance_type=instance_type,
            app_insights_enabled=app_insights_enabled,
            scale_settings=scale_settings,
            request_settings=request_settings,
            liveness_probe=liveness_probe,
            readiness_probe=readiness_probe,
            code_path=code_path,
            scoring_script=scoring_script,
            **kwargs,
        )

        self.resources = resources

    def _to_dict(self) -> Dict:
        """Dump this entity through KubernetesOnlineDeploymentSchema, using "./" as the base path."""
        schema = KubernetesOnlineDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"})
        return schema.dump(self)

    def _to_rest_object(self, location: str) -> RestOnlineDeploymentData:
        """Validate this entity and convert it to its REST representation.

        :param location: Location placed on the resulting REST resource.
        :type location: str
        :return: REST resource holding the Kubernetes deployment properties, the tags and a "Default" sku.
        :rtype: RestOnlineDeploymentData
        """
        self._validate()
        code, environment, model = self._generate_dependencies()

        # Optional settings are converted only when present; otherwise None is sent.
        rest_scale = self.scale_settings._to_rest_object() if self.scale_settings else None
        rest_request = self.request_settings._to_rest_object() if self.request_settings else None
        rest_liveness = self.liveness_probe._to_rest_object() if self.liveness_probe else None
        rest_readiness = self.readiness_probe._to_rest_object() if self.readiness_probe else None
        rest_resources = self.resources._to_rest_object() if self.resources else None

        rest_properties = RestKubernetesOnlineDeployment(
            code_configuration=code,
            environment_id=environment,
            model=model,
            model_mount_path=self.model_mount_path,
            scale_settings=rest_scale,
            properties=self.properties,
            description=self.description,
            environment_variables=self.environment_variables,
            app_insights_enabled=self.app_insights_enabled,
            request_settings=rest_request,
            liveness_probe=rest_liveness,
            readiness_probe=rest_readiness,
            container_resource_requirements=rest_resources,
            # Any falsy instance type (e.g. an empty string) is sent as None.
            instance_type=self.instance_type or None,
        )
        rest_sku = RestSku(name="Default", capacity=self.instance_count)
        return RestOnlineDeploymentData(location=location, properties=rest_properties, tags=self.tags, sku=rest_sku)

    def _to_arm_resource_param(self, **kwargs):
        """Build the ARM parameter dictionary for this deployment, keyed by its ARM type.

        ``kwargs`` are forwarded to ``_to_rest_object``.
        """
        rest_resource = self._to_rest_object(**kwargs)
        arm_params = {
            ArmConstants.NAME: self.name,
            ArmConstants.PROPERTIES_PARAMETER_NAME: self._serialize.body(
                rest_resource.properties, "K8SOnlineDeployment"
            ),
            ArmConstants.SKU: self._serialize.body(rest_resource.sku, "Sku"),
            ArmConstants.TAGS: rest_resource.tags,
        }
        return {self._arm_type: arm_params}

    def _merge_with(self, other: "KubernetesOnlineDeployment") -> None:
        """Merge ``other`` into this deployment in place, including the resource requirements.

        A falsy ``other`` makes this a no-op.
        """
        if not other:
            return

        super()._merge_with(other)
        if self.resources:
            self.resources._merge_with(other.resources)
        else:
            self.resources = other.resources

    def _validate(self) -> None:
        """Validate this deployment; only the name is checked."""
        self._validate_name()

    @classmethod
    def _from_rest_object(cls, resource: RestOnlineDeploymentData) -> "KubernetesOnlineDeployment":
        """Create a KubernetesOnlineDeployment from a REST resource.

        :param resource: REST deployment resource.
        :type resource: RestOnlineDeploymentData
        :return: Entity populated from the resource, including its provisioning state.
        :rtype: KubernetesOnlineDeployment
        """
        rest_details = resource.properties

        code_config = None
        if rest_details.code_configuration:
            code_config = CodeConfiguration(
                code=rest_details.code_configuration.code_id,
                scoring_script=rest_details.code_configuration.scoring_script,
            )

        # The sku may be absent on the resource, in which case no instance count is known.
        capacity = resource.sku.capacity if resource.sku else None

        entity = KubernetesOnlineDeployment(
            id=resource.id,
            name=resource.name,
            tags=resource.tags,
            properties=rest_details.properties,
            description=rest_details.description,
            request_settings=OnlineRequestSettings._from_rest_object(rest_details.request_settings),
            model=rest_details.model,
            code_configuration=code_config,
            environment=rest_details.environment_id,
            resources=ResourceRequirementsSettings._from_rest_object(rest_details.container_resource_requirements),
            app_insights_enabled=rest_details.app_insights_enabled,
            scale_settings=OnlineScaleSettings._from_rest_object(rest_details.scale_settings),
            liveness_probe=ProbeSettings._from_rest_object(rest_details.liveness_probe),
            readiness_probe=ProbeSettings._from_rest_object(rest_details.readiness_probe),
            environment_variables=rest_details.environment_variables,
            endpoint_name=_parse_endpoint_name_from_deployment_id(resource.id),
            instance_count=capacity,
            instance_type=rest_details.instance_type,
        )

        entity._provisioning_state = rest_details.provisioning_state
        return entity


class ManagedOnlineDeployment(OnlineDeployment):
    """Managed Online endpoint deployment entity

    In addition to the named parameters, ``private_network_connection`` and ``egress_public_network_access``
    are read from ``kwargs`` and stored as attributes of the same name (None when not supplied).

    :param name: Name of the resource.
    :type name: str
    :param endpoint_name: Name of the endpoint; forwarded unchanged to the parent initializer.
    :type endpoint_name: str, optional
    :param tags: Tag dictionary. Tags can be added, removed, and updated.
    :type tags: Dict[str, Any], optional
    :param properties: The asset property dictionary.
    :type properties: Dict[str, Any], optional
    :param description: Description of the resource.
    :type description: str, optional
    :param model: Model entity for the endpoint deployment, defaults to None
    :type model: Union[str, Model], optional
    :param code_configuration: defaults to None
    :type code_configuration: CodeConfiguration, optional
    :param environment: Environment entity for the endpoint deployment, defaults to None
    :type environment: Union[str, Environment], optional
    :param app_insights_enabled: defaults to False
    :type app_insights_enabled: bool, optional
    :param scale_settings: How the online deployment will scale.
    :type scale_settings: OnlineScaleSettings, optional
    :param request_settings: defaults to None
    :type request_settings: OnlineRequestSettings, optional
    :param liveness_probe: Liveness probe settings.
    :type liveness_probe: ProbeSettings, optional
    :param readiness_probe: Readiness probe settings.
    :type readiness_probe: ProbeSettings, optional
    :param environment_variables: Environment variables that will be set in deployment.
    :type environment_variables: dict, optional
    :param instance_type: Azure compute sku.
    :type instance_type: str
    :param instance_count: The instance count used for this deployment.
    :type instance_count: int
    :param code_path: Folder path to local code assets. Equivalent to code_configuration.code.
    :type code_path: Union[str, PathLike], optional
    :param scoring_script: Scoring script name. Equivalent to code_configuration.scoring_script.
    :type scoring_script: Union[str, PathLike], optional
    """

    def __init__(
        self,
        *,
        name: str,
        endpoint_name: str = None,
        tags: Dict[str, Any] = None,
        properties: Dict[str, Any] = None,
        description: str = None,
        model: Union[str, "Model"] = None,
        code_configuration: CodeConfiguration = None,
        environment: Union[str, "Environment"] = None,
        app_insights_enabled: bool = False,
        scale_settings: OnlineScaleSettings = None,
        request_settings: OnlineRequestSettings = None,
        liveness_probe: ProbeSettings = None,
        readiness_probe: ProbeSettings = None,
        environment_variables: Dict[str, str] = None,
        instance_type: str = None,
        instance_count: int = None,
        code_path: Union[str, PathLike] = None,  # promoted property from code_configuration.code
        scoring_script: Union[str, PathLike] = None,  # promoted property from code_configuration.scoring_script
        **kwargs,
    ):
        # The compute type is fixed for this class; any caller-supplied "type" is overwritten.
        kwargs["type"] = EndpointComputeType.MANAGED.value

        # Pop the managed-only options so they are not forwarded to the parent initializer.
        self.private_network_connection = kwargs.pop("private_network_connection", None)
        self.egress_public_network_access = kwargs.pop("egress_public_network_access", None)

        # The parent initializer stores every setting below, including readiness_probe.
        super().__init__(
            name=name,
            endpoint_name=endpoint_name,
            tags=tags,
            properties=properties,
            description=description,
            model=model,
            code_configuration=code_configuration,
            environment=environment,
            environment_variables=environment_variables,
            app_insights_enabled=app_insights_enabled,
            scale_settings=scale_settings,
            request_settings=request_settings,
            liveness_probe=liveness_probe,
            readiness_probe=readiness_probe,
            instance_count=instance_count,
            instance_type=instance_type,
            code_path=code_path,
            scoring_script=scoring_script,
            **kwargs,
        )

    def _to_dict(self) -> Dict:
        """Dump this entity through ManagedOnlineDeploymentSchema, using "./" as the base path."""
        return ManagedOnlineDeploymentSchema(context={BASE_PATH_CONTEXT_KEY: "./"}).dump(self)

    def _to_rest_object(self, location: str) -> RestOnlineDeploymentData:
        """Validate this entity and convert it to its REST representation.

        :param location: Location placed on the resulting REST resource.
        :type location: str
        :return: REST resource holding the managed deployment properties, the tags and a "Default" sku.
        :rtype: RestOnlineDeploymentData
        :raises ValidationException: If the scale settings are not DefaultScaleSettings.
        """
        self._validate()
        code, environment, model = self._generate_dependencies()

        properties = RestManagedOnlineDeployment(
            code_configuration=code,
            environment_id=environment,
            model=model,
            model_mount_path=self.model_mount_path,
            scale_settings=self.scale_settings._to_rest_object() if self.scale_settings else None,
            properties=self.properties,
            description=self.description,
            environment_variables=self.environment_variables,
            app_insights_enabled=self.app_insights_enabled,
            request_settings=self.request_settings._to_rest_object() if self.request_settings else None,
            liveness_probe=self.liveness_probe._to_rest_object() if self.liveness_probe else None,
            instance_type=self.instance_type,
            readiness_probe=self.readiness_probe._to_rest_object() if self.readiness_probe else None,
        )
        # TODO: SKU name is defaulted to value "Default" since service side requires it.
        #  Should be removed once service side defaults it.
        sku = RestSku(name="Default", capacity=self.instance_count)

        # mfe is expecting private network connection to be in both the attribute level
        # as well as in the properties dictionary.
        private_network_connection = getattr(self, "private_network_connection", None)
        if private_network_connection:
            properties.private_network_connection = private_network_connection
            # Create the dictionary when none was supplied so the item assignment cannot fail on None.
            if properties.properties is None:
                properties.properties = {}
            properties.properties["private-network-connection"] = private_network_connection

        egress_public_network_access = getattr(self, "egress_public_network_access", None)
        if egress_public_network_access:
            properties.egress_public_network_access = egress_public_network_access

        return RestOnlineDeploymentData(location=location, properties=properties, tags=self.tags, sku=sku)

    def _to_arm_resource_param(self, **kwargs):
        """Build the ARM parameter dictionary for this deployment, keyed by its ARM type.

        ``kwargs`` are forwarded to ``_to_rest_object``.
        """
        rest_object = self._to_rest_object(**kwargs)
        properties = rest_object.properties
        sku = rest_object.sku
        tags = rest_object.tags

        return {
            self._arm_type: {
                ArmConstants.NAME: self.name,
                ArmConstants.PROPERTIES_PARAMETER_NAME: self._serialize.body(properties, "ManagedOnlineDeployment"),
                ArmConstants.SKU: self._serialize.body(sku, "Sku"),
                ArmConstants.TAGS: tags,
            }
        }

    @classmethod
    def _from_rest_object(cls, resource: RestOnlineDeploymentData) -> "ManagedOnlineDeployment":
        """Create a ManagedOnlineDeployment from a REST resource.

        :param resource: REST deployment resource.
        :type resource: RestOnlineDeploymentData
        :return: Entity populated from the resource, including its provisioning state.
        :rtype: ManagedOnlineDeployment
        """
        deployment = resource.properties

        code_config = (
            CodeConfiguration(
                code=deployment.code_configuration.code_id,
                scoring_script=deployment.code_configuration.scoring_script,
            )
            if deployment.code_configuration
            else None
        )

        entity = ManagedOnlineDeployment(
            id=resource.id,
            name=resource.name,
            tags=resource.tags,
            properties=deployment.properties,
            description=deployment.description,
            request_settings=OnlineRequestSettings._from_rest_object(deployment.request_settings),
            model=(deployment.model if deployment.model else None),
            code_configuration=code_config,
            environment=deployment.environment_id,
            app_insights_enabled=deployment.app_insights_enabled,
            scale_settings=OnlineScaleSettings._from_rest_object(deployment.scale_settings),
            liveness_probe=ProbeSettings._from_rest_object(deployment.liveness_probe),
            environment_variables=deployment.environment_variables,
            readiness_probe=ProbeSettings._from_rest_object(deployment.readiness_probe),
            instance_type=deployment.instance_type,
            endpoint_name=_parse_endpoint_name_from_deployment_id(resource.id),
            # Tolerate a missing sku, as KubernetesOnlineDeployment._from_rest_object does.
            instance_count=resource.sku.capacity if resource.sku else None,
            private_network_connection=deployment.private_network_connection,
            egress_public_network_access=deployment.egress_public_network_access,
        )

        entity._provisioning_state = deployment.provisioning_state
        return entity

    def _merge_with(self, other: "ManagedOnlineDeployment") -> None:
        """Merge ``other`` into this deployment in place.

        All merging, including ``instance_type``, is done by ``OnlineDeployment._merge_with``.
        A falsy ``other`` makes this a no-op.
        """
        if other:
            super()._merge_with(other)

    def _validate(self) -> None:
        """Validate the deployment name and the scale settings."""
        self._validate_name()
        self._validate_scale_settings()

    def _validate_scale_settings(self) -> None:
        """Reject any scale settings that are not DefaultScaleSettings.

        :raises ValidationException: If scale settings are set and are not a DefaultScaleSettings instance.
        """
        if self.scale_settings and not isinstance(self.scale_settings, DefaultScaleSettings):
            msg = "ManagedOnlineEndpoint supports DefaultScaleSettings only."
            raise ValidationException(
                message=msg,
                target=ErrorTarget.ONLINE_DEPLOYMENT,
                no_personal_data_message=msg,
                error_category=ErrorCategory.USER_ERROR,
            )