Source code for azure.schemaregistry.serializer.avroserializer._schema_registry_avro_serializer

# --------------------------------------------------------------------------
#
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache
from io import BytesIO
from typing import Any, Dict, Mapping
import avro

from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX
from ._avro_serializer import AvroObjectSerializer


[docs]class AvroSerializer(object): """ AvroSerializer provides the ability to serialize and deserialize data according to the given avro schema. It would automatically register, get and cache the schema. :keyword client: Required. The schema registry client which is used to register schema and retrieve schema from the service. :paramtype client: ~azure.schemaregistry.SchemaRegistryClient :keyword str group_name: Required. Schema group under which schema should be registered. :keyword bool auto_register_schemas: When true, register new schemas passed to serialize. Otherwise, and by default, fail if it has not been pre-registered in the registry. """ def __init__(self, **kwargs): # type: (Any) -> None try: self._schema_group = kwargs.pop("group_name") self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient" except KeyError as e: raise TypeError("'{}' is a required keyword.".format(e.args[0])) self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec")) self._auto_register_schemas = kwargs.get("auto_register_schemas", False) self._auto_register_schema_func = ( self._schema_registry_client.register_schema if self._auto_register_schemas else self._schema_registry_client.get_schema_properties ) def __enter__(self): # type: () -> SchemaRegistryAvroSerializer self._schema_registry_client.__enter__() return self def __exit__(self, *exc_details): # type: (Any) -> None self._schema_registry_client.__exit__(*exc_details)
[docs] def close(self): # type: () -> None """This method is to close the sockets opened by the client. It need not be used when using with a context manager. """ self._schema_registry_client.close()
@lru_cache(maxsize=128) def _get_schema_id(self, schema_name, schema_str, **kwargs): # type: (str, str, Any) -> str """ Get schema id from local cache with the given schema. If there is no item in the local cache, get schema id from the service and cache it. :param schema_name: Name of the schema :type schema_name: str :param schema: Schema object :type schema: avro.schema.Schema :return: Schema Id :rtype: str """ schema_id = self._auto_register_schema_func( self._schema_group, schema_name, schema_str, "Avro", **kwargs ).id return schema_id @lru_cache(maxsize=128) def _get_schema(self, schema_id, **kwargs): # type: (str, Any) -> str """ Get schema content from local cache with the given schema id. If there is no item in the local cache, get schema from the service and cache it. :param str schema_id: Schema id :return: Schema content """ schema_str = self._schema_registry_client.get_schema( schema_id, **kwargs ).schema_definition return schema_str @classmethod @lru_cache(maxsize=128) def _parse_schema(cls, schema): return avro.schema.parse(schema)
[docs] def serialize(self, value, **kwargs): # type: (Mapping[str, Any], Any) -> bytes """ Encode data with the given schema. The returns bytes are consisted of: The first 4 bytes denoting record format identifier. The following 32 bytes denoting schema id returned by schema registry service. The remaining bytes are the real data payload. :param value: The data to be encoded. :type value: Mapping[str, Any] :keyword schema: Required. The schema used to encode the data. :paramtype schema: str :rtype: bytes """ try: raw_input_schema = kwargs.pop("schema") except KeyError as e: raise TypeError("'{}' is a required keyword.".format(e.args[0])) cached_schema = AvroSerializer._parse_schema(raw_input_schema) record_format_identifier = b"\0\0\0\0" schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) data_bytes = self._avro_serializer.serialize(value, cached_schema) stream = BytesIO() stream.write(record_format_identifier) stream.write(schema_id.encode("utf-8")) stream.write(data_bytes) stream.flush() payload = stream.getvalue() stream.close() return payload
[docs] def deserialize(self, value, **kwargs): # type: (bytes, Any) -> Dict[str, Any] """ Decode bytes data. :param bytes value: The bytes data needs to be decoded. :rtype: Dict[str, Any] """ # record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier. schema_id = value[ SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH) ].decode("utf-8") schema_definition = self._get_schema(schema_id, **kwargs) dict_value = self._avro_serializer.deserialize( value[DATA_START_INDEX:], schema_definition ) return dict_value