# Source code for azure.schemaregistry.serializer.avroserializer._schema_registry_avro_serializer

# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from io import BytesIO
from typing import Any, Dict, Union
import avro
import avro.schema  # explicit: `import avro` alone does not load the `schema` submodule

from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX
from ._avro_serializer import AvroObjectSerializer


class SchemaRegistryAvroSerializer(object):
    """
    SchemaRegistryAvroSerializer provides the ability to serialize and deserialize data according
    to the given avro schema. It automatically registers, retrieves and caches schemas.

    :param schema_registry: The schema registry client which is used to register schema and
     retrieve schema from the service.
    :type schema_registry: ~azure.schemaregistry.SchemaRegistryClient
    :param str schema_group: Schema group under which schema should be registered.
    :keyword str codec: The writer codec. If None, let the avro library decide.
    """

    def __init__(self, schema_registry, schema_group, **kwargs):
        # type: ("SchemaRegistryClient", str, Any) -> None
        self._schema_group = schema_group
        self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
        self._schema_registry_client = schema_registry  # type: "SchemaRegistryClient"
        # Two-way cache between schema content (as a string) and the schema id
        # assigned by the service; both directions are filled whenever either is learned.
        self._id_to_schema = {}  # type: Dict[str, str]
        self._schema_to_id = {}  # type: Dict[str, str]
        # Parsed schema objects keyed by the raw schema passed to ``serialize``, so the
        # same raw schema is parsed only once.
        self._user_input_schema_cache = {}  # type: Dict[Union[str, bytes], Any]

    def __enter__(self):
        # type: () -> SchemaRegistryAvroSerializer
        """Enter the underlying schema registry client's context and return this serializer."""
        self._schema_registry_client.__enter__()
        return self

    def __exit__(self, *exc_details):
        # type: (*Any) -> None
        """Forward context-manager exit (and any exception details) to the registry client."""
        self._schema_registry_client.__exit__(*exc_details)

    def close(self):
        # type: () -> None
        """
        This method is to close the sockets opened by the client.
        It need not be used when using with a context manager.
        """
        self._schema_registry_client.close()

    def _get_schema_id(self, schema_name, schema, **kwargs):
        # type: (str, avro.schema.Schema, Any) -> str
        """
        Get schema id from local cache with the given schema.
        If there is no item in the local cache, register the schema with the service
        (under this serializer's schema group, format "Avro") and cache the returned id.

        :param schema_name: Name of the schema
        :type schema_name: str
        :param schema: Schema object; its ``str()`` form is what gets cached and registered.
        :type schema: avro.schema.Schema
        :return: Schema Id
        :rtype: str
        """
        schema_str = str(schema)
        try:
            return self._schema_to_id[schema_str]
        except KeyError:
            schema_id = self._schema_registry_client.register_schema(
                self._schema_group, schema_name, "Avro", schema_str, **kwargs
            ).schema_id
            self._schema_to_id[schema_str] = schema_id
            self._id_to_schema[schema_id] = schema_str
            return schema_id

    def _get_schema(self, schema_id, **kwargs):
        # type: (str, Any) -> str
        """
        Get schema content from local cache with the given schema id.
        If there is no item in the local cache, get schema from the service and cache it.

        :param str schema_id: Schema id
        :return: Schema content
        :rtype: str
        """
        try:
            return self._id_to_schema[schema_id]
        except KeyError:
            schema_str = self._schema_registry_client.get_schema(schema_id, **kwargs).schema_content
            self._id_to_schema[schema_id] = schema_str
            self._schema_to_id[schema_str] = schema_id
            return schema_str

    def serialize(self, data, schema, **kwargs):
        # type: (Dict[str, Any], Union[str, bytes], Any) -> bytes
        """
        Encode dict data with the given schema. The returned bytes consist of:
        the first 4 bytes, a record format identifier (currently all zero bytes);
        the following 32 bytes, the schema id returned by the schema registry service;
        the remaining bytes, the real data payload.

        :param data: The dict data to be encoded.
        :type data: Dict[str, Any]
        :param schema: The schema used to encode the data.
        :type schema: Union[str, bytes]
        :return: Record format identifier, UTF-8 encoded schema id and encoded data, concatenated.
        :rtype: bytes
        """
        try:
            cached_schema = self._user_input_schema_cache[schema]
        except KeyError:
            cached_schema = avro.schema.parse(schema)
            self._user_input_schema_cache[schema] = cached_schema

        record_format_identifier = b'\0\0\0\0'
        # The schema is registered (or looked up) before encoding, keyed by its full name.
        schema_id = self._get_schema_id(cached_schema.fullname, cached_schema, **kwargs)
        data_bytes = self._avro_serializer.serialize(data, cached_schema)

        # Concatenate header and payload directly; no intermediate stream to manage.
        return b"".join((record_format_identifier, schema_id.encode('utf-8'), data_bytes))

    def deserialize(self, data, **kwargs):
        # type: (bytes, Any) -> Dict[str, Any]
        """
        Decode bytes data.

        :param bytes data: The bytes data needs to be decoded.
        :rtype: Dict[str, Any]
        :raises ValueError: If ``data`` is shorter than the record format identifier plus
         schema id header.
        """
        # Reject truncated input up front instead of sending a partial/empty schema id
        # to the service.
        if len(data) < DATA_START_INDEX:
            raise ValueError(
                "Data is too short to contain the serialization header: "
                "expected at least {} bytes, got {}.".format(DATA_START_INDEX, len(data))
            )
        # Bytes before SCHEMA_ID_START_INDEX are reserved for a future record format
        # identifier and are currently not inspected.
        schema_id = data[SCHEMA_ID_START_INDEX:(SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH)].decode('utf-8')
        schema_content = self._get_schema(schema_id, **kwargs)
        dict_data = self._avro_serializer.deserialize(data[DATA_START_INDEX:], schema_content)
        return dict_data