# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Authorization helper functions in the Azure Cosmos database service.
"""
import base64
from hashlib import sha256
import hmac
import six
from . import http_constants
def __GetAuthorizationTokenUsingMasterKey(verb, resource_id_or_fullname, resource_type, headers, master_key):
"""Gets the authorization token using `master_key.
:param str verb:
:param str resource_id_or_fullname:
:param str resource_type:
:param dict headers:
:param str master_key:
:return: The authorization token.
:rtype: dict
"""
# decodes the master key which is encoded in base64
key = base64.b64decode(master_key)
# Skipping lower casing of resource_id_or_fullname since it may now contain "ID"
# of the resource as part of the fullname
text = "{verb}\n{resource_type}\n{resource_id_or_fullname}\n{x_date}\n{http_date}\n".format(
verb=(verb.lower() or ""),
resource_type=(resource_type.lower() or ""),
resource_id_or_fullname=(resource_id_or_fullname or ""),
x_date=headers.get(http_constants.HttpHeaders.XDate, "").lower(),
http_date=headers.get(http_constants.HttpHeaders.HttpDate, "").lower(),
)
if six.PY2:
body = text.decode("utf-8")
digest = hmac.new(key, body, sha256).digest()
signature = digest.encode("base64")
else:
# python 3 support
body = text.encode("utf-8")
digest = hmac.new(key, body, sha256).digest()
signature = base64.encodebytes(digest).decode("utf-8")
master_token = "master"
token_version = "1.0"
return "type={type}&ver={ver}&sig={sig}".format(type=master_token, ver=token_version, sig=signature[:-1])
def __GetAuthorizationTokenUsingResourceTokens(resource_tokens, path, resource_id_or_fullname):
"""Get the authorization token using `resource_tokens`.
:param dict resource_tokens:
:param str path:
:param str resource_id_or_fullname:
:return: The authorization token.
:rtype: dict
"""
if resource_tokens:
# For database account access(through GetDatabaseAccount API), path and
# resource_id_or_fullname are '', so in this case we return the first token to be
# used for creating the auth header as the service will accept any token in this case
if not path and not resource_id_or_fullname:
return next(six.itervalues(resource_tokens))
if resource_tokens.get(resource_id_or_fullname):
return resource_tokens[resource_id_or_fullname]
path_parts = []
if path:
path_parts = path.split("/")
resource_types = [
"dbs",
"colls",
"docs",
"sprocs",
"udfs",
"triggers",
"users",
"permissions",
"attachments",
"conflicts",
"offers",
]
# Get the last resource id or resource name from the path and get it's token from resource_tokens
for one_part in reversed(path_parts):
if not one_part in resource_types and one_part in resource_tokens:
return resource_tokens[one_part]
return None