Source code for azure.search.documents.aio._paging

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import List, Dict, Optional, cast

from azure.core.paging import ReturnType
from azure.core.async_paging import AsyncItemPaged, AsyncPageIterator
from .._generated.models import QueryAnswerResult, SearchDocumentsResult
from .._paging import (
    convert_search_result,
    pack_continuation_token,
    unpack_continuation_token,
)
from .._api_versions import DEFAULT_VERSION


[docs]class AsyncSearchItemPaged(AsyncItemPaged[ReturnType]): def __init__(self, *args, **kwargs) -> None: super(AsyncSearchItemPaged, self).__init__(*args, **kwargs) self._first_page_iterator_instance: Optional[AsyncSearchPageIterator] = None async def __anext__(self) -> ReturnType: if self._page_iterator is None: self._page_iterator = self.by_page() self._first_page_iterator_instance = cast(AsyncSearchPageIterator, self._page_iterator) return await self.__anext__() if self._page is None: # Let it raise StopAsyncIteration self._page = await self._page_iterator.__anext__() return await self.__anext__() try: return await self._page.__anext__() except StopAsyncIteration: self._page = None return await self.__anext__() def _first_iterator_instance(self) -> "AsyncSearchPageIterator": if self._first_page_iterator_instance is None: self._page_iterator = cast(AsyncSearchPageIterator, self.by_page()) self._first_page_iterator_instance = self._page_iterator return self._first_page_iterator_instance
[docs] async def get_facets(self) -> Optional[Dict]: """Return any facet results if faceting was requested. :return: Facet results. :rtype: dict """ return cast(Dict, await self._first_iterator_instance().get_facets())
[docs] async def get_coverage(self) -> float: """Return the coverage percentage, if `minimum_coverage` was specificied for the query. :return: Coverage percentage. :rtype: float """ return cast(float, await self._first_iterator_instance().get_coverage())
[docs] async def get_count(self) -> int: """Return the count of results if `include_total_count` was set for the query. :return: Count of results. :rtype: int """ return cast(int, await self._first_iterator_instance().get_count())
[docs] async def get_answers(self) -> Optional[List[QueryAnswerResult]]: """Return semantic answers. Only included if the semantic ranker is used and answers are requested in the search query via the query_answer parameter. :return: Answers. :rtype: list[~azure.search.documents.QueryAnswerResult] """ return cast(List[QueryAnswerResult], await self._first_iterator_instance().get_answers())
# The pylint error silenced below seems spurious, as the inner wrapper does, in # fact, become a method of the class when it is applied. def _ensure_response(f): # pylint:disable=protected-access async def wrapper(self, *args, **kw): if self._current_page is None: self._response = await self._get_next(self.continuation_token) self.continuation_token, self._current_page = await self._extract_data(self._response) return await f(self, *args, **kw) return wrapper class AsyncSearchPageIterator(AsyncPageIterator[ReturnType]): def __init__(self, client, initial_query, kwargs, continuation_token=None) -> None: super(AsyncSearchPageIterator, self).__init__( get_next=self._get_next_cb, extract_data=self._extract_data_cb, continuation_token=continuation_token, ) self._client = client self._initial_query = initial_query self._kwargs = kwargs self._facets = None self._api_version = kwargs.pop("api_version", DEFAULT_VERSION) async def _get_next_cb(self, continuation_token): if continuation_token is None: return await self._client.documents.search_post(search_request=self._initial_query.request, **self._kwargs) _next_link, next_page_request = unpack_continuation_token(continuation_token) return await self._client.documents.search_post(search_request=next_page_request, **self._kwargs) async def _extract_data_cb(self, response): continuation_token = pack_continuation_token(response, api_version=self._api_version) results = [convert_search_result(r) for r in response.results] return continuation_token, results @_ensure_response async def get_facets(self) -> Optional[Dict]: self.continuation_token = None response = cast(SearchDocumentsResult, self._response) facets = response.facets if facets is not None and self._facets is None: assert facets.items() is not None # Hint for mypy self._facets = {k: [x.as_dict() for x in v] for k, v in facets.items()} return self._facets @_ensure_response async def get_coverage(self) -> float: self.continuation_token = None response = cast(SearchDocumentsResult, self._response) return cast(float, response.coverage) @_ensure_response async def get_count(self) -> int: self.continuation_token = None response = cast(SearchDocumentsResult, self._response) return cast(int, response.count) @_ensure_response async def get_answers(self) -> Optional[List[QueryAnswerResult]]: self.continuation_token = None response = cast(SearchDocumentsResult, self._response) return response.answers