pulsar.schema.schema_avro
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import _pulsar
import io
import enum

from . import Record
from .schema import Schema

# fastavro is an optional dependency: probe for it once at import time and
# pick the matching AvroSchema implementation below.
try:
    import fastavro
    HAS_AVRO = True
except ImportError:
    HAS_AVRO = False

if HAS_AVRO:
    class AvroSchema(Schema):
        """Schema that reads and writes Avro binary through fastavro's
        schemaless reader and writer.

        The Avro schema comes either from a record class (via its
        ``schema()`` method) or from an explicit schema definition.
        """

        def __init__(self, record_cls, schema_definition=None):
            """Create the schema from a record class or a raw definition.

            :param record_cls: class exposing ``schema()``; may be ``None``
                when ``schema_definition`` is supplied.
            :param schema_definition: schema used only when ``record_cls``
                is ``None``; it is ignored otherwise.
            :raises AssertionError: if both arguments are ``None``.
            """
            if record_cls is None and schema_definition is None:
                raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

            if record_cls is not None:
                self._schema = record_cls.schema()
            else:
                self._schema = schema_definition
            super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')

        def _get_serialized_value(self, x):
            """Convert one field value into a form passed on to fastavro.

            Enum members become their name, ``Record`` instances and dicts
            are converted through :meth:`encode_dict`, lists are converted
            element by element, and any other value is returned unchanged.
            """
            if isinstance(x, enum.Enum):
                return x.name
            elif isinstance(x, Record):
                return self.encode_dict(x.__dict__)
            elif isinstance(x, list):
                return [self._get_serialized_value(item) for item in x]
            elif isinstance(x, dict):
                return self.encode_dict(x)
            else:
                return x

        def encode(self, obj):
            """Serialize ``obj`` to Avro bytes.

            :param obj: a record object when the schema was built from a
                record class, otherwise a ``dict``.
            :returns: the bytes written by ``fastavro.schemaless_writer``.
            :raises ValueError: if no record class is set and ``obj`` is
                not a ``dict``.
            """
            buffer = io.BytesIO()
            m = obj
            if self._record_cls is not None:
                # _validate_object_type is not defined in this module;
                # presumably inherited from Schema -- confirm there.
                self._validate_object_type(obj)
                m = self.encode_dict(obj.__dict__)
            elif not isinstance(obj, dict):
                raise ValueError('If using the custom schema, the record data should be dict type.')

            fastavro.schemaless_writer(buffer, self._schema, m)
            return buffer.getvalue()

        def encode_dict(self, d):
            """Return a new dict with every value of ``d`` converted by
            :meth:`_get_serialized_value`; keys are kept as they are."""
            return {k: self._get_serialized_value(v) for k, v in d.items()}

        def decode(self, data):
            """Deserialize Avro bytes.

            :returns: an instance of the record class built from the decoded
                fields when a record class is set, otherwise the decoded
                value as returned by ``fastavro.schemaless_reader``.
            """
            buffer = io.BytesIO(data)
            d = fastavro.schemaless_reader(buffer, self._schema)
            if self._record_cls is not None:
                return self._record_cls(**d)
            else:
                return d

else:
    class AvroSchema(Schema):
        """Placeholder used when fastavro is not installed; constructing it
        always raises with an installation hint."""

        # schema_definition defaults to None to mirror the fastavro-backed
        # constructor, so AvroSchema(record_cls) raises the helpful message
        # below instead of a TypeError about a missing positional argument.
        def __init__(self, _record_cls, _schema_definition=None):
            raise Exception("Avro library support was not found. Make sure to install Pulsar client " +
                            "with Avro support: pip3 install 'pulsar-client[avro]'")

        def encode(self, obj):
            pass

        def decode(self, data):
            pass
class AvroSchema(Schema):
    """Avro schema backed by fastavro's schemaless reader and writer.

    The schema is taken from ``record_cls.schema()`` when a record class is
    given, and from ``schema_definition`` otherwise.
    """

    def __init__(self, record_cls, schema_definition=None):
        """Resolve the Avro schema and initialise the base ``Schema``.

        Raises ``AssertionError`` when both arguments are ``None``.
        """
        if record_cls is None:
            if schema_definition is None:
                raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")
            self._schema = schema_definition
        else:
            # A record class always takes precedence over schema_definition.
            self._schema = record_cls.schema()

        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')

    def _get_serialized_value(self, x):
        """Map a single field value to the form handed to fastavro."""
        if isinstance(x, enum.Enum):
            return x.name
        if isinstance(x, Record):
            return self.encode_dict(x.__dict__)
        if isinstance(x, list):
            return [self._get_serialized_value(element) for element in x]
        if isinstance(x, dict):
            return self.encode_dict(x)
        # Anything else is passed through untouched.
        return x

    def encode(self, obj):
        """Serialize a record object (or a dict, for a custom schema) to bytes.

        Raises ``ValueError`` when no record class is set and ``obj`` is not
        a ``dict``.
        """
        if self._record_cls is None:
            if not isinstance(obj, dict):
                raise ValueError('If using the custom schema, the record data should be dict type.')
            payload = obj
        else:
            # _validate_object_type is defined outside this class body;
            # presumably on Schema -- confirm there.
            self._validate_object_type(obj)
            payload = self.encode_dict(obj.__dict__)

        stream = io.BytesIO()
        fastavro.schemaless_writer(stream, self._schema, payload)
        return stream.getvalue()

    def encode_dict(self, d):
        """Build a new dict whose values went through ``_get_serialized_value``."""
        return {key: self._get_serialized_value(value) for key, value in d.items()}

    def decode(self, data):
        """Read Avro bytes back into a record instance, or into the raw
        decoded value when no record class is set."""
        fields = fastavro.schemaless_reader(io.BytesIO(data), self._schema)
        if self._record_cls is None:
            return fields
        return self._record_cls(**fields)
AvroSchema(record_cls, schema_definition=None)
def __init__(self, record_cls, schema_definition=None):
    """Resolve the Avro schema and initialise the base ``Schema``.

    :param record_cls: class exposing ``schema()``; may be ``None`` when
        ``schema_definition`` is supplied.
    :param schema_definition: schema used only when ``record_cls`` is
        ``None``.
    :raises AssertionError: if both arguments are ``None``.
    """
    if record_cls is None:
        if schema_definition is None:
            raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")
        self._schema = schema_definition
    else:
        # A record class always takes precedence over schema_definition.
        self._schema = record_cls.schema()

    super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
def encode(self, obj):
def encode(self, obj):
    """Serialize ``obj`` to Avro bytes with ``fastavro.schemaless_writer``.

    :param obj: a record object when a record class is set, otherwise a
        ``dict``.
    :returns: the serialized bytes.
    :raises ValueError: if no record class is set and ``obj`` is not a
        ``dict``.
    """
    if self._record_cls is None:
        if not isinstance(obj, dict):
            raise ValueError('If using the custom schema, the record data should be dict type.')
        payload = obj
    else:
        # _validate_object_type is defined outside this block; presumably
        # it rejects objects of the wrong record type -- confirm.
        self._validate_object_type(obj)
        payload = self.encode_dict(obj.__dict__)

    stream = io.BytesIO()
    fastavro.schemaless_writer(stream, self._schema, payload)
    return stream.getvalue()