# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
__all__ = ["WebPubSubServiceClient"]
from typing import TYPE_CHECKING
from copy import deepcopy
import azure.core.pipeline as corepipeline
import azure.core.pipeline.policies as corepolicies
import azure.core.pipeline.transport as coretransport
# Temporary location for types that eventually graduate to Azure Core
from .core import rest as corerest
from ._policies import JwtCredentialPolicy
if TYPE_CHECKING:
import azure.core.credentials as corecredentials
from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy
from typing import Any, List, cast # pylint: disable=ungrouped-imports
class WebPubSubServiceClient(object):
    """Asynchronous low-level client for the Web PubSub service.

    Requests are built by the caller and pushed through an
    ``azure.core`` ``AsyncPipeline`` by :meth:`send_request`. The client can be
    used as an async context manager, or released explicitly with
    :meth:`close`.
    """

    def __init__(self, endpoint, credential, **kwargs):
        # type: (str, corecredentials.AzureKeyCredential, Any) -> None
        """Create a new WebPubSubServiceClient instance

        All keyword arguments other than ``transport`` are forwarded unchanged
        to the default transport and to every ``azure.core`` policy in the
        pipeline.

        :param endpoint: Endpoint to connect to. Trailing slashes are removed.
        :type endpoint: ~str
        :param credential: Credentials to use to connect to endpoint.
        :type credential: ~azure.core.credentials.AzureKeyCredential
        :keyword transport: Async transport to use instead of the default one. Optional.
        :keyword api_version: Api version to use when communicating with the service.
         Not read by this constructor itself; it is forwarded with the other keyword arguments.
        :type api_version: str
        :keyword user: User to connect as. Optional.
        :type user: ~str
        """
        self.endpoint = endpoint.rstrip("/")

        # The pipeline below is an AsyncPipeline, which awaits its transport,
        # so the default transport has to be an asynchronous one.
        transport = kwargs.pop("transport", None) or coretransport.AioHttpTransport(
            **kwargs
        )
        policies = [
            corepolicies.HeadersPolicy(**kwargs),
            corepolicies.UserAgentPolicy(**kwargs),
            corepolicies.AsyncRetryPolicy(**kwargs),
            corepolicies.ProxyPolicy(**kwargs),
            corepolicies.CustomHookPolicy(**kwargs),
            corepolicies.AsyncRedirectPolicy(**kwargs),
            JwtCredentialPolicy(credential, kwargs.get("user", None)),
            corepolicies.NetworkTraceLoggingPolicy(**kwargs),
        ]  # type: Any
        self._pipeline = corepipeline.AsyncPipeline(
            transport,
            policies,
        )  # type: corepipeline.AsyncPipeline

    def _format_url(self, url):
        # type: (str) -> str
        """Join a (possibly relative) request URL onto the client's endpoint.

        Exactly one ``/`` separates the endpoint from ``url``. ``endpoint`` is a
        public attribute that may have been reassigned after construction, so
        the slash is normalized here instead of being asserted: an ``assert``
        disappears under ``python -O`` and indexing an empty endpoint would
        raise ``IndexError``.

        :param url: Path of the request, with or without leading slashes.
        :type url: str
        :return: The endpoint and ``url`` joined by a single slash.
        :rtype: str
        """
        return "/".join([self.endpoint.rstrip("/"), url.lstrip("/")])

    async def close(self) -> None:
        """Close the pipeline, which in turn closes the underlying transport."""
        await self._pipeline.__aexit__()

    async def __aenter__(self) -> "WebPubSubServiceClient":
        """Enter the pipeline's async context and return this client."""
        await self._pipeline.__aenter__()
        return self

    async def __aexit__(self, *exc_details: "Any") -> None:
        """Leave the pipeline's async context, forwarding any exception details."""
        await self._pipeline.__aexit__(*exc_details)

    async def send_request(
        self, http_request: "corerest.HttpRequest", **kwargs: "Any"
    ) -> "corerest.AsyncHttpResponse":
        """Runs the network request through the client's chained policies.

        We have helper methods to create requests specific to this service in `azure.messaging.webpubsub.rest`.
        Use these helper methods to create the request you pass to this method. See our example below:

        >>> from azure.messaging.webpubsub.rest import build_healthapi_get_health_status_request
        >>> request = build_healthapi_get_health_status_request(api_version)
        <HttpRequest [HEAD], url: '/api/health'>
        >>> response = await client.send_request(request)
        <AsyncHttpResponse: 200 OK>

        For more information on this code flow, see https://aka.ms/azsdk/python/llcwiki

        For advanced cases, you can also create your own :class:`~azure.messaging.webpubsub.core.rest.HttpRequest`
        and pass it in.

        The request passed in is not modified: a deep copy is made and the
        copy's URL is prefixed with the client's endpoint. Streaming responses
        are not implemented yet, so the response body is always read in full
        before this method returns. Remaining keyword arguments are passed to
        the pipeline's ``run`` call.

        :param http_request: The network request you want to make. Required.
        :type http_request: ~azure.messaging.webpubsub.core.rest.HttpRequest
        :return: The response of your network call. Does not do error handling on your response.
        :rtype: ~azure.messaging.webpubsub.core.rest.AsyncHttpResponse
        """
        request_copy = deepcopy(http_request)
        request_copy.url = self._format_url(request_copy.url)

        # NOTE: streaming (an async stream context manager) is not available
        # here yet because it requires a pipeline client and this class only
        # owns a bare pipeline.
        pipeline_response = await self._pipeline.run(
            request_copy._internal_request, **kwargs  # pylint: disable=protected-access
        )
        response = corerest.AsyncHttpResponse(
            status_code=pipeline_response.http_response.status_code,
            request=request_copy,
            _internal_response=pipeline_response.http_response,
        )
        await response.read()
        return response