pulsar.schema.schema
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


from abc import abstractmethod
import json
import _pulsar
import enum


class Schema(object):
    """Base class pairing a Python record type with a Pulsar ``SchemaInfo``.

    Subclasses provide ``encode``/``decode`` to translate between Python
    objects and the payload handed to the client.

    Note: the class does not use an ABC metaclass, so the
    ``@abstractmethod`` markers below are not enforced at instantiation.
    """

    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
        """Store the record type and build the ``_pulsar.SchemaInfo``.

        :param record_cls: Python type accepted by ``_validate_object_type``.
        :param schema_type: value forwarded to ``_pulsar.SchemaInfo``.
        :param schema_definition: JSON-serializable definition (may be None).
        :param schema_name: name forwarded to ``_pulsar.SchemaInfo``.
        """
        self._record_cls = record_cls
        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
                                               json.dumps(schema_definition, indent=True))

    @abstractmethod
    def encode(self, obj):
        """Convert ``obj`` into its serialized form (subclass hook)."""
        pass

    @abstractmethod
    def decode(self, data):
        """Convert serialized ``data`` back into an object (subclass hook)."""
        pass

    def schema_info(self):
        """Return the ``_pulsar.SchemaInfo`` built in the constructor."""
        return self._schema_info

    def _validate_object_type(self, obj):
        """Raise ``TypeError`` unless ``obj`` is an instance of the record type."""
        if not isinstance(obj, self._record_cls):
            raise TypeError('Invalid record obj of type ' + str(type(obj))
                            + ' - expected type is ' + str(self._record_cls))


class BytesSchema(Schema):
    """Pass-through schema: payloads are ``bytes`` and are not transformed."""

    def __init__(self):
        super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')

    def encode(self, data):
        """Check that ``data`` is ``bytes`` and return it unchanged."""
        self._validate_object_type(data)
        return data

    def decode(self, data):
        """Return ``data`` unchanged."""
        return data

    def __str__(self):
        return 'BytesSchema'


class StringSchema(Schema):
    """Schema for ``str`` values serialized as UTF-8."""

    def __init__(self):
        super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')

    def encode(self, obj):
        """Check that ``obj`` is a ``str`` and return its UTF-8 encoding."""
        self._validate_object_type(obj)
        return obj.encode('utf-8')

    def decode(self, data):
        """Decode UTF-8 ``data`` into a ``str``."""
        return data.decode('utf-8')

    def __str__(self):
        return 'StringSchema'


class JsonSchema(Schema):
    """Schema that serializes record objects as UTF-8 encoded JSON.

    The definition registered with Pulsar is the value returned by
    ``record_cls.schema()``.
    """

    # Attributes removed from the top-level payload before serialization.
    # NOTE(review): presumably bookkeeping fields set by the record base
    # class -- confirm against the record definition.
    _RESERVED_KEYS = ('_default', '_required', '_required_default')

    def __init__(self, record_cls):
        """:param record_cls: record type; must expose a ``schema()`` callable."""
        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
                                         record_cls.schema(), 'JSON')

    def _get_serialized_value(self, o):
        """``json.dumps`` fallback for values it cannot serialize natively.

        Enum members are replaced by their ``value``; any other object is
        replaced by its ``__dict__``.
        """
        if isinstance(o, enum.Enum):
            return o.value
        else:
            return o.__dict__

    def encode(self, obj):
        """Serialize ``obj`` to UTF-8 JSON bytes.

        :raises TypeError: if ``obj`` is not an instance of the record type.
        """
        self._validate_object_type(obj)
        # Serialize from a shallow copy: deleting the reserved keys from
        # obj.__dict__ itself would strip them from the caller's object and
        # make a second encode() of the same object fail with KeyError.
        payload = dict(obj.__dict__)
        for key in self._RESERVED_KEYS:
            payload.pop(key, None)

        return json.dumps(payload, default=self._get_serialized_value, indent=True).encode('utf-8')

    def decode(self, data):
        """Parse JSON ``data`` and pass its members as keyword arguments to the record type."""
        return self._record_cls(**json.loads(data))
