# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import datetime
import base64
import functools
import json
from typing import Optional, Any
from azure.core.exceptions import HttpResponseError
from azure.core.polling import AsyncLROPoller
from azure.core.polling.base_polling import OperationFailed, BadStatus
from azure.core.polling.async_base_polling import AsyncLROBasePolling
from azure.core.polling._async_poller import PollingReturnType
from .._lro import TextAnalyticsOperationResourcePolling
from .._generated.v2022_04_01_preview.models import JobMetadata
_FINISHED = frozenset(["succeeded", "cancelled", "failed", "partiallycompleted", "partiallysucceeded"])
_FAILED = frozenset(["failed"])
_SUCCEEDED = frozenset(["succeeded", "partiallycompleted", "partiallysucceeded"])
class TextAnalyticsAsyncLROPollingMethod(AsyncLROBasePolling):
def finished(self):
"""Is this polling finished?
:rtype: bool
"""
return TextAnalyticsAsyncLROPollingMethod._finished(self.status())
@staticmethod
def _finished(status):
if hasattr(status, "value"):
status = status.value
return str(status).lower() in _FINISHED
@staticmethod
def _failed(status):
if hasattr(status, "value"):
status = status.value
return str(status).lower() in _FAILED
@staticmethod
def _raise_if_bad_http_status_and_method(response):
"""Check response status code is valid.
Must be 200, 201, 202, or 204.
:raises: BadStatus if invalid status.
"""
code = response.status_code
if code in {200, 201, 202, 204}:
return
raise BadStatus(
"Invalid return status {!r} for {!r} operation".format(
code, response.request.method
)
)
async def _poll(self): # pylint:disable=invalid-overridden-method
"""Poll status of operation so long as operation is incomplete and
we have an endpoint to query.
:param callable update_cmd: The function to call to retrieve the
latest status of the long running operation.
:raises: OperationFailed if operation status 'Failed' or 'Canceled'.
:raises: BadStatus if response status invalid.
:raises: BadResponse if response invalid.
"""
while not self.finished():
await self._delay()
await self.update_status()
if TextAnalyticsAsyncLROPollingMethod._failed(self.status()):
try:
job = json.loads(self._pipeline_response.http_response.text())
error_message = ""
for err in job["errors"]:
error_message += "({}) {}".format(err["code"], err["message"])
raise HttpResponseError(message=error_message, response=self._pipeline_response.http_response)
except KeyError as e:
raise OperationFailed("Operation failed or canceled") from e
final_get_url = self._operation.get_final_get_url(self._pipeline_response)
if final_get_url:
self._pipeline_response = await self.request_status(final_get_url)
TextAnalyticsAsyncLROPollingMethod._raise_if_bad_http_status_and_method(
self._pipeline_response.http_response
)
class AsyncAnalyzeHealthcareEntitiesLROPollingMethod(
TextAnalyticsAsyncLROPollingMethod
):
def __init__(self, *args, **kwargs):
self._text_analytics_client = kwargs.pop("text_analytics_client")
self._doc_id_order = kwargs.pop("doc_id_order", None)
self._show_stats = kwargs.pop("show_stats", None)
super().__init__(
*args, **kwargs
)
@property
def _current_body(self):
return JobMetadata.deserialize(self._pipeline_response)
@property
def created_on(self):
if not self._current_body:
return None
return self._current_body.created_date_time
@property
def expires_on(self):
if not self._current_body:
return None
return self._current_body.expiration_date_time
@property
def last_modified_on(self):
if not self._current_body:
return None
return self._current_body.last_update_date_time
@property
def id(self):
if not self._current_body:
return None
return self._current_body.job_id
@property
def display_name(self):
if not self._current_body:
return None
return self._current_body.display_name
def get_continuation_token(self):
# type() -> str
import pickle
self._initial_response.context.options["doc_id_order"] = self._doc_id_order
self._initial_response.context.options["show_stats"] = self._show_stats
return base64.b64encode(pickle.dumps(self._initial_response)).decode('ascii')
[docs]class AsyncAnalyzeHealthcareEntitiesLROPoller(AsyncLROPoller[PollingReturnType]):
[docs] def polling_method(self) -> AsyncAnalyzeHealthcareEntitiesLROPollingMethod: # type: ignore
"""Return the polling method associated to this poller."""
return self._polling_method # type: ignore
@property
def created_on(self) -> datetime.datetime:
"""When your healthcare entities job was created
:return: When your healthcare entities job was created
:rtype: ~datetime.datetime
"""
return self.polling_method().created_on
@property
def expires_on(self) -> datetime.datetime:
"""When your healthcare entities job will expire
:return: When your healthcare entities job will expire
:rtype: ~datetime.datetime
"""
return self.polling_method().expires_on
@property
def last_modified_on(self) -> datetime.datetime:
"""When your healthcare entities job was last modified
:return: When your healthcare entities job was last modified
:rtype: ~datetime.datetime
"""
return self.polling_method().last_modified_on
@property
def id(self) -> str:
"""ID of your call to :func:`begin_analyze_healthcare_entities`
:return: ID of your call to :func:`begin_analyze_healthcare_entities`
:rtype: str
"""
return self.polling_method().id
@property
def display_name(self) -> str:
"""Given display_name to the healthcare entities job
:return: Display name of the healthcare entities job.
:rtype: str
.. versionadded:: 2022-04-01-preview
*display_name* property.
"""
return self.polling_method().display_name
[docs] @classmethod
def from_continuation_token( # type: ignore
cls,
polling_method: AsyncAnalyzeHealthcareEntitiesLROPollingMethod,
continuation_token: str,
**kwargs: Any
) -> "AsyncAnalyzeHealthcareEntitiesLROPoller":
client, initial_response, deserialization_callback = polling_method.from_continuation_token(
continuation_token, **kwargs
)
polling_method._lro_algorithms = [ # pylint: disable=protected-access
TextAnalyticsOperationResourcePolling(
show_stats=initial_response.context.options["show_stats"]
)
]
return cls(
client,
initial_response,
functools.partial(deserialization_callback, initial_response),
polling_method # type: ignore
)
[docs] async def cancel(self, **kwargs) -> "AsyncLROPoller[None]": # type: ignore
"""Cancel the operation currently being polled.
:keyword int polling_interval: The polling interval to use to poll the cancellation status.
The default value is 5 seconds.
:return: Returns an instance of an AsyncLROPoller that returns None.
:rtype: ~azure.core.polling.AsyncLROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError: When the operation has already reached a terminal state.
.. admonition:: Example:
.. literalinclude:: ../samples/async_samples/sample_analyze_healthcare_entities_with_cancellation_async.py
:start-after: [START analyze_healthcare_entities_with_cancellation_async]
:end-before: [END analyze_healthcare_entities_with_cancellation_async]
:language: python
:dedent: 4
:caption: Cancel an existing health operation.
"""
polling_interval = kwargs.pop("polling_interval", 5)
await self.polling_method().update_status()
try:
client = getattr(
self._polling_method, "_text_analytics_client"
)
try:
return await client.begin_cancel_health_job(
self.id, polling=TextAnalyticsAsyncLROPollingMethod(timeout=polling_interval)
)
except ValueError: # language API compat
return await client.begin_analyze_text_cancel_job(
self.id, polling=TextAnalyticsAsyncLROPollingMethod(timeout=polling_interval)
)
except HttpResponseError as error:
from .._response_handlers import process_http_response_error
process_http_response_error(error)
class AsyncAnalyzeActionsLROPollingMethod(TextAnalyticsAsyncLROPollingMethod):
def __init__(self, *args, **kwargs):
self._doc_id_order = kwargs.pop("doc_id_order", None)
self._task_id_order = kwargs.pop("task_id_order", None)
self._show_stats = kwargs.pop("show_stats", None)
super().__init__(*args, **kwargs)
@property
def _current_body(self):
return JobMetadata.deserialize(self._pipeline_response)
@property
def created_on(self):
if not self._current_body:
return None
return self._current_body.created_date_time
@property
def display_name(self):
if not self._current_body:
return None
return self._current_body.display_name
@property
def expires_on(self):
if not self._current_body:
return None
return self._current_body.expiration_date_time
@property
def actions_failed_count(self):
if not self._current_body:
return None
return self._current_body.additional_properties["tasks"]["failed"]
@property
def actions_in_progress_count(self):
if not self._current_body:
return None
return self._current_body.additional_properties["tasks"]["inProgress"]
@property
def actions_succeeded_count(self):
if not self._current_body:
return None
return self._current_body.additional_properties["tasks"]["completed"]
@property
def last_modified_on(self):
if not self._current_body:
return None
return self._current_body.last_update_date_time
@property
def total_actions_count(self):
if not self._current_body:
return None
return self._current_body.additional_properties["tasks"]["total"]
@property
def id(self):
if not self._current_body:
return None
return self._current_body.job_id
def get_continuation_token(self):
# type: () -> str
import pickle
self._initial_response.context.options["doc_id_order"] = self._doc_id_order
self._initial_response.context.options["task_id_order"] = self._task_id_order
self._initial_response.context.options["show_stats"] = self._show_stats
return base64.b64encode(pickle.dumps(self._initial_response)).decode('ascii')
[docs]class AsyncAnalyzeActionsLROPoller(AsyncLROPoller[PollingReturnType]):
[docs] def polling_method(self) -> AsyncAnalyzeActionsLROPollingMethod: # type: ignore
"""Return the polling method associated to this poller."""
return self._polling_method # type: ignore
@property
def created_on(self) -> datetime.datetime:
"""When your analyze job was created
:return: When your analyze job was created
:rtype: ~datetime.datetime
"""
return self.polling_method().created_on
@property
def display_name(self) -> Optional[str]:
"""The display name of your :func:`begin_analyze_actions` call.
Corresponds to the `display_name` kwarg you pass to your
:func:`begin_analyze_actions` call.
:return: The display name of your :func:`begin_analyze_actions` call.
:rtype: str
"""
return self.polling_method().display_name
@property
def expires_on(self) -> datetime.datetime:
"""When your analyze job will expire
:return: When your analyze job will expire
:rtype: ~datetime.datetime
"""
return self.polling_method().expires_on
@property
def actions_failed_count(self) -> int:
"""Total number of actions that have failed
:return: Total number of actions that have failed
:rtype: int
"""
return self.polling_method().actions_failed_count
@property
def actions_in_progress_count(self) -> int:
"""Total number of actions currently in progress
:return: Total number of actions currently in progress
:rtype: int
"""
return self.polling_method().actions_in_progress_count
@property
def actions_succeeded_count(self) -> int:
"""Total number of actions that succeeded
:return: Total number of actions that succeeded
:rtype: int
"""
return self.polling_method().actions_succeeded_count
@property
def last_modified_on(self) -> datetime.datetime:
"""The last time your actions results were updated
:return: The last time your actions results were updated
:rtype: ~datetime.datetime
"""
return self.polling_method().last_modified_on
@property
def total_actions_count(self) -> int:
"""Total number of actions you submitted
:return: Total number of actions submitted
:rtype: int
"""
return self.polling_method().total_actions_count
@property
def id(self) -> str:
"""ID of your :func:`begin_analyze_actions` call.
:return: ID of your :func:`begin_analyze_actions` call.
:rtype: str
"""
return self.polling_method().id
[docs] @classmethod
def from_continuation_token( # type: ignore
cls,
polling_method: AsyncAnalyzeActionsLROPollingMethod,
continuation_token: str,
**kwargs: Any
) -> "AsyncAnalyzeActionsLROPoller": # type: ignore
client, initial_response, deserialization_callback = polling_method.from_continuation_token(
continuation_token, **kwargs
)
polling_method._lro_algorithms = [ # pylint: disable=protected-access
TextAnalyticsOperationResourcePolling(
show_stats=initial_response.context.options["show_stats"]
)
]
return cls(
client,
initial_response,
functools.partial(deserialization_callback, initial_response),
polling_method # type: ignore
)