pulsar.schema.schema
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


from abc import abstractmethod
import json
import _pulsar
import enum


class Schema(object):
    """Base class for message schemas.

    Holds the record type accepted by ``encode`` and the native
    ``_pulsar.SchemaInfo`` describing the schema. Subclasses provide
    ``encode`` and ``decode``.

    NOTE: this class does not use ``abc.ABCMeta``, so the
    ``@abstractmethod`` markers document intent but are not enforced.
    """

    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
        # Type that encode() accepts; checked by _validate_object_type().
        self._record_cls = record_cls
        # json treats indent=True as a one-space indent. This text is the
        # definition handed to SchemaInfo, so it is deliberately left as-is.
        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
                                               json.dumps(schema_definition, indent=True))

    @abstractmethod
    def encode(self, obj):
        """Serialize ``obj`` into the message payload (subclass hook)."""
        pass

    @abstractmethod
    def decode(self, data):
        """Deserialize a message payload back into an object (subclass hook)."""
        pass

    def schema_info(self):
        """Return the ``_pulsar.SchemaInfo`` built in ``__init__``."""
        return self._schema_info

    def _validate_object_type(self, obj):
        """Raise ``TypeError`` unless ``obj`` is an instance of the record type."""
        if not isinstance(obj, self._record_cls):
            raise TypeError('Invalid record obj of type ' + str(type(obj))
                            + ' - expected type is ' + str(self._record_cls))


class BytesSchema(Schema):
    """Pass-through schema: payloads are raw ``bytes`` in both directions."""

    def __init__(self):
        super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')

    def encode(self, data):
        """Return ``data`` unchanged after checking that it is ``bytes``."""
        self._validate_object_type(data)
        return data

    def decode(self, data):
        """Return the payload unchanged."""
        return data

    def __str__(self):
        return 'BytesSchema'


class StringSchema(Schema):
    """Schema for ``str`` values, carried as UTF-8 encoded bytes."""

    def __init__(self):
        super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')

    def encode(self, obj):
        """Encode the ``str`` ``obj`` as UTF-8 bytes."""
        self._validate_object_type(obj)
        return obj.encode('utf-8')

    def decode(self, data):
        """Decode a UTF-8 payload into ``str``."""
        return data.decode('utf-8')

    def __str__(self):
        return 'StringSchema'


def remove_reserved_key(data):
    """Delete the reserved keys from the dict ``data`` in place.

    The keys are ``_default``, ``_required`` and ``_required_default``.
    Keys that are absent are ignored. Returns ``None``.
    """
    for key in ('_default', '_required', '_required_default'):
        data.pop(key, None)


class JsonSchema(Schema):
    """Schema that serializes record objects as JSON text.

    ``record_cls.schema()`` supplies the schema definition.
    """

    def __init__(self, record_cls):
        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
                                         record_cls.schema(), 'JSON')

    def _get_serialized_value(self, o):
        """``default`` hook for ``json.dumps``: convert values json cannot encode.

        Enum members are serialized as their ``value``. Other objects
        (e.g. nested records) are serialized as a copy of their
        ``__dict__`` with the reserved keys removed.

        :raises TypeError: if ``o`` is neither an Enum member nor an object
            with a ``__dict__``, as the ``json.dumps`` ``default`` contract
            requires.
        """
        if isinstance(o, enum.Enum):
            return o.value
        attrs = getattr(o, '__dict__', None)
        if attrs is None:
            # json.dumps expects TypeError for unsupported types; raise it
            # explicitly instead of letting an AttributeError escape.
            raise TypeError('Object of type %s is not JSON serializable'
                            % type(o).__name__)
        data = attrs.copy()
        remove_reserved_key(data)
        return data

    def encode(self, obj):
        """Serialize the record ``obj`` to UTF-8 encoded JSON bytes.

        :raises TypeError: if ``obj`` is not an instance of the record type,
            or contains a value that cannot be serialized.
        """
        self._validate_object_type(obj)
        # Copy the dict of the object so the provided object is not modified
        # through the reference.
        data = obj.__dict__.copy()
        remove_reserved_key(data)
        return json.dumps(data, default=self._get_serialized_value, indent=True).encode('utf-8')

    def decode(self, data):
        """Build a ``record_cls`` instance from a JSON payload.

        Top-level JSON keys are passed as keyword arguments. Nested values
        are passed exactly as ``json.loads`` produced them.
        """
        return self._record_cls(**json.loads(data))
