# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
__all__ = ["build_authentication_token", "WebPubSubServiceClient"]
from copy import deepcopy
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
import jwt
import six
import azure.core.credentials as corecredentials
import azure.core.pipeline as corepipeline
import azure.core.pipeline.policies as corepolicies
import azure.core.pipeline.transport as coretransport
# Temporary location for types that eventually graduate to Azure Core
from .core import rest as corerest
from ._version import VERSION as _VERSION
from ._policies import JwtCredentialPolicy
from ._utils import UTC as _UTC
if TYPE_CHECKING:
from azure.core.pipeline.policies import HTTPPolicy, SansIOHTTPPolicy
from typing import Any, List, cast, Type, TypeVar
ClientType = TypeVar("ClientType", bound="WebPubSubServiceClient")
def _parse_connection_string(connection_string, **kwargs):
for segment in connection_string.split(";"):
if "=" in segment:
key, value = segment.split("=", maxsplit=1)
key = key.lower()
if key not in ("version", ):
kwargs.setdefault(key, value)
elif segment:
raise ValueError(
"Malformed connection string - expected 'key=value', found segment '{}' in '{}'".format(
segment, connection_string
)
)
if "endpoint" not in kwargs:
raise ValueError("connection_string missing 'endpoint' field")
if "accesskey" not in kwargs:
raise ValueError("connection_string missing 'accesskey' field")
return kwargs
[docs]def build_authentication_token(endpoint, hub, **kwargs):
"""Build an authentication token for the given endpoint, hub using the provided key.
:keyword endpoint: connetion string or HTTP or HTTPS endpoint for the WebPubSub service instance.
:type endpoint: ~str
:keyword hub: The hub to give access to.
:type hub: ~str
:keyword accesskey: Key to sign the token with. Required if endpoint is not a connection string
:type accesskey: ~str
:keyword ttl: Optional ttl timedelta for the token. Default is 1 hour.
:type ttl: ~datetime.timedelta
:keyword user: Optional user name (subject) for the token. Default is no user.
:type user: ~str
:keyword roles: Roles for the token.
:type roles: typing.List[str]. Default is no roles.
:returns: ~dict containing the web socket endpoint, the token and a url with the generated access token.
:rtype: ~dict
Example:
>>> build_authentication_token(endpoint='https://contoso.com/api/webpubsub', hub='theHub', key='123')
{
'baseUrl': 'wss://contoso.com/api/webpubsub/client/hubs/theHub',
'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ...',
'url': 'wss://contoso.com/api/webpubsub/client/hubs/theHub?access_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ...'
}
"""
if 'accesskey' not in kwargs:
kwargs = _parse_connection_string(endpoint, **kwargs)
endpoint = kwargs.pop('endpoint')
user = kwargs.pop("user", None)
key = kwargs.pop("accesskey")
ttl = kwargs.pop("ttl", timedelta(hours=1))
roles = kwargs.pop("roles", [])
endpoint = endpoint.lower()
if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
raise ValueError(
"Invalid endpoint: '{}' has unknown scheme - expected 'http://' or 'https://'".format(
endpoint
)
)
# Ensure endpoint has no trailing slash
endpoint = endpoint.rstrip("/")
# Switch from http(s) to ws(s) scheme
client_endpoint = "ws" + endpoint[4:]
client_url = "{}/client/hubs/{}".format(client_endpoint, hub)
audience = "{}/client/hubs/{}".format(endpoint, hub)
payload = {
"aud": audience,
"iat": datetime.now(tz=_UTC),
"exp": datetime.now(tz=_UTC) + ttl,
}
if user:
payload["sub"] = user
if roles:
payload["role"] = roles
token = six.ensure_str(jwt.encode(payload, key, algorithm="HS256"))
return {
"baseUrl": client_url,
"token": token,
"url": "{}?access_token={}".format(client_url, token),
}
[docs]class WebPubSubServiceClient(object):
def __init__(self, endpoint, credential, **kwargs):
# type: (str, corecredentials.AzureKeyCredential, Any) -> None
"""Create a new WebPubSubServiceClient instance
:param endpoint: Endpoint to connect to.
:type endpoint: ~str
:param credential: Credentials to use to connect to endpoint.
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword api_version: Api version to use when communicating with the service.
:type api_version: str
:keyword user: User to connect as. Optional.
:type user: ~str
"""
self.endpoint = endpoint.rstrip("/")
transport = kwargs.pop("transport", None) or coretransport.RequestsTransport(
**kwargs
)
kwargs.setdefault(
"sdk_moniker", "messaging-webpubsubservice/{}".format(_VERSION)
)
policies = [
corepolicies.HeadersPolicy(**kwargs),
corepolicies.UserAgentPolicy(**kwargs),
corepolicies.RetryPolicy(**kwargs),
corepolicies.ProxyPolicy(**kwargs),
corepolicies.CustomHookPolicy(**kwargs),
corepolicies.RedirectPolicy(**kwargs),
JwtCredentialPolicy(credential, kwargs.get("user", None)),
corepolicies.NetworkTraceLoggingPolicy(**kwargs),
] # type: Any
self._pipeline = corepipeline.Pipeline(
transport,
policies,
) # type: corepipeline.Pipeline
[docs] @classmethod
def from_connection_string(cls, connection_string, **kwargs):
# type: (Type[ClientType], str, Any) -> ClientType
"""Create a new WebPubSubServiceClient from a connection string.
:param connection_string: Connection string
:type connection_string: ~str
:rtype: WebPubSubServiceClient
"""
kwargs = _parse_connection_string(connection_string, **kwargs)
kwargs["credential"] = corecredentials.AzureKeyCredential(
kwargs.pop("accesskey")
)
return cls(**kwargs)
def __repr__(self):
return "<WebPubSubServiceClient> endpoint:'{}'".format(self.endpoint)
def _format_url(self, url):
# type: (str) -> str
assert self.endpoint[-1] != "/", "My endpoint should not have a trailing slash"
return "/".join([self.endpoint, url.lstrip("/")])
[docs] def send_request(self, http_request, **kwargs):
# type: (corerest.HttpRequest, Any) -> corerest.HttpResponse
"""Runs the network request through the client's chained policies.
We have helper methods to create requests specific to this service in `azure.messaging.webpubsub.rest`.
Use these helper methods to create the request you pass to this method. See our example below:
>>> from azure.messaging.webpubsub.rest import build_healthapi_get_health_status_request
>>> request = build_healthapi_get_health_status_request(api_version)
<HttpRequest [HEAD], url: '/api/health'>
>>> response = client.send_request(request)
<HttpResponse: 200 OK>
For more information on this code flow, see https://aka.ms/azsdk/python/llcwiki
For advanced cases, you can also create your own :class:`~azure.messaging.webpubsub.core.rest.HttpRequest`
and pass it in.
:param http_request: The network request you want to make. Required.
:type http_request: ~azure.messaging.webpubsub.core.rest.HttpRequest
:keyword bool stream_response: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.messaging.webpubsub.core.rest.HttpResponse
"""
request_copy = deepcopy(http_request)
request_copy.url = self._format_url(request_copy.url)
# can't do StreamCOntextManager yet. This client doesn't have a pipeline client,
# StreamContextManager requires a pipeline client. WIll look more into it
# if kwargs.pop("stream_response", False):
# return corerest._StreamContextManager(
# client=self._client,
# request=request_copy,
# )
pipeline_response = self._pipeline.run(request_copy._internal_request, **kwargs) # pylint: disable=protected-access
response = corerest.HttpResponse(
status_code=pipeline_response.http_response.status_code,
request=request_copy,
_internal_response=pipeline_response.http_response,
)
response.read()
return response