Azure Cosmos DB SQL API client library for Python

Azure Cosmos DB is a globally distributed, multi-model database service that supports document, key-value, wide-column, and graph databases.

Use the Azure Cosmos DB SQL API SDK for Python to manage databases and the JSON documents they contain in this NoSQL database service. High-level capabilities are:

  • Create Cosmos DB databases and modify their settings

  • Create and modify containers to store collections of JSON documents

  • Create, read, update, and delete the items (JSON documents) in your containers

  • Query the documents in your database using SQL-like syntax

SDK source code | Package (PyPI) | API reference documentation | Product documentation | Samples

This SDK is used for the SQL API. For all other APIs, please check the Azure Cosmos DB documentation to evaluate the best SDK for your project.

Getting started

Important update on Python 2.x Support

New releases of this SDK won’t support Python 2.x starting January 1st, 2022. Please check the CHANGELOG for more information.

Prerequisites

If you need a Cosmos DB SQL API account, you can create one with this Azure CLI command:

az cosmosdb create --resource-group <resource-group-name> --name <cosmos-account-name>

Install the package

pip install azure-cosmos

Configure a virtual environment (optional)

Although not required, you can keep your base system and Azure SDK environments isolated from one another if you use a virtual environment. Execute the following commands to configure and then enter a virtual environment with venv:

python3 -m venv azure-cosmosdb-sdk-environment
source azure-cosmosdb-sdk-environment/bin/activate

Authenticate the client

Interaction with Cosmos DB starts with an instance of the CosmosClient class. You need an account, its URI, and one of its account keys to instantiate the client object.

Use the Azure CLI snippet below to populate two environment variables with the database account URI and its primary master key (you can also find these values in the Azure portal). The snippet is formatted for the Bash shell.

RES_GROUP=<resource-group-name>
ACCT_NAME=<cosmos-db-account-name>

export ACCOUNT_URI=$(az cosmosdb show --resource-group $RES_GROUP --name $ACCT_NAME --query documentEndpoint --output tsv)
export ACCOUNT_KEY=$(az cosmosdb list-keys --resource-group $RES_GROUP --name $ACCT_NAME --query primaryMasterKey --output tsv)

Create the client

Once you’ve populated the ACCOUNT_URI and ACCOUNT_KEY environment variables, you can create the CosmosClient.

from azure.cosmos import CosmosClient

import os
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)

Key concepts

Once you’ve initialized a CosmosClient, you can interact with the primary resource types in Cosmos DB:

  • Database: A Cosmos DB account can contain multiple databases. When you create a database, you specify the API you’d like to use when interacting with its documents: SQL, MongoDB, Gremlin, Cassandra, or Azure Table. Use the DatabaseProxy object to manage its containers.

  • Container: A container is a collection of JSON documents. You create (insert), read, update, and delete items in a container by using methods on the ContainerProxy object.

  • Item: An Item is the dictionary-like representation of a JSON document stored in a container. Each Item you add to a container must include an id key with a value that uniquely identifies the item within the container.

For more information about these resources, see Working with Azure Cosmos databases, containers and items.

How to use enable_cross_partition_query

The keyword argument enable_cross_partition_query accepts two values: None (default) or True.

Note on using queries by id

When a query looks up items by id, always pass the id as a string. Azure Cosmos DB only allows string id values; if you pass any other data type, this SDK returns no results and no error message.
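One way to guard against this is to convert the id to a string before it reaches the query. The sketch below assumes a `container` object obtained as in the Examples section; the helper name `find_by_id` is hypothetical and is not part of the SDK.

```python
def find_by_id(container, item_id):
    """Return all items whose id matches item_id (hypothetical helper, not an SDK API)."""
    # Cosmos DB ids are strings, so coerce values such as 3 to "3" before querying.
    id_value = str(item_id)
    return list(container.query_items(
        query='SELECT * FROM c WHERE c.id = @id',
        parameters=[dict(name='@id', value=id_value)],
        enable_cross_partition_query=True,
    ))
```

Passing the id as a query parameter rather than formatting it into the query text also keeps the quoting consistent.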

Limitations

Currently, the features below are not supported. For alternative options, check the Workarounds section below.

Data Plane Limitations:

  • Group By queries

  • Queries with COUNT from a DISTINCT subquery: SELECT COUNT (1) FROM (SELECT DISTINCT C.ID FROM C)

  • Bulk/Transactional batch processing

  • Direct TCP Mode access

  • Continuation token for cross-partition queries

  • Change Feed: Processor

  • Change Feed: Read multiple partition key values

  • Change Feed: Read specific time

  • Change Feed: Read from the beginning

  • Change Feed: Pull model

  • Cross-partition ORDER BY for mixed types

  • Integrated Cache using the default consistency level, that is “Session”. To take advantage of the new Cosmos DB Integrated Cache, you must explicitly set the CosmosClient consistency level to “Eventual”: consistency_level="Eventual".

Control Plane Limitations:

  • Get CollectionSizeUsage, DatabaseUsage, and DocumentUsage metrics

  • Create Geospatial Index

  • Provision Autoscale DBs or containers

  • Update Autoscale throughput

  • Update analytical store ttl (time to live)

  • Get the connection string

  • Get the minimum RU/s of a container

Security Limitations:

  • AAD support

Workarounds

Bulk processing Limitation Workaround

If you want to use the Python SDK to perform bulk inserts to Cosmos DB, the best alternative is to use stored procedures to write multiple items with the same partition key.
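Because a stored procedure runs against a single partition key value, the items have to be grouped by that value first. The following is a minimal sketch under stated assumptions: `bulkInsert` is a hypothetical stored procedure that you would have to create yourself and that accepts a list of items, and both helper functions are illustrative names rather than SDK APIs.

```python
from collections import defaultdict

def group_by_partition_key(items, key_field):
    """Group items by the value of their partition key property."""
    groups = defaultdict(list)
    for item in items:
        groups[item[key_field]].append(item)
    return dict(groups)

def bulk_insert(container, items, key_field, sproc_id='bulkInsert'):
    """Send one batch per partition key value to a stored procedure.

    'bulkInsert' is an assumed, user-defined stored procedure; it is not
    provided by the SDK or the service.
    """
    for pk_value, batch in group_by_partition_key(items, key_field).items():
        container.scripts.execute_stored_procedure(
            sproc=sproc_id,
            partition_key=pk_value,
            params=[batch],
        )
```

For a container partitioned on /productName, `key_field` would be 'productName'.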

Control Plane Limitations Workaround

Typically, you can use the Azure Portal, the Azure Cosmos DB Resource Provider REST API, the Azure CLI, or PowerShell for the control plane operations that this SDK does not support.

AAD Support Workaround

A possible workaround is to use managed identities to programmatically get the keys.

Consistency Level

Please be aware that this SDK has “Session” as the default consistency level, and it overrides your Cosmos DB database account default option. To learn more, see the Azure Cosmos DB documentation on consistency levels.

Boolean Data Type

While the Python language uses “True” and “False” for boolean types, Cosmos DB accepts “true” and “false” only. In other words, Python capitalizes the first letter of its Boolean values, while Cosmos DB and its SQL language use only lowercase letters for the same values. In practice:

  • Your JSON documents created with Python must use “True” and “False” to pass the language validation. The SDK converts them to “true” and “false” for you, so “true” and “false” are what is stored in Cosmos DB.

  • If you retrieve those documents with the Cosmos DB Portal’s Data Explorer, you will see “true” and “false”.

  • If you retrieve those documents with this Python SDK, “true” and “false” values will be automatically converted to “True” and “False”.
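Python's standard json module applies the same mapping, so it can illustrate the round trip without contacting the service. This is only an illustration; the sketch does not call the SDK, and the document fields are made up for the example.

```python
import json

# A Python dictionary uses True/False.
document = {'id': 'item1', 'inStock': True, 'discontinued': False}

# Serializing to JSON produces lowercase true/false.
serialized = json.dumps(document)
print(serialized)  # {"id": "item1", "inStock": true, "discontinued": false}

# Deserializing converts them back to Python booleans.
restored = json.loads(serialized)
print(restored['inStock'] is True)  # True
```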

SQL Queries x FROM Clause Subitems

This SDK uses the query_items method to submit SQL queries to Azure Cosmos DB.

The Cosmos DB SQL language lets you select subitems in the FROM clause to reduce the source to a smaller subset. For example, you can use select * from Families.children instead of select * from Families. Please note that:

  • For SQL queries using the query_items method, this SDK demands that you specify the partition_key or use the enable_cross_partition_query flag.

  • If you are getting subitems and specifying the partition_key, make sure that your partition key is included in the subitems, which in most cases it is not.
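The first point can be wrapped in a small helper that always supplies one of the two options. This is a sketch, assuming a `container` obtained as in the Examples section; `run_query` is a hypothetical name, not an SDK method.

```python
def run_query(container, query, partition_key=None):
    """Run a query scoped to one partition key value, or across partitions if none is given."""
    if partition_key is not None:
        # Single-partition query: pass the partition key *value*.
        return container.query_items(query=query, partition_key=partition_key)
    # No partition key value available: opt in to a cross-partition query.
    return container.query_items(query=query, enable_cross_partition_query=True)
```

For subitem queries such as select * from Families.children, calling the helper without a partition key value avoids the second pitfall, at the cost of a cross-partition query.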

Max Item Count

max_item_count is a parameter of the query_items method: an integer indicating the maximum number of items to be returned per page. Specify None to let the service determine the optimal item count. This is the recommended configuration value, and the default behavior of this SDK when the parameter is not set.
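To see the effect of the page size, the results can be consumed page by page with by_page() on the object that query_items returns. The helper below is a sketch; `fetch_pages` is a hypothetical name and `container` is assumed to be obtained as in the Examples section.

```python
def fetch_pages(container, query, page_size=None):
    """Return the query results as a list of pages, each page being a list of items."""
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size,  # None lets the service choose the page size
    )
    return [list(page) for page in results.by_page()]
```

Note that max_item_count is an upper bound per page, so a page may contain fewer items than requested.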

Examples

The following sections provide several code snippets covering some of the most common Cosmos DB tasks.

Create a database

After authenticating your CosmosClient, you can work with any resource in the account. The code snippet below creates a SQL API database, which is the default when no API is specified when create_database is invoked.

from azure.cosmos import CosmosClient, exceptions
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
try:
    database = client.create_database(DATABASE_NAME)
except exceptions.CosmosResourceExistsError:
    database = client.get_database_client(DATABASE_NAME)

Create a container

This example creates a container with default settings. If a container with the same name already exists in the database (generating a 409 Conflict error), the existing container is obtained instead.

from azure.cosmos import CosmosClient, PartitionKey, exceptions
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'

try:
    container = database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"))
except exceptions.CosmosResourceExistsError:
    container = database.get_container_client(CONTAINER_NAME)
except exceptions.CosmosHttpResponseError:
    raise

Create an analytical store enabled container

This example creates a container with Analytical Store enabled, for reporting, BI, AI, and Advanced Analytics with Azure Synapse Link.

The options for analytical_storage_ttl are:

  • 0, None, or not specified: Not enabled.

  • -1: The data will be stored indefinitely.

  • Any other number: the actual ttl, in seconds.

CONTAINER_NAME = 'products'
try:
    container = database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"),analytical_storage_ttl=-1)
except exceptions.CosmosResourceExistsError:
    container = database.get_container_client(CONTAINER_NAME)
except exceptions.CosmosHttpResponseError:
    raise

The preceding snippets also handle the CosmosHttpResponseError exception if the container creation failed. For more information on error handling and troubleshooting, see the Troubleshooting section.

Get an existing container

Retrieve an existing container from the database:

from azure.cosmos import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

Insert data

To insert items into a container, pass a dictionary containing your data to ContainerProxy.upsert_item. Each item you add to a container must include an id key with a value that uniquely identifies the item within the container.

This example inserts several items into the container, each with a unique id:

from azure.cosmos import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

for i in range(1, 10):
    container.upsert_item({
            'id': 'item{0}'.format(i),
            'productName': 'Widget',
            'productModel': 'Model {0}'.format(i)
        }
    )

Delete data

To delete items from a container, use ContainerProxy.delete_item. The SQL API in Cosmos DB does not support the SQL DELETE statement.

from azure.cosmos import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

for item in container.query_items(
        query='SELECT * FROM products p WHERE p.productModel = "Model 2"',
        enable_cross_partition_query=True):
    container.delete_item(item, partition_key='Widget')

NOTE: If you are using a partitioned collection, the partition_key argument in the example code above must be set to the partition key value of the particular item, not to the name of the partition key column in your collection. This holds true for both point reads and deletes.
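When the matching items may belong to different partitions, the partition key value can be read from each item instead of being hard-coded. The following sketch assumes a `container` obtained as above; `delete_matching` is a hypothetical helper, and `partition_key_field` is the item property that holds the partition key value (for example 'productName' for a container partitioned on /productName).

```python
def delete_matching(container, query, partition_key_field):
    """Delete every item returned by the query and return how many were deleted."""
    deleted = 0
    for item in container.query_items(query=query, enable_cross_partition_query=True):
        # Pass the item's own partition key *value*, not the property name or path.
        container.delete_item(item, partition_key=item[partition_key_field])
        deleted += 1
    return deleted
```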

Query the database

A Cosmos DB SQL API database supports querying the items in a container with ContainerProxy.query_items using SQL-like syntax.

This example queries a container for items with a specific id:

from azure.cosmos import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

# Enumerate the returned items
import json
for item in container.query_items(
        query='SELECT * FROM mycontainer r WHERE r.id="item3"',
        enable_cross_partition_query=True):
    print(json.dumps(item, indent=True))

NOTE: Although you can specify any value for the container name in the FROM clause, we recommend you use the container name for consistency.

Perform parameterized queries by passing a list of dictionaries, each containing a parameter name and its value, to ContainerProxy.query_items:

discontinued_items = container.query_items(
    query='SELECT * FROM products p WHERE p.productModel = @model',
    parameters=[
        dict(name='@model', value='Model 7')
    ],
    enable_cross_partition_query=True
)
for item in discontinued_items:
    print(json.dumps(item, indent=True))
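If your parameter values already live in a plain dictionary, a small conversion produces the list-of-dictionaries shape used in the parameters argument above. `to_query_parameters` is a hypothetical helper for illustration, not part of the SDK.

```python
def to_query_parameters(values):
    """Convert {'@model': 'Model 7'} into [{'name': '@model', 'value': 'Model 7'}]."""
    return [dict(name=name, value=value) for name, value in values.items()]

params = to_query_parameters({'@model': 'Model 7', '@name': 'Widget'})
print(params)
# [{'name': '@model', 'value': 'Model 7'}, {'name': '@name', 'value': 'Widget'}]
```

The resulting list can be passed as parameters=params to query_items.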

For more information on querying Cosmos DB databases using the SQL API, see Query Azure Cosmos DB data with SQL queries.

Get database properties

Get and display the properties of a database:

from azure.cosmos import CosmosClient
import os
import json

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
properties = database.read()
print(json.dumps(properties))

Get database and container throughputs

Get and display the throughput values of a database and of a container with dedicated throughput:

from azure.cosmos import CosmosClient
import os
import json

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)

# Database
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
db_offer = database.read_offer()
print('Found Offer \'{0}\' for Database \'{1}\' and its throughput is \'{2}\''.format(db_offer.properties['id'], database.id, db_offer.properties['content']['offerThroughput']))

# Container with dedicated throughput only. Will return error "offer not found" for containers without dedicated throughput
CONTAINER_NAME = 'testContainer'
container = database.get_container_client(CONTAINER_NAME)
container_offer = container.read_offer()
print('Found Offer \'{0}\' for Container \'{1}\' and its throughput is \'{2}\''.format(container_offer.properties['id'], container.id, container_offer.properties['content']['offerThroughput']))

Modify container properties

Certain properties of an existing container can be modified. This example sets the default time to live (TTL) for items in the container to 10 seconds:

from azure.cosmos import CosmosClient, PartitionKey
import os
import json

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

database.replace_container(
    container,
    partition_key=PartitionKey(path="/productName"),
    default_ttl=10,
)
# Display the new TTL setting for the container
container_props = container.read()
print(json.dumps(container_props['defaultTtl']))

For more information on TTL, see Time to Live for Azure Cosmos DB data.

Using the asynchronous client

The asynchronous cosmos client is a separate client that looks and works in a similar fashion to the existing synchronous client. However, the async client needs to be imported separately and its methods need to be used with the async/await keywords.

from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

async def create_items():
    for i in range(10):
        await container.upsert_item({
                'id': 'item{0}'.format(i),
                'productName': 'Widget',
                'productModel': 'Model {0}'.format(i)
            }
        )
    await client.close() # the async client must be closed manually if it's not initialized in a with statement

It is also worth pointing out that the asynchronous client has to be closed manually after use, either by initializing it with async with or by calling the close() method directly, as shown above.

from azure.cosmos.aio import CosmosClient
import asyncio
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
DATABASE_NAME = 'testDatabase'
CONTAINER_NAME = 'products'

# async with and await are only valid inside a coroutine, so the work is wrapped in one
async def create_items():
    async with CosmosClient(URL, credential=KEY) as client: # the with statement will automatically close the async client
        database = client.get_database_client(DATABASE_NAME)
        container = database.get_container_client(CONTAINER_NAME)
        for i in range(10):
            await container.upsert_item({
                    'id': 'item{0}'.format(i),
                    'productName': 'Widget',
                    'productModel': 'Model {0}'.format(i)
                }
            )

asyncio.run(create_items())

Queries with the asynchronous client

Unlike the synchronous client, the async client does not have an enable_cross_partition_query flag in the request. Queries without a specified partition key value will attempt to do a cross-partition query by default.

Query results can be iterated, but the query’s raw output returns an asynchronous iterator. This means that each object from the iterator is an awaitable object, and does not yet contain the true query result. In order to obtain the query results you can use an async for loop, which awaits each result as you iterate on the object, or manually await each query result as you iterate over the asynchronous iterator.

Since the query results are an asynchronous iterator, they can’t be cast into lists directly; instead, if you need to create lists from your results, use an async for loop or Python’s list comprehension to populate a list:

from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)

async def create_lists():
    results = container.query_items(
            query='SELECT * FROM products p WHERE p.productModel = "Model 2"')

    # iterates on "results" iterator to asynchronously create a complete list of the actual query results

    item_list = []
    async for item in results:
        item_list.append(item)

    # Alternatively, an async list comprehension builds the same list as the for-loop above. Use one approach or the other for a given results iterator.
    item_list = [item async for item in results]
    await client.close()
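The difference between regular and asynchronous iterators can be tried out without a Cosmos DB account. In the sketch below, `fake_query_results` is a stand-in async generator written for this illustration; it is not SDK code and only mimics the shape of the query results.

```python
import asyncio

async def fake_query_results():
    """Stand-in for the asynchronous iterator returned by query_items (not SDK code)."""
    for i in range(3):
        await asyncio.sleep(0)  # simulate waiting on the service
        yield {'id': 'item{0}'.format(i)}

async def collect():
    # list(fake_query_results()) would raise TypeError, because an
    # asynchronous iterator is not a regular iterable.
    return [item async for item in fake_query_results()]

items = asyncio.run(collect())
print(items)  # [{'id': 'item0'}, {'id': 'item1'}, {'id': 'item2'}]
```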

Troubleshooting

General

When you interact with Cosmos DB using the Python SDK, exceptions returned by the service correspond to the same HTTP status codes returned for REST API requests:

HTTP Status Codes for Azure Cosmos DB

For example, if you try to create a container using an ID (name) that’s already in use in your Cosmos DB database, a 409 error is returned, indicating the conflict. In the following snippet, the error is handled gracefully by catching the exception and displaying additional information about the error.

try:
    database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"))
except exceptions.CosmosResourceExistsError:
    print("""Error creating container
HTTP status code 409: The ID (name) provided for the container is already in use.
The container name must be unique within the database.""")
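Because the exceptions carry the HTTP status code, a handler can also branch on it. The following is a minimal sketch: `describe_cosmos_error` is a hypothetical helper, the hint texts are illustrative summaries rather than official messages, and it relies on the status_code attribute exposed by CosmosHttpResponseError and its subclasses.

```python
def describe_cosmos_error(error):
    """Build a short message from the HTTP status code carried by the exception."""
    status = getattr(error, 'status_code', None)
    hints = {
        404: 'The resource was not found.',
        409: 'A resource with this ID already exists.',
        429: 'Request rate too large; retry after a delay.',
    }
    hint = hints.get(status, 'See the HTTP status code reference for details.')
    return 'HTTP {0}: {1}'.format(status, hint)
```

It could be used as `except exceptions.CosmosHttpResponseError as e: print(describe_cosmos_error(e))`.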

Logging

This library uses the standard logging library for logging. Basic information about HTTP sessions (URLs, headers, etc.) is logged at INFO level.

Detailed DEBUG level logging, including request/response bodies and unredacted headers, can be enabled on a client with the logging_enable argument:

import sys
import logging
from azure.cosmos import CosmosClient

# Create a logger for the 'azure' SDK
logger = logging.getLogger('azure')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# This client will log detailed information about its HTTP sessions, at DEBUG level
client = CosmosClient(URL, credential=KEY, logging_enable=True)

Similarly, logging_enable can enable detailed logging for a single operation, even when it isn’t enabled for the client:

database = client.create_database(DATABASE_NAME, logging_enable=True)

Next steps

For more extensive documentation on the Cosmos DB service, see the Azure Cosmos DB documentation on docs.microsoft.com.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.