class
Schema:
class Schema(object):
    """Common base for schemas: ties a record type to a Pulsar ``SchemaInfo``.

    Concrete schemas override ``encode`` and ``decode``. The class has no
    ABC metaclass, so the ``@abstractmethod`` markers are not enforced when
    an instance is created.
    """

    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
        """Remember the record type and create the ``_pulsar.SchemaInfo``.

        The schema definition is rendered to JSON text before being handed
        to ``_pulsar.SchemaInfo`` together with the type and the name.
        """
        self._record_cls = record_cls
        definition_json = json.dumps(schema_definition, indent=True)
        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, definition_json)

    @abstractmethod
    def encode(self, obj):
        """Serialize ``obj``; the base implementation does nothing."""

    @abstractmethod
    def decode(self, data):
        """Deserialize ``data``; the base implementation does nothing."""

    def schema_info(self):
        """Return the ``SchemaInfo`` object created by the constructor."""
        return self._schema_info

    def _validate_object_type(self, obj):
        """Raise ``TypeError`` when ``obj`` is not an instance of the record type."""
        expected_cls = self._record_cls
        if isinstance(obj, expected_cls):
            return
        actual_cls = type(obj)
        raise TypeError('Invalid record obj of type ' + str(actual_cls)
                        + ' - expected type is ' + str(expected_cls))
class BytesSchema(Schema):
    """Schema for raw ``bytes`` payloads; values pass through untouched."""

    def __init__(self):
        """Register ``bytes`` as the record type under the name 'BYTES'."""
        base = super(BytesSchema, self)
        base.__init__(record_cls=bytes,
                      schema_type=_pulsar.SchemaType.BYTES,
                      schema_definition=None,
                      schema_name='BYTES')

    def encode(self, data):
        """Type-check ``data`` against the record type, then return it as is."""
        self._validate_object_type(data)
        raw = data
        return raw

    def decode(self, data):
        """Hand back the received payload without any conversion."""
        raw = data
        return raw

    def __str__(self):
        """Return the fixed display name of this schema."""
        label = 'BytesSchema'
        return label
Inherited Members
class StringSchema(Schema):
    """Schema for text values, converted to and from UTF-8 bytes."""

    def __init__(self):
        """Register ``str`` as the record type under the name 'STRING'."""
        base = super(StringSchema, self)
        base.__init__(record_cls=str,
                      schema_type=_pulsar.SchemaType.STRING,
                      schema_definition=None,
                      schema_name='STRING')

    def encode(self, obj):
        """Type-check ``obj`` against the record type, then encode it as UTF-8."""
        self._validate_object_type(obj)
        utf8_payload = obj.encode('utf-8')
        return utf8_payload

    def decode(self, data):
        """Decode a UTF-8 payload into text."""
        text = data.decode('utf-8')
        return text

    def __str__(self):
        """Return the fixed display name of this schema."""
        label = 'StringSchema'
        return label
Inherited Members
class JsonSchema(Schema):
    """Schema that serializes record objects as UTF-8 encoded JSON.

    The definition registered with Pulsar is the value returned by
    ``record_cls.schema()``.
    """

    # Attributes removed from the top-level payload before serialization.
    # NOTE(review): presumably bookkeeping fields set by the record base
    # class -- confirm against the record definition.
    _RESERVED_KEYS = ('_default', '_required', '_required_default')

    def __init__(self, record_cls):
        """:param record_cls: record type; must expose a ``schema()`` callable."""
        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
                                         record_cls.schema(), 'JSON')

    def _get_serialized_value(self, o):
        """``json.dumps`` fallback for values it cannot serialize natively.

        Enum members are replaced by their ``value``; any other object is
        replaced by its ``__dict__``.
        """
        if isinstance(o, enum.Enum):
            return o.value
        else:
            return o.__dict__

    def encode(self, obj):
        """Serialize ``obj`` to UTF-8 JSON bytes.

        :raises TypeError: if ``obj`` is not an instance of the record type.
        """
        self._validate_object_type(obj)
        # Serialize from a shallow copy: deleting the reserved keys from
        # obj.__dict__ itself would strip them from the caller's object and
        # make a second encode() of the same object fail with KeyError.
        payload = dict(obj.__dict__)
        for key in self._RESERVED_KEYS:
            payload.pop(key, None)

        return json.dumps(payload, default=self._get_serialized_value, indent=True).encode('utf-8')

    def decode(self, data):
        """Parse JSON ``data`` and pass its members as keyword arguments to the record type."""
        return self._record_cls(**json.loads(data))