class
Schema:
class Schema(object):
    """Common base for schemas: remembers the record type and the native SchemaInfo.

    Concrete subclasses supply ``encode`` / ``decode``. Because the class is a
    plain ``object`` subclass (no ``abc.ABCMeta``), the ``@abstractmethod``
    markers are informational only and are not enforced.
    """

    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
        self._record_cls = record_cls
        # indent=True behaves like a one-space indent in json.dumps; the exact
        # text becomes the definition handed to SchemaInfo.
        definition_text = json.dumps(schema_definition, indent=True)
        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, definition_text)

    @abstractmethod
    def encode(self, obj):
        """Turn ``obj`` into a payload; overridden by subclasses."""

    @abstractmethod
    def decode(self, data):
        """Turn a payload back into an object; overridden by subclasses."""

    def schema_info(self):
        """Give back the ``_pulsar.SchemaInfo`` created by the constructor."""
        return self._schema_info

    def _validate_object_type(self, obj):
        """Guard: raise ``TypeError`` when ``obj`` is not a ``record_cls`` instance."""
        if isinstance(obj, self._record_cls):
            return
        actual = str(type(obj))
        expected = str(self._record_cls)
        raise TypeError('Invalid record obj of type ' + actual + ' - expected type is ' + expected)
class BytesSchema(Schema):
    """Identity schema for raw ``bytes``: nothing is transformed either way."""

    def __init__(self):
        # Raw bytes carry no schema definition.
        super(BytesSchema, self).__init__(
            record_cls=bytes,
            schema_type=_pulsar.SchemaType.BYTES,
            schema_definition=None,
            schema_name='BYTES',
        )

    def encode(self, data):
        """Check that ``data`` is ``bytes`` and hand it back as the payload."""
        self._validate_object_type(data)
        payload = data
        return payload

    def decode(self, data):
        """Hand the payload back as-is."""
        payload = data
        return payload

    def __str__(self):
        return 'BytesSchema'
Inherited Members
class StringSchema(Schema):
    """Text schema: ``str`` values travel as UTF-8 bytes."""

    def __init__(self):
        # Plain strings carry no schema definition.
        super(StringSchema, self).__init__(
            record_cls=str,
            schema_type=_pulsar.SchemaType.STRING,
            schema_definition=None,
            schema_name='STRING',
        )

    def encode(self, obj):
        """Check that ``obj`` is ``str`` and return its UTF-8 encoding."""
        self._validate_object_type(obj)
        encoded = obj.encode('utf-8')
        return encoded

    def decode(self, data):
        """Interpret the payload as UTF-8 and return the resulting ``str``."""
        text = data.decode('utf-8')
        return text

    def __str__(self):
        return 'StringSchema'
Inherited Members
def
remove_reserved_key(data):
class JsonSchema(Schema):
    """Schema that serializes record objects as JSON text.

    ``record_cls.schema()`` supplies the schema definition.
    """

    def __init__(self, record_cls):
        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
                                         record_cls.schema(), 'JSON')

    def _get_serialized_value(self, o):
        """``default`` hook for ``json.dumps``: convert values json cannot encode.

        Enum members are serialized as their ``value``. Other objects
        (e.g. nested records) are serialized as a copy of their
        ``__dict__`` with the reserved keys removed.

        :raises TypeError: if ``o`` is neither an Enum member nor an object
            with a ``__dict__``, as the ``json.dumps`` ``default`` contract
            requires.
        """
        if isinstance(o, enum.Enum):
            return o.value
        attrs = getattr(o, '__dict__', None)
        if attrs is None:
            # json.dumps expects TypeError for unsupported types; raise it
            # explicitly instead of letting an AttributeError escape.
            raise TypeError('Object of type %s is not JSON serializable'
                            % type(o).__name__)
        data = attrs.copy()
        remove_reserved_key(data)
        return data

    def encode(self, obj):
        """Serialize the record ``obj`` to UTF-8 encoded JSON bytes.

        :raises TypeError: if ``obj`` is not an instance of the record type,
            or contains a value that cannot be serialized.
        """
        self._validate_object_type(obj)
        # Copy the dict of the object so the provided object is not modified
        # through the reference.
        data = obj.__dict__.copy()
        remove_reserved_key(data)
        return json.dumps(data, default=self._get_serialized_value, indent=True).encode('utf-8')

    def decode(self, data):
        """Build a ``record_cls`` instance from a JSON payload.

        Top-level JSON keys are passed as keyword arguments. Nested values
        are passed exactly as ``json.loads`` produced them.
        """
        return self._record_cls(**json.loads(data))
def
encode(self, obj):
def encode(self, obj):
    """Serialize the record ``obj`` as UTF-8 encoded JSON bytes.

    :raises TypeError: if ``obj`` is not an instance of the schema's record type.
    """
    self._validate_object_type(obj)
    # Serialize a shallow copy of the attributes so that stripping the
    # reserved keys never touches the caller's object.
    fields = dict(obj.__dict__)
    remove_reserved_key(fields)
    text = json.dumps(fields, default=self._get_serialized_value, indent=True)
    return text.encode('utf-8')