Source code for azure.messaging.webpubsubservice.aio

# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
__all__ = ["WebPubSubServiceClient"]

from typing import TYPE_CHECKING
from copy import deepcopy

import azure.core.pipeline as corepipeline
import azure.core.pipeline.policies as corepolicies
import azure.core.pipeline.transport as coretransport

# Temporary location for types that eventually graduate to Azure Core
from .core import rest as corerest

from ._policies import JwtCredentialPolicy

if TYPE_CHECKING:
    import azure.core.credentials as corecredentials
    from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy
    from typing import Any, List, cast # pylint: disable=ungrouped-imports


[docs]class WebPubSubServiceClient(object): def __init__(self, endpoint, credential, **kwargs): # type: (str, corecredentials.AzureKeyCredential, Any) -> None """Create a new WebPubSubServiceClient instance :param endpoint: Endpoint to connect to. :type endpoint: ~str :param credential: Credentials to use to connect to endpoint. :type credential: ~azure.core.credentials.AzureKeyCredential :keyword api_version: Api version to use when communicating with the service. :type api_version: str :keyword user: User to connect as. Optional. :type user: ~str """ self.endpoint = endpoint.rstrip("/") transport = kwargs.pop("transport", None) or coretransport.RequestsTransport( **kwargs ) policies = [ corepolicies.HeadersPolicy(**kwargs), corepolicies.UserAgentPolicy(**kwargs), corepolicies.AsyncRetryPolicy(**kwargs), corepolicies.ProxyPolicy(**kwargs), corepolicies.CustomHookPolicy(**kwargs), corepolicies.AsyncRedirectPolicy(**kwargs), JwtCredentialPolicy(credential, kwargs.get("user", None)), corepolicies.NetworkTraceLoggingPolicy(**kwargs), ] # type: Any self._pipeline = corepipeline.AsyncPipeline( transport, policies, ) # type: corepipeline.AsyncPipeline def _format_url(self, url): # type: (str) -> str assert self.endpoint[-1] != "/", "My endpoint should not have a trailing slash" return "/".join([self.endpoint, url.lstrip("/")])
[docs] async def send_request( self, http_request: corerest.HttpRequest, **kwargs: "Any" ) -> corerest.AsyncHttpResponse: """Runs the network request through the client's chained policies. We have helper methods to create requests specific to this service in `azure.messaging.webpubsub.rest`. Use these helper methods to create the request you pass to this method. See our example below: >>> from azure.messaging.webpubsub.rest import build_healthapi_get_health_status_request >>> request = build_healthapi_get_health_status_request(api_version) <HttpRequest [HEAD], url: '/api/health'> >>> response = await client.send_request(request) <AsyncHttpResponse: 200 OK> For more information on this code flow, see https://aka.ms/azsdk/python/llcwiki For advanced cases, you can also create your own :class:`~azure.messaging.webpubsub.core.rest.HttpRequest` and pass it in. :param http_request: The network request you want to make. Required. :type http_request: ~azure.messaging.webpubsub.core.rest.HttpRequest :keyword bool stream_response: Whether the response payload will be streamed. Defaults to False. :return: The response of your network call. Does not do error handling on your response. :rtype: ~azure.messaging.webpubsub.core.rest.AsyncHttpResponse """ request_copy = deepcopy(http_request) request_copy.url = self._format_url(request_copy.url) # can't do AsyncStreamContextManager yet. This client doesn't have a pipeline client, # AsyncStreamContextManager requires a pipeline client. WIll look more into it # if kwargs.pop("stream_response", False): # return corerest._AsyncStreamContextManager( # client=self._client, # request=request_copy, # ) pipeline_response = await self._pipeline.run( request_copy._internal_request, **kwargs # pylint: disable=protected-access ) response = corerest.AsyncHttpResponse( status_code=pipeline_response.http_response.status_code, request=request_copy, _internal_response=pipeline_response.http_response, ) await response.read() return response