azure.schemaregistry.aio package

class azure.schemaregistry.aio.SchemaRegistryClient(fully_qualified_namespace: str, credential: AsyncTokenCredential, **kwargs: Any)[source]

SchemaRegistryClient is a client for registering and retrieving schemas from the Azure Schema Registry service.

Parameters
  • fully_qualified_namespace (str) – The Schema Registry service fully qualified host name. For example: my-namespace.servicebus.windows.net.

  • credential (AsyncTokenCredential) – To authenticate managing the entities of the SchemaRegistry namespace.

Keyword Arguments

api_version (str) – The Schema Registry service API version to use for requests. Default value and only accepted value currently is “2021-10”.

Example:

Create a new instance of the SchemaRegistryClient.
SCHEMAREGISTRY_FQN = os.environ["SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE"]
token_credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(
    fully_qualified_namespace=SCHEMAREGISTRY_FQN, credential=token_credential
)
async close()None[source]

This method is to close the sockets opened by the client. It need not be used when using with a context manager.

async get_schema(schema_id: str, **kwargs: Any)Schema[source]
async get_schema(*, group_name: str, name: str, version: int, **kwargs)Schema

Gets a registered schema. There are two ways to call this method:

1) To get a registered schema by its unique ID, pass the schema_id parameter and any optional keyword arguments. Azure Schema Registry guarantees that ID is unique within a namespace.

2) To get a specific version of a schema within the specified schema group, pass in the required keyword arguments group_name, name, and version and any optional keyword arguments.

Parameters

schema_id (str) – References specific schema in registry namespace.

Keyword Arguments
  • group_name (str) – Name of schema group that contains the registered schema.

  • name (str) – Name of schema which should be retrieved.

  • version (int) – Version of schema which should be retrieved.

Return type

Schema

Raises

HttpResponseError

Example:

Get schema by id.
schema = await schema_registry_client.get_schema(schema_id)
definition = schema.definition
properties = schema.properties
Get schema by version.
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
name = "your-schema-name"
schema = await schema_registry_client.get_schema(group_name=group_name, name=name, version=version)
definition = schema.definition
properties = schema.properties
async get_schema_properties(group_name: str, name: str, definition: str, format: Union[str, azure.schemaregistry._common._constants.SchemaFormat], **kwargs: Any)azure.schemaregistry._common._schema.SchemaProperties[source]

Gets the schema properties corresponding to an existing schema within the specified schema group, as matched by schema defintion comparison.

Parameters
  • group_name (str) – Schema group under which schema should be registered.

  • name (str) – Name of schema for which properties should be retrieved.

  • definition (str) – String representation of the schema for which properties should be retrieved.

  • format (Union[str, SchemaFormat]) – Format for the schema for which properties should be retrieved.

Return type

SchemaProperties

Raises

HttpResponseError

Example:

Get schema by id.
group_name = os.environ["SCHEMAREGISTRY_GROUP"]
name = "your-schema-name"
format = "Avro"
schema_json = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ],
}
definition = json.dumps(schema_json, separators=(",", ":"))
schema_properties = await schema_registry_client.get_schema_properties(
    group_name, name, definition, format
)
schema_id = schema_properties.id
async register_schema(group_name: str, name: str, definition: str, format: Union[str, azure.schemaregistry._common._constants.SchemaFormat], **kwargs: Any)azure.schemaregistry._common._schema.SchemaProperties[source]

Register new schema. If schema of specified name does not exist in specified group, schema is created at version 1. If schema of specified name exists already in specified group, schema is created at latest version + 1.

Parameters
  • group_name (str) – Schema group under which schema should be registered.

  • name (str) – Name of schema being registered.

  • definition (str) – String representation of the schema being registered.

  • format (Union[str, SchemaFormat]) – Format for the schema being registered. For now Avro is the only supported schema format by the service.

Return type

SchemaProperties

Raises

HttpResponseError

Example:

Register a new schema.
GROUP_NAME = os.environ["SCHEMAREGISTRY_GROUP"]
NAME = "your-schema-name"
FORMAT = "Avro"
SCHEMA_JSON = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ],
}
DEFINITION = json.dumps(SCHEMA_JSON, separators=(",", ":"))
schema_properties = await schema_registry_client.register_schema(
    GROUP_NAME, NAME, DEFINITION, FORMAT
)
schema_id = schema_properties.id