Source code for azure.core.pipeline.transport._base_async

# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------

import asyncio
import abc
from collections.abc import AsyncIterator

from typing import AsyncIterator as AsyncIteratorType, Generic, TypeVar
from ._base import (
    _HttpResponseBase,
    _HttpClientTransportResponse,
    PipelineContext,
    PipelineRequest,
    PipelineResponse,
)
from .._base_async import _await_result

try:
    from contextlib import AbstractAsyncContextManager  # type: ignore
except ImportError:  # Python <= 3.7

    class AbstractAsyncContextManager(object):  # type: ignore
        async def __aenter__(self):
            """Return `self` upon entering the runtime context."""
            return self

        @abc.abstractmethod
        async def __aexit__(self, exc_type, exc_value, traceback):
            """Raise any exception triggered within the runtime context."""
            return None


AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType")
HTTPResponseType = TypeVar("HTTPResponseType")
HTTPRequestType = TypeVar("HTTPRequestType")


class _ResponseStopIteration(Exception):
    pass


def _iterate_response_content(iterator):
    """"To avoid:
    TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
    """
    try:
        return next(iterator)
    except StopIteration:
        raise _ResponseStopIteration()


class _PartGenerator(AsyncIterator):
    """Until parts is a real async iterator, wrap the sync call.

    :param parts: An iterable of parts
    """

    def __init__(self, response: "AsyncHttpResponse") -> None:
        self._response = response
        self._parts = None

    async def _parse_response(self):
        responses = self._response._get_raw_parts(  # pylint: disable=protected-access
            http_response_type=AsyncHttpClientTransportResponse
        )
        if self._response.request.multipart_mixed_info:
            policies = self._response.request.multipart_mixed_info[
                1
            ]  # type: List[SansIOHTTPPolicy]

            async def parse_responses(response):
                http_request = response.request
                context = PipelineContext(None)
                pipeline_request = PipelineRequest(http_request, context)
                pipeline_response = PipelineResponse(
                    http_request, response, context=context
                )

                for policy in policies:
                    await _await_result(
                        policy.on_response, pipeline_request, pipeline_response
                    )

            # Not happy to make this code asyncio specific, but that's multipart only for now
            # If we need trio and multipart, let's reinvesitgate that later
            await asyncio.gather(*[parse_responses(res) for res in responses])

        return responses

    async def __anext__(self):
        if not self._parts:
            self._parts = iter(await self._parse_response())

        try:
            return next(self._parts)
        except StopIteration:
            raise StopAsyncIteration()


[docs]class AsyncHttpResponse(_HttpResponseBase): # pylint: disable=abstract-method """An AsyncHttpResponse ABC. Allows for the asynchronous streaming of data from the response. """
[docs] def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: """Generator for streaming response body data. Should be implemented by sub-classes if streaming download is supported. Will return an asynchronous generator. :param pipeline: The pipeline object :type pipeline: azure.core.pipeline """
[docs] def parts(self) -> AsyncIterator: """Assuming the content-type is multipart/mixed, will return the parts as an async iterator. :rtype: AsyncIterator :raises ValueError: If the content is not multipart/mixed """ if not self.content_type or not self.content_type.startswith("multipart/mixed"): raise ValueError( "You can't get parts if the response is not multipart/mixed" ) return _PartGenerator(self)
class AsyncHttpClientTransportResponse(_HttpClientTransportResponse, AsyncHttpResponse): """Create a HTTPResponse from an http.client response. Body will NOT be read by the constructor. Call "body()" to load the body in memory if necessary. :param HttpRequest request: The request. :param httpclient_response: The object returned from an HTTP(S)Connection from http.client """
[docs]class AsyncHttpTransport( AbstractAsyncContextManager, abc.ABC, Generic[HTTPRequestType, AsyncHTTPResponseType], ): """An http sender ABC. """
[docs] @abc.abstractmethod async def send(self, request, **kwargs): """Send the request using this HTTP sender. """
[docs] @abc.abstractmethod async def open(self): """Assign new session if one does not already exist."""
[docs] @abc.abstractmethod async def close(self): """Close the session if it is not externally owned."""
[docs] async def sleep(self, duration): await asyncio.sleep(duration)
def __enter__(self): raise TypeError("Use async with instead") def __exit__(self, exc_type, exc_val, exc_tb): # __exit__ should exist in pair with __enter__ but never executed pass # pragma: no